1 # Copyright 2016 Red Hat Inc & Xena Networks.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # Dan Amzulescu, Xena Networks
17 # Christian Trautman, Red Hat Inc.
19 # Usage can be seen below in unit test. This implementation is designed for one
20 # module two port Xena chassis runs only.
27 from collections import OrderedDict
33 import scapy.layers.inet as inet
_LOGGER = logging.getLogger(__name__)
# Text encoding used throughout this module. locale.getlocale() reports
# (None, None) when no locale encoding can be determined (for example under
# the plain "C" locale). A None encoding is accepted by open() but rejected by
# bytes.decode(), so fall back to the interpreter's preferred encoding to
# guarantee a usable codec name.
_LOCALE = locale.getlocale()[1] or locale.getpreferredencoding(False)
class XenaJSON(object):
    """
    Class to modify and read Xena JSON configuration files.

    The parsed configuration lives in ``json_data`` and is edited in place by
    the setter methods. Packet headers are described with scapy objects kept
    in ``packet_data`` and are turned into header segments by
    ``add_header_segments``.
    """

    # One row per header layer, in stacking order:
    # (packet_data key, segment type, multistream layer name,
    #  multistream layer number). The VLAN row has no multistream support.
    _HEADER_LAYERS = (
        ('layer2', 'ETHERNET', 'L2', 2),
        ('vlan', 'VLAN', None, None),
        ('layer3', 'IP', 'L3', 3),
        ('layer4', 'UDP', 'L4', 4),
    )

    def __init__(self, json_path='./profiles/baseconfig.x2544'):
        """
        Constructor
        :param json_path: path to JSON file to read. Expected files must have
         two module ports with each port having its own stream config profile.
        :return: XenaJSON object
        """
        self.json_data = read_json_file(json_path)

        # Each populated entry is a two item list: [port 0 header, port 1
        # header]. Insertion order is the order the layers are stacked.
        self.packet_data = OrderedDict()
        self.packet_data['layer2'] = None
        self.packet_data['vlan'] = None
        self.packet_data['layer3'] = None
        self.packet_data['layer4'] = None

    def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
        """
        Add the multi stream layers to the json file based on the layer provided
        :param entity: Entity to append the segment to in entity list
        :param seg_uuid: The UUID to attach the multistream layer to
        :param stop_value: The number of flows to configure
        :param layer: the layer that the multistream will be attached to
         (2, 3 or 4; anything accepted by int())
        :return: None
        :raises KeyError: if layer is not 2, 3 or 4
        """
        field_name = {
            2: ('Dst MAC addr', 'Src MAC addr'),
            3: ('Dest IP Addr', 'Src IP Addr'),
            4: ('Dest Port', 'Src Port')
        }
        # One incrementing modifier for the destination field and one for the
        # source field of the chosen layer.
        segments = [
            {
                "Offset": 0,
                "Mask": "//8=",  # mask of 255/255
                "Action": "INC",
                "StartValue": 0,
                "StopValue": stop_value,
                "StepValue": 1,
                "RepeatCount": 1,
                "SegmentId": seg_uuid,
                "FieldName": name
            }
            for name in field_name[int(layer)]
        ]

        self.json_data['StreamProfileHandler']['EntityList'][entity][
            'StreamConfig']['HwModifiers'] = segments

    def _create_packet_header(self):
        """
        Create the scapy packet header based on what has been built in this
        instance using the set header methods. Return tuple of the two byte
        arrays, one for each port.
        :return: Scapy packet headers as bytes, (port 0, port 1)
        """
        if not self.packet_data['layer2']:
            _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
            self.set_header_layer2()
        packet1, packet2 = (self.packet_data['layer2'][0],
                            self.packet_data['layer2'][1])
        # Stack every configured layer after layer 2, in insertion order.
        for packet_header in list(self.packet_data.values())[1:]:
            if packet_header:
                packet1 /= packet_header[0]
                packet2 /= packet_header[1]
        return (bytes(packet1), bytes(packet2))

    def add_header_segments(self, flows=0, multistream_layer=None):
        """
        Build the header segments to write to the JSON file.
        :param flows: Number of flows to configure for multistream if enabled
        :param multistream_layer: layer to set multistream flows as string.
         Acceptable values are L2, L3 or L4
        :return: None
        """
        packets = self._create_packet_header()
        port_segments = ([], [])
        header_pos = 0

        for key, seg_type, stream_name, layer_num in self._HEADER_LAYERS:
            headers = self.packet_data[key]
            if not headers:
                continue
            use_multistream = (stream_name is not None and
                               multistream_layer == stream_name and
                               flows > 0)
            layer_len = 0
            for port in (0, 1):
                # slice this layer's bytes out of the port's own packet
                raw = packets[port][
                    header_pos: header_pos + len(headers[port])]
                # base64 output is pure ASCII, so decode it as such instead
                # of relying on the locale encoding (which may be None)
                seg = create_segment(
                    seg_type, encode_byte_array(raw).decode('ascii'))
                port_segments[port].append(seg)
                if use_multistream:
                    self._add_multistream_layer(
                        entity=port, seg_uuid=seg['ItemID'],
                        stop_value=flows, layer=layer_num)
                layer_len = len(raw)
            header_pos += layer_len

        entities = self.json_data['StreamProfileHandler']['EntityList']
        entities[0]['StreamConfig']['HeaderSegments'] = port_segments[0]
        entities[1]['StreamConfig']['HeaderSegments'] = port_segments[1]

    def disable_back2back_test(self):
        """
        Disable the rfc2544 back to back test
        :return: None
        """
        self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
            'Enabled'] = 'false'

    def disable_throughput_test(self):
        """
        Disable the rfc2544 throughput test
        :return: None
        """
        self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
            'Enabled'] = 'false'

    def enable_back2back_test(self):
        """
        Enable the rfc2544 back to back test
        :return: None
        """
        self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
            'Enabled'] = 'true'

    def enable_throughput_test(self):
        """
        Enable the rfc2544 throughput test
        :return: None
        """
        self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
            'Enabled'] = 'true'

    def set_chassis_info(self, hostname, pwd):
        """
        Set the chassis info
        :param hostname: hostname as string of ip
        :param pwd: password to chassis as string
        :return: None
        """
        chassis = self.json_data['ChassisManager']['ChassisList'][0]
        chassis['HostName'] = hostname
        chassis['Password'] = pwd

    def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
                          src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
        """
        Build a scapy Ethernet L2 objects inside instance packet_data structure
        :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
        :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
        :param kwargs: Extra params per scapy usage.
        :return: None
        """
        # the second entry is the reverse direction: src and dst swapped
        self.packet_data['layer2'] = [
            inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
            inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]

    def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
                          protocol='UDP', **kwargs):
        """
        Build scapy IPV4 L3 objects inside instance packet_data structure
        :param src_ip: source IP as string in dot notation format
        :param dst_ip: destination IP as string in dot notation format
        :param protocol: protocol for l4
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        # the second entry is the reverse direction: src and dst swapped
        self.packet_data['layer3'] = [
            inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs),
            inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)]

    def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
        """
        Build scapy UDP L4 objects inside instance packet_data structure
        :param source_port: Source port as int
        :param destination_port: Destination port as int
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        # NOTE(review): unlike layer 2 and layer 3, the second entry does not
        # swap source and destination. Left as-is; confirm this is intended.
        self.packet_data['layer4'] = [
            inet.UDP(sport=source_port, dport=destination_port, **kwargs),
            inet.UDP(sport=source_port, dport=destination_port, **kwargs)]

    def set_header_vlan(self, vlan_id=1, **kwargs):
        """
        Build a Dot1Q scapy object inside instance packet_data structure
        :param vlan_id: The VLAN ID
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        self.packet_data['vlan'] = [
            inet.Dot1Q(vlan=vlan_id, **kwargs),
            inet.Dot1Q(vlan=vlan_id, **kwargs)]

    def set_port(self, index, module, port):
        """
        Set the module and port of the given port entry to use with the test
        :param index: Index of port to set, 0 = port1, 1=port2, etc..
        :param module: module location as int
        :param port: port location in module as int
        :return: None
        """
        port_ref = self.json_data['PortHandler']['EntityList'][index][
            'PortRef']
        port_ref['ModuleIndex'] = module
        port_ref['PortIndex'] = port

    def _validate_port(self, port):
        """
        Check that port is an index into the port entity list
        :param port: port number as int
        :return: None
        :raises ValueError: if port is not an available port index
        """
        available_ports = range(len(
            self.json_data['PortHandler']['EntityList']))
        if port not in available_ports:
            raise ValueError("{}{}{}".format(
                'Port assignment must be an available port ',
                'number in baseconfig file. Port=', port))

    def set_port_ip_v4(self, port, ip_addr, netmask, gateway):
        """
        Set the port IP info
        :param port: port number as int of port to set ip info
        :param ip_addr: ip address in dot notation format as string
        :param netmask: cidr number for netmask (ie 24/16/8) as int
        :param gateway: gateway address in dot notation format
        :return: None
        :raises ValueError: if port is not an available port index
        """
        self._validate_port(port)
        entity = self.json_data['PortHandler']['EntityList'][port]
        entity["IpV4Address"] = ip_addr
        entity["IpV4Gateway"] = gateway
        entity["IpV4RoutingPrefix"] = int(netmask)

    def set_port_ip_v6(self, port, ip_addr, netmask, gateway):
        """
        Set the port IP info
        :param port: port number as int of port to set ip info
        :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated
         by a colon.
        :param netmask: cidr number for netmask (ie 24/16/8) as int
        :param gateway: gateway address as string in 8 group of 4 hexadecimal
                        groups separated by a colon.
        :return: None
        :raises ValueError: if port is not an available port index
        """
        self._validate_port(port)
        entity = self.json_data['PortHandler']['EntityList'][port]
        entity["IpV6Address"] = ip_addr
        entity["IpV6Gateway"] = gateway
        entity["IpV6RoutingPrefix"] = int(netmask)

    def set_test_options(self, packet_sizes, duration, iterations, loss_rate,
                         micro_tpld=False):
        """
        Set the test options
        :param packet_sizes: List of packet sizes to test, single int entry is
         acceptable for one packet size testing
        :param duration: time for each test in seconds as int
        :param iterations: number of iterations of testing as int
        :param loss_rate: acceptable loss rate as float
        :param micro_tpld: boolean if micro_tpld should be enabled or disabled
        :return: None
        """
        if isinstance(packet_sizes, int):
            packet_sizes = [packet_sizes]
        options = self.json_data['TestOptions']
        throughput = options['TestTypeOptionMap']['Throughput']
        options['PacketSizes']['CustomPacketSizes'] = packet_sizes
        throughput['Duration'] = duration
        throughput['RateIterationOptions']['AcceptableLoss'] = loss_rate
        options['FlowCreationOptions'][
            'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
        throughput['Iterations'] = iterations

    def set_topology_blocks(self):
        """
        Set the test topology to a West to East config for half duplex flow with
        port 0 as the sender and port 1 as the receiver.
        :return: None
        """
        self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'BLOCKS'
        self.json_data['TestOptions']['TopologyConfig'][
            'Direction'] = 'WEST_EAST'
        self.json_data['PortHandler']['EntityList'][0][
            'PortGroup'] = "WEST"
        self.json_data['PortHandler']['EntityList'][1][
            'PortGroup'] = "EAST"

    def set_topology_mesh(self):
        """
        Set the test topology to Mesh for bi directional full duplex flow
        :return: None
        """
        self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'MESH'
        self.json_data['TestOptions']['TopologyConfig']['Direction'] = 'BIDIR'
        self.json_data['PortHandler']['EntityList'][0][
            'PortGroup'] = "UNDEFINED"
        self.json_data['PortHandler']['EntityList'][1][
            'PortGroup'] = "UNDEFINED"

    def write_config(self, path='./2bUsed.x2544'):
        """
        Write the config to out as file
        :param path: Output file to export the json data to
        :return: None
        :raises RuntimeError: if the file could not be written
        """
        if not write_json_file(self.json_data, path):
            raise RuntimeError("Could not write out file, please check config")
def create_segment(header_type, encode_64_string):
    """
    Build one header segment entry for the JSON file.
    :param header_type: Type of header as string; stored upper-cased
    :param encode_64_string: base64 encoded string value of the header bytes
    :return: segment as dictionary, carrying a freshly generated ItemID
    """
    segment = {}
    segment["SegmentType"] = header_type.upper()
    segment["SegmentValue"] = encode_64_string
    # random UUID, so every segment gets its own identifier
    segment["ItemID"] = str(uuid.uuid4())
    segment["ParentID"] = ""
    segment["Label"] = ""
    return segment
def decode_byte_array(enc_str):
    """ Turn a base64-encoded string back into its raw bytes
    :param enc_str: The base64-encoded string representing a byte array
    :return: The decoded bytes as a new bytearray
    """
    return bytearray(base64.b64decode(enc_str))
def encode_byte_array(byte_arr):
    """ Base64-encode a byte array
    :param byte_arr: A bytearray containing the bytes to convert
    :return: The base64 encoding as bytes; callers decode it to get a string
    """
    return base64.b64encode(bytes(byte_arr))
def print_json_report(json_data):
    """
    Print out info from the json data for testing purposes only.

    The report stops at the first missing key and prints which key was not
    found; everything printed before that point stays on the output.
    :param json_data: json loaded data from json.loads
    :return: None
    """
    try:
        print("<<Xena JSON Config Report>>\n")

        print("### Chassis Info ###")
        chassis = json_data['ChassisManager']['ChassisList'][0]
        print("Chassis IP: {}".format(chassis['HostName']))
        print("Chassis Password: {}".format(chassis['Password']))

        print("### Port Configuration ###")
        port_list = json_data['PortHandler']['EntityList']
        # ports are looked up one at a time so that a missing second port
        # still lets the first port's lines print
        port = port_list[0]
        print("Port 1 IPv4:{}/{} gateway:{}".format(
            port["IpV4Address"], port["IpV4RoutingPrefix"],
            port["IpV4Gateway"]))
        print("Port 1 IPv6:{}/{} gateway:{}".format(
            port["IpV6Address"], port["IpV6RoutingPrefix"],
            port["IpV6Gateway"]))
        port = port_list[1]
        print("Port 2 IPv4:{}/{} gateway:{}".format(
            port["IpV4Address"], port["IpV4RoutingPrefix"],
            port["IpV4Gateway"]))
        print("Port 2 IPv6:{}/{} gateway:{}".format(
            port["IpV6Address"], port["IpV6RoutingPrefix"],
            port["IpV6Gateway"]))
        port = port_list[0]
        print("Port 1: {}/{} group: {}".format(
            port['PortRef']['ModuleIndex'], port['PortRef']['PortIndex'],
            port['PortGroup']))
        port = port_list[1]
        print("Port 2: {}/{} group: {}".format(
            port['PortRef']['ModuleIndex'], port['PortRef']['PortIndex'],
            port['PortGroup']))

        print("### Tests Enabled ###")
        test_types = json_data['TestOptions']['TestTypeOptionMap']
        print("Back2Back Enabled: {}".format(
            test_types['Back2Back']['Enabled']))
        print("Throughput Enabled: {}".format(
            test_types['Throughput']['Enabled']))

        print("### Test Options ###")
        options = json_data['TestOptions']
        topology = options['TopologyConfig']
        print("Test topology: {}/{}".format(
            topology['Topology'], topology['Direction']))
        print("Packet Sizes: {}".format(
            options['PacketSizes']['CustomPacketSizes']))
        throughput = test_types['Throughput']
        print("Test duration: {}".format(throughput['Duration']))
        print("Acceptable loss rate: {}".format(
            throughput['RateIterationOptions']['AcceptableLoss']))
        print("Micro TPLD enabled: {}".format(
            options['FlowCreationOptions']['UseMicroTpldOnDemand']))
        print("Test iterations: {}".format(throughput['Iterations']))

        entities = json_data['StreamProfileHandler']['EntityList']
        if 'StreamConfig' in entities[0]:
            print("### Header segments ###")
            for entity in entities:
                for header in entity['StreamConfig']['HeaderSegments']:
                    print("Type: {}".format(header['SegmentType']))
                    print("Value: {}".format(
                        decode_byte_array(header['SegmentValue'])))
            print("### Multi Stream config ###")
            for entity in entities:
                for modifier in entity['StreamConfig']['HwModifiers']:
                    print(modifier)
    except KeyError as exc:
        print("Error setting not found in JSON data: {}".format(exc))
def read_json_file(json_file):
    """
    Open the given JSON file and return its parsed content.
    :param json_file: path to json file
    :return: dictionary of json data
    :raises ValueError: if the content cannot be parsed (logged, re-raised)
    :raises IOError: if the file cannot be opened (logged, then re-raised)
    """
    try:
        with open(json_file, 'r', encoding=_LOCALE) as handle:
            parsed = json.load(handle)
    except ValueError as exc:
        # general json exception, Python 3.5 adds new exception type
        _LOGGER.exception("Exception with json read: %s", exc)
        raise
    except IOError as exc:
        _LOGGER.exception(
            'Exception during file open: %s file=%s', exc, json_file)
        raise
    return parsed
def write_json_file(json_data, output_path):
    """
    Serialise the given data as JSON into a file.
    :param json_data: dictionary of json data
    :param output_path: file path to write output
    :return: Boolean if success; failures are logged rather than raised
    """
    written = False
    try:
        with open(output_path, 'w', encoding=_LOCALE) as out_file:
            json.dump(json_data, out_file, indent=2, sort_keys=True,
                      ensure_ascii=True)
        written = True
    except ValueError as exc:
        # general json exception, Python 3.5 adds new exception type
        _LOGGER.exception(
            "Exception with json write: %s", exc)
    except IOError as exc:
        _LOGGER.exception(
            'Exception during file write: %s file=%s', exc, output_path)
    return written
if __name__ == "__main__":
    # Self test: load the default profile, apply every setter once, write the
    # result out, then read it back and print it again.
    print("Running UnitTest for XenaJSON")
    JSON = XenaJSON()
    print_json_report(JSON.json_data)
    JSON.set_chassis_info('192.168.0.5', 'vsperf')

    # both ports sit on module 1
    for PORT_INDEX in (0, 1):
        JSON.set_port(PORT_INDEX, 1, PORT_INDEX)

    for PORT_INDEX, ADDRESS in enumerate(
            ('192.168.240.10', '192.168.240.11')):
        JSON.set_port_ip_v4(PORT_INDEX, ADDRESS, 32, '192.168.240.1')

    V6_SETTINGS = (
        ('a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:a8a8',
         'a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:1111'),
        ('b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:b8b8',
         'b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:1111'),
    )
    for PORT_INDEX, (ADDRESS, GATEWAY) in enumerate(V6_SETTINGS):
        JSON.set_port_ip_v6(PORT_INDEX, ADDRESS, 128, GATEWAY)

    JSON.set_header_layer2(dst_mac='dd:dd:dd:dd:dd:dd',
                           src_mac='ee:ee:ee:ee:ee:ee')
    JSON.set_header_vlan(vlan_id=5)
    JSON.set_header_layer3(src_ip='192.168.100.2', dst_ip='192.168.100.3',
                           protocol='UDP')
    JSON.set_header_layer4_udp(source_port=3000, destination_port=3001)
    JSON.set_test_options(packet_sizes=[64], duration=10, iterations=1,
                          loss_rate=0.0, micro_tpld=True)
    JSON.add_header_segments(flows=4000, multistream_layer='L4')
    JSON.set_topology_blocks()

    OUTPUT_PATH = './testthis.x2544'
    write_json_file(JSON.json_data, OUTPUT_PATH)
    JSON = XenaJSON(OUTPUT_PATH)
    print_json_report(JSON.json_data)