1 # Copyright 2016-2017 Red Hat Inc & Xena Networks.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # Dan Amzulescu, Xena Networks
17 # Christian Trautman, Red Hat Inc.
19 # Usage can be seen below in unit test. This implementation is designed for one
20 # module two port Xena chassis runs only.
26 from collections import OrderedDict
31 import scapy.layers.inet as inet
33 from tools.pkt_gen.xena.json import json_utilities
35 _LOGGER = logging.getLogger(__name__)
36 _LOCALE = locale.getlocale()[1]
38 _CURR_DIR = os.path.dirname(os.path.realpath(__file__))
41 class XenaJSON(object):
43 Parent Class to modify and read Xena JSON configuration files.
46 json_path=os.path.join(
47 _CURR_DIR, '../profiles/baseconfig.x2544')):
50 :param json_path: path to JSON file to read. Expected files must have
51 two module ports with each port having its own stream config profile.
52 :return: XenaJSON object
54 self.json_data = json_utilities.read_json_file(json_path)
56 self.packet_data = OrderedDict()
57 self.packet_data['layer2'] = None
58 self.packet_data['vlan'] = None
59 self.packet_data['layer3'] = None
60 self.packet_data['layer4'] = None
62 def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
64 Add the multi stream layers to the json file based on the layer provided
65 :param entity: Entity to append the segment to in entity list
66 :param seg_uuid: The UUID to attach the multistream layer to
67 :param stop_value: The number of flows to configure
68 :param layer: the layer that the multistream will be attached to
72 2: ('Dst MAC addr', 'Src MAC addr'),
73 3: ('Dest IP Addr', 'Src IP Addr'),
74 4: ('Dest Port', 'Src Port')
79 "Mask": "//8=", # mask of 255/255
82 "StopValue": stop_value,
85 "SegmentId": seg_uuid,
86 "FieldName": field_name[int(layer)][0]
90 "Mask": "//8=", # mask of 255/255
93 "StopValue": stop_value,
96 "SegmentId": seg_uuid,
97 "FieldName": field_name[int(layer)][1]
101 self.json_data['StreamProfileHandler']['EntityList'][entity][
102 'StreamConfig']['HwModifiers'] = (segments)
104 def _create_packet_header(self):
106 Create the scapy packet header based on what has been built in this
107 instance using the set header methods. Return tuple of the two byte
108 arrays, one for each port.
109 :return: Scapy packet headers as bytearrays
111 if not self.packet_data['layer2']:
112 _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
113 self.set_header_layer2()
114 packet1, packet2 = (self.packet_data['layer2'][0],
115 self.packet_data['layer2'][1])
116 for packet_header in list(self.packet_data.copy().values())[1:]:
118 packet1 /= packet_header[0]
119 packet2 /= packet_header[1]
120 ret = (bytes(packet1), bytes(packet2))
123 def add_header_segments(self, flows=0, multistream_layer=None):
125 Build the header segments to write to the JSON file.
126 :param flows: Number of flows to configure for multistream if enabled
127 :param multistream_layer: layer to set multistream flows as string.
128 Acceptable values are L2, L3 or L4
131 packet = self._create_packet_header()
135 if self.packet_data['layer2']:
136 # slice out the layer 2 bytes from the packet header byte array
137 layer2 = packet[0][header_pos: len(self.packet_data['layer2'][0])]
138 seg = json_utilities.create_segment(
139 "ETHERNET", json_utilities.encode_byte_array(layer2).decode(
141 if multistream_layer == 'L2' and flows > 0:
142 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
143 stop_value=flows, layer=2)
145 # now do the other port data with reversed src, dst info
146 layer2 = packet[1][header_pos: len(self.packet_data['layer2'][1])]
147 seg = json_utilities.create_segment(
148 "ETHERNET", json_utilities.encode_byte_array(layer2).decode(
151 if multistream_layer == 'L2' and flows > 0:
152 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
153 stop_value=flows, layer=2)
154 header_pos = len(layer2)
155 if self.packet_data['vlan']:
156 # slice out the vlan bytes from the packet header byte array
157 vlan = packet[0][header_pos: len(
158 self.packet_data['vlan'][0]) + header_pos]
159 segment1.append(json_utilities.create_segment(
160 "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE)))
161 segment2.append(json_utilities.create_segment(
162 "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE)))
163 header_pos += len(vlan)
164 if self.packet_data['layer3']:
165 # slice out the layer 3 bytes from the packet header byte array
166 layer3 = packet[0][header_pos: len(
167 self.packet_data['layer3'][0]) + header_pos]
168 seg = json_utilities.create_segment(
169 "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE))
171 if multistream_layer == 'L3' and flows > 0:
172 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
173 stop_value=flows, layer=3)
174 # now do the other port data with reversed src, dst info
175 layer3 = packet[1][header_pos: len(
176 self.packet_data['layer3'][1]) + header_pos]
177 seg = json_utilities.create_segment(
178 "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE))
180 if multistream_layer == 'L3' and flows > 0:
181 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
182 stop_value=flows, layer=3)
183 header_pos += len(layer3)
184 if self.packet_data['layer4']:
185 # slice out the layer 4 bytes from the packet header byte array
186 layer4 = packet[0][header_pos: len(
187 self.packet_data['layer4'][0]) + header_pos]
188 seg = json_utilities.create_segment(
189 "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE))
191 if multistream_layer == 'L4' and flows > 0:
192 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
193 stop_value=flows, layer=4)
194 # now do the other port data with reversed src, dst info
195 layer4 = packet[1][header_pos: len(
196 self.packet_data['layer4'][1]) + header_pos]
197 seg = json_utilities.create_segment(
198 "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE))
200 if multistream_layer == 'L4' and flows > 0:
201 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
202 stop_value=flows, layer=4)
203 header_pos += len(layer4)
205 self.json_data['StreamProfileHandler']['EntityList'][0][
206 'StreamConfig']['HeaderSegments'] = segment1
207 self.json_data['StreamProfileHandler']['EntityList'][1][
208 'StreamConfig']['HeaderSegments'] = segment2
210 def disable_back2back_test(self):
212 Disable the rfc2544 back to back test
215 self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
218 def disable_throughput_test(self):
220 Disable the rfc2544 throughput test
223 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
226 def enable_back2back_test(self):
228 Enable the rfc2544 back to back test
231 self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
234 def enable_throughput_test(self):
236 Enable the rfc2544 throughput test
239 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
241 # pylint: disable=too-many-arguments
242 def modify_2544_tput_options(self, initial_value, minimum_value,
243 maximum_value, value_resolution,
244 use_pass_threshhold, pass_threshhold):
246 modify_2544_tput_options
248 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
249 'RateIterationOptions']['InitialValue'] = initial_value
250 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
251 'RateIterationOptions']['MinimumValue'] = minimum_value
252 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
253 'RateIterationOptions']['MaximumValue'] = maximum_value
254 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
255 'RateIterationOptions']['ValueResolution'] = value_resolution
256 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
257 'RateIterationOptions']['UsePassThreshold'] = use_pass_threshhold
258 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
259 'RateIterationOptions']['PassThreshold'] = pass_threshhold
261 def set_chassis_info(self, hostname, pwd):
264 :param hostname: hostname as string of ip
265 :param pwd: password to chassis as string
268 self.json_data['ChassisManager']['ChassisList'][0][
269 'HostName'] = hostname
270 self.json_data['ChassisManager']['ChassisList'][0][
273 def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
274 src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
276 Build a scapy Ethernet L2 objects inside instance packet_data structure
277 :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
278 :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
279 :param kwargs: Extra params per scapy usage.
282 self.packet_data['layer2'] = [
283 inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
284 inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]
286 def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
287 protocol='UDP', **kwargs):
289 Build scapy IPV4 L3 objects inside instance packet_data structure
290 :param src_ip: source IP as string in dot notation format
291 :param dst_ip: destination IP as string in dot notation format
292 :param protocol: protocol for l4
293 :param kwargs: Extra params per scapy usage
296 self.packet_data['layer3'] = [
297 inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs),
298 inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)]
300 def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
302 Build scapy UDP L4 objects inside instance packet_data structure
303 :param source_port: Source port as int
304 :param destination_port: Destination port as int
305 :param kwargs: Extra params per scapy usage
308 self.packet_data['layer4'] = [
309 inet.UDP(sport=source_port, dport=destination_port, **kwargs),
310 inet.UDP(sport=source_port, dport=destination_port, **kwargs)]
312 def set_header_vlan(self, vlan_id=1, **kwargs):
314 Build a Dot1Q scapy object inside instance packet_data structure
315 :param vlan_id: The VLAN ID
316 :param kwargs: Extra params per scapy usage
319 self.packet_data['vlan'] = [
320 inet.Dot1Q(vlan=vlan_id, **kwargs),
321 inet.Dot1Q(vlan=vlan_id, **kwargs)]
323 def set_port(self, index, module, port):
325 Set the module and port for the 0 index port to use with the test
326 :param index: Index of port to set, 0 = port1, 1=port2, etc..
327 :param module: module location as int
328 :param port: port location in module as int
331 self.json_data['PortHandler']['EntityList'][index]['PortRef'][
332 'ModuleIndex'] = module
333 self.json_data['PortHandler']['EntityList'][index]['PortRef'][
336 def set_port_ip_v4(self, port, ip_addr, netmask, gateway):
339 :param port: port number as int of port to set ip info
340 :param ip_addr: ip address in dot notation format as string
341 :param netmask: cidr number for netmask (ie 24/16/8) as int
342 :param gateway: gateway address in dot notation format
345 available_ports = range(len(
346 self.json_data['PortHandler']['EntityList']))
347 if port not in available_ports:
348 raise ValueError("{}{}{}".format(
349 'Port assignment must be an available port ',
350 'number in baseconfig file. Port=', port))
351 self.json_data['PortHandler']['EntityList'][
352 port]["IpV4Address"] = ip_addr
353 self.json_data['PortHandler']['EntityList'][
354 port]["IpV4Gateway"] = gateway
355 self.json_data['PortHandler']['EntityList'][
356 port]["IpV4RoutingPrefix"] = int(netmask)
358 def set_port_ip_v6(self, port, ip_addr, netmask, gateway):
361 :param port: port number as int of port to set ip info
362 :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated
364 :param netmask: cidr number for netmask (ie 24/16/8) as int
365 :param gateway: gateway address as string in 8 group of 4 hexadecimal
366 groups separated by a colon.
369 available_ports = range(len(
370 self.json_data['PortHandler']['EntityList']))
371 if port not in available_ports:
372 raise ValueError("{}{}{}".format(
373 'Port assignment must be an available port ',
374 'number in baseconfig file. Port=', port))
375 self.json_data['PortHandler']['EntityList'][
376 port]["IpV6Address"] = ip_addr
377 self.json_data['PortHandler']['EntityList'][
378 port]["IpV6Gateway"] = gateway
379 self.json_data['PortHandler']['EntityList'][
380 port]["IpV6RoutingPrefix"] = int(netmask)
382 def set_test_options_tput(self, packet_sizes, duration, iterations,
383 loss_rate, micro_tpld=False):
385 Set the tput test options
386 :param packet_sizes: List of packet sizes to test, single int entry is
387 acceptable for one packet size testing
388 :param duration: time for each test in seconds as int
389 :param iterations: number of iterations of testing as int
390 :param loss_rate: acceptable loss rate as float
391 :param micro_tpld: boolean if micro_tpld should be enabled or disabled
394 if isinstance(packet_sizes, int):
395 packet_sizes = [packet_sizes]
396 self.json_data['TestOptions']['PacketSizes'][
397 'CustomPacketSizes'] = packet_sizes
398 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
399 'Duration'] = duration
400 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
401 'RateIterationOptions']['AcceptableLoss'] = loss_rate
402 self.json_data['TestOptions']['FlowCreationOptions'][
403 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
404 self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
405 'Iterations'] = iterations
407 def set_test_options_back2back(self, packet_sizes, duration,
408 iterations, startvalue, endvalue,
411 Set the back2back test options
412 :param packet_sizes: List of packet sizes to test, single int entry is
413 acceptable for one packet size testing
414 :param duration: time for each test in seconds as int
415 :param iterations: number of iterations of testing as int
416 :param micro_tpld: boolean if micro_tpld should be enabled or disabled
417 :param StartValue: start value
418 :param EndValue: end value
421 if isinstance(packet_sizes, int):
422 packet_sizes = [packet_sizes]
423 self.json_data['TestOptions']['PacketSizes'][
424 'CustomPacketSizes'] = packet_sizes
425 self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
426 'Duration'] = duration
427 self.json_data['TestOptions']['FlowCreationOptions'][
428 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
429 self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
430 'Iterations'] = iterations
431 self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
432 'RateSweepOptions']['StartValue'] = startvalue
433 self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
434 'RateSweepOptions']['EndValue'] = endvalue
436 def write_config(self, path='./2bUsed.x2544'):
438 Write the config to out as file
439 :param path: Output file to export the json data to
442 if not json_utilities.write_json_file(self.json_data, path):
443 raise RuntimeError("Could not write out file, please check config")