1 # Copyright 2016 Red Hat Inc & Xena Networks.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # Dan Amzulescu, Xena Networks
17 # Christian Trautman, Red Hat Inc.
# Usage is demonstrated by the self-test at the bottom of this file. This
# implementation supports only a single-module, two-port Xena chassis.
27 from collections import OrderedDict
33 import scapy.layers.inet as inet
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Encoding component of the current locale setting. It is passed as the text
# encoding when the JSON files are opened for reading and writing below.
# NOTE(review): locale.getlocale() can report None for the encoding when no
# locale is configured -- confirm every use tolerates None.
_LOCALE = locale.getlocale()[1]
class XenaJSON(object):
    """
    Class to modify and read Xena JSON configuration files.

    The configuration read from disk is held in ``json_data`` (a dictionary)
    and is edited in place by the setter methods. Packet headers are collected
    as scapy objects in ``packet_data`` (one pair per layer, index 0 for the
    first port and index 1 for the second port) and are converted into header
    segments by ``add_header_segments``.
    """

    # Header layers in wire order:
    #   (packet_data key, segment type written to the JSON file,
    #    multistream_layer name, layer number for _add_multistream_layer)
    # The VLAN tag never receives multistream modifiers, hence the None
    # entries.
    _HEADER_LAYERS = (
        ('layer2', 'ETHERNET', 'L2', 2),
        ('vlan', 'VLAN', None, None),
        ('layer3', 'IP', 'L3', 3),
        ('layer4', 'UDP', 'L4', 4),
    )

    def __init__(self, json_path='./profiles/baseconfig.x2544'):
        """
        Constructor
        :param json_path: path to JSON file to read. Expected files must have
         two module ports with each port having its own stream config profile.
        :return: XenaJSON object
        """
        self.json_data = read_json_file(json_path)

        # Insertion order is the wire order of the headers; layer2 must stay
        # first because _create_packet_header stacks the others on top of it.
        self.packet_data = OrderedDict()
        self.packet_data['layer2'] = None
        self.packet_data['vlan'] = None
        self.packet_data['layer3'] = None
        self.packet_data['layer4'] = None

    def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
        """
        Add the multi stream layers to the json file based on the layer provided
        :param entity: Entity to append the segment to in entity list
        :param seg_uuid: The UUID to attach the multistream layer to
        :param stop_value: The number of flows to configure
        :param layer: the layer that the multistream will be attached to
         (2, 3 or 4; anything else raises KeyError)
        :return: None
        """
        field_name = {
            2: ('Dst MAC addr', 'Src MAC addr'),
            3: ('Dest IP Addr', 'Src IP Addr'),
            4: ('Dest Port', 'Src Port')
        }
        # One modifier for the destination field and one for the source
        # field of the chosen layer; they differ only in "FieldName".
        # NOTE(review): the constant entries (Offset, Action, StartValue,
        # StepValue, RepeatCount) were not visible in the reviewed excerpt --
        # confirm their values against the upstream file.
        segments = [
            {
                "Offset": 0,
                "Mask": "//8=",  # mask of 255/255
                "Action": "INC",
                "StartValue": 0,
                "StopValue": stop_value,
                "StepValue": 1,
                "RepeatCount": 1,
                "SegmentId": seg_uuid,
                "FieldName": name
            }
            for name in field_name[int(layer)]
        ]

        self.json_data['StreamProfileHandler']['EntityList'][entity][
            'StreamConfig']['HwModifiers'] = (segments)

    def _create_packet_header(self):
        """
        Create the scapy packet header based on what has been built in this
        instance using the set header methods. Return tuple of the two byte
        arrays, one for each port.
        :return: Scapy packet headers as bytearrays
        """
        if not self.packet_data['layer2']:
            _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
            self.set_header_layer2()
        packet1, packet2 = (self.packet_data['layer2'][0],
                            self.packet_data['layer2'][1])
        # Stack every configured layer above layer 2, in wire order.
        for packet_header in list(self.packet_data.values())[1:]:
            if packet_header:
                packet1 /= packet_header[0]
                packet2 /= packet_header[1]
        ret = (bytes(packet1), bytes(packet2))
        return ret

    def add_header_segments(self, flows=0, multistream_layer=None):
        """
        Build the header segments to write to the JSON file.
        :param flows: Number of flows to configure for multistream if enabled
        :param multistream_layer: layer to set multistream flows as string.
         Acceptable values are L2, L3 or L4
        :return: None
        """
        packet = self._create_packet_header()
        header_pos = 0
        # port_segments[0] belongs to the first port, [1] to the second port
        # (which carries the reversed src/dst information).
        port_segments = ([], [])

        for key, segment_type, stream_tag, layer in self._HEADER_LAYERS:
            headers = self.packet_data[key]
            if not headers:
                continue
            multistream = (stream_tag is not None and
                           multistream_layer == stream_tag and flows > 0)
            consumed = 0
            for entity in (0, 1):
                # slice this layer's bytes out of the packet header bytes
                layer_bytes = packet[entity][
                    header_pos: header_pos + len(headers[entity])]
                # base64 output is plain ASCII by definition, so decode it as
                # such; the locale encoding may be None, which bytes.decode()
                # rejects with a TypeError.
                seg = create_segment(
                    segment_type,
                    encode_byte_array(layer_bytes).decode('ascii'))
                if multistream:
                    self._add_multistream_layer(
                        entity=entity, seg_uuid=seg['ItemID'],
                        stop_value=flows, layer=layer)
                port_segments[entity].append(seg)
                consumed = len(layer_bytes)
            header_pos += consumed

        self.json_data['StreamProfileHandler']['EntityList'][0][
            'StreamConfig']['HeaderSegments'] = port_segments[0]
        self.json_data['StreamProfileHandler']['EntityList'][1][
            'StreamConfig']['HeaderSegments'] = port_segments[1]

    def _set_test_enabled(self, test_type, enabled):
        """
        Write the 'Enabled' flag of one rfc2544 test type
        :param test_type: key in TestTypeOptionMap, e.g. 'Throughput'
        :param enabled: boolean, stored as the string 'true' or 'false'
        :return: None
        """
        self.json_data['TestOptions']['TestTypeOptionMap'][test_type][
            'Enabled'] = 'true' if enabled else 'false'

    def disable_back2back_test(self):
        """
        Disable the rfc2544 back to back test
        :return: None
        """
        self._set_test_enabled('Back2Back', False)

    def disable_throughput_test(self):
        """
        Disable the rfc2544 throughput test
        :return: None
        """
        self._set_test_enabled('Throughput', False)

    def enable_back2back_test(self):
        """
        Enable the rfc2544 back to back test
        :return: None
        """
        self._set_test_enabled('Back2Back', True)

    def enable_throughput_test(self):
        """
        Enable the rfc2544 throughput test
        :return: None
        """
        self._set_test_enabled('Throughput', True)

    def set_chassis_info(self, hostname, pwd):
        """
        Set the chassis info
        :param hostname: hostname as string of ip
        :param pwd: password to chassis as string
        :return: None
        """
        chassis = self.json_data['ChassisManager']['ChassisList'][0]
        chassis['HostName'] = hostname
        chassis['Password'] = pwd

    def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
                          src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
        """
        Build a scapy Ethernet L2 objects inside instance packet_data structure
        :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
        :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
        :param kwargs: Extra params per scapy usage.
        :return: None
        """
        # Second entry is the reverse direction: src and dst swapped.
        self.packet_data['layer2'] = [
            inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
            inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]

    def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
                          protocol='UDP', **kwargs):
        """
        Build scapy IPV4 L3 objects inside instance packet_data structure
        :param src_ip: source IP as string in dot notation format
        :param dst_ip: destination IP as string in dot notation format
        :param protocol: protocol for l4, handed to scapy in lower case
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        # Second entry is the reverse direction: src and dst swapped.
        self.packet_data['layer3'] = [
            inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs),
            inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)]

    def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
        """
        Build scapy UDP L4 objects inside instance packet_data structure
        :param source_port: Source port as int
        :param destination_port: Destination port as int
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        # NOTE(review): unlike layer 2 and layer 3, the second entry does not
        # swap source and destination -- confirm this is intended.
        self.packet_data['layer4'] = [
            inet.UDP(sport=source_port, dport=destination_port, **kwargs),
            inet.UDP(sport=source_port, dport=destination_port, **kwargs)]

    def set_header_vlan(self, vlan_id=1, **kwargs):
        """
        Build a Dot1Q scapy object inside instance packet_data structure
        :param vlan_id: The VLAN ID
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        self.packet_data['vlan'] = [
            inet.Dot1Q(vlan=vlan_id, **kwargs),
            inet.Dot1Q(vlan=vlan_id, **kwargs)]

    def set_port(self, index, module, port):
        """
        Set the module and port that the port entry at the given index uses
        for the test
        :param index: Index of port to set, 0 = port1, 1=port2, etc..
        :param module: module location as int
        :param port: port location in module as int
        :return: None
        """
        port_ref = self.json_data['PortHandler']['EntityList'][index][
            'PortRef']
        port_ref['ModuleIndex'] = module
        port_ref['PortIndex'] = port

    def _check_port_available(self, port):
        """
        Verify that port is an index into the PortHandler entity list
        :param port: port number as int
        :return: None
        :raises ValueError: if port is not an available port index
        """
        available_ports = range(len(
            self.json_data['PortHandler']['EntityList']))
        if port not in available_ports:
            raise ValueError("{}{}{}".format(
                'Port assignment must be an available port ',
                'number in baseconfig file. Port=', port))

    def set_port_ip_v4(self, port, ip_addr, netmask, gateway):
        """
        Set the port IP info
        :param port: port number as int of port to set ip info
        :param ip_addr: ip address in dot notation format as string
        :param netmask: cidr number for netmask (ie 24/16/8) as int
        :param gateway: gateway address in dot notation format
        :return: None
        :raises ValueError: if port is not an available port index
        """
        self._check_port_available(port)
        entity = self.json_data['PortHandler']['EntityList'][port]
        entity["IpV4Address"] = ip_addr
        entity["IpV4Gateway"] = gateway
        entity["IpV4RoutingPrefix"] = int(netmask)

    def set_port_ip_v6(self, port, ip_addr, netmask, gateway):
        """
        Set the port IP info
        :param port: port number as int of port to set ip info
        :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated
         by a colon.
        :param netmask: routing prefix length as int
        :param gateway: gateway address as string in 8 group of 4 hexadecimal
                        groups separated by a colon.
        :return: None
        :raises ValueError: if port is not an available port index
        """
        self._check_port_available(port)
        entity = self.json_data['PortHandler']['EntityList'][port]
        entity["IpV6Address"] = ip_addr
        entity["IpV6Gateway"] = gateway
        entity["IpV6RoutingPrefix"] = int(netmask)

    def set_test_options_tput(self, packet_sizes, duration, iterations,
                              loss_rate, micro_tpld=False):
        """
        Set the tput test options
        :param packet_sizes: List of packet sizes to test, single int entry is
         acceptable for one packet size testing
        :param duration: time for each test in seconds as int
        :param iterations: number of iterations of testing as int
        :param loss_rate: acceptable loss rate as float
        :param micro_tpld: boolean if micro_tpld should be enabled or disabled
        :return: None
        """
        if isinstance(packet_sizes, int):
            packet_sizes = [packet_sizes]
        options = self.json_data['TestOptions']
        throughput = options['TestTypeOptionMap']['Throughput']
        options['PacketSizes']['CustomPacketSizes'] = packet_sizes
        throughput['Duration'] = duration
        throughput['RateIterationOptions']['AcceptableLoss'] = loss_rate
        options['FlowCreationOptions'][
            'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
        throughput['Iterations'] = iterations

    def set_test_options_back2back(self, packet_sizes, duration,
                                   iterations, startvalue, endvalue,
                                   micro_tpld=False):
        """
        Set the back2back test options
        :param packet_sizes: List of packet sizes to test, single int entry is
         acceptable for one packet size testing
        :param duration: time for each test in seconds as int
        :param iterations: number of iterations of testing as int
        :param startvalue: start value of the rate sweep
        :param endvalue: end value of the rate sweep
        :param micro_tpld: boolean if micro_tpld should be enabled or disabled
        :return: None
        """
        if isinstance(packet_sizes, int):
            packet_sizes = [packet_sizes]
        options = self.json_data['TestOptions']
        back2back = options['TestTypeOptionMap']['Back2Back']
        options['PacketSizes']['CustomPacketSizes'] = packet_sizes
        back2back['Duration'] = duration
        options['FlowCreationOptions'][
            'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
        back2back['Iterations'] = iterations
        back2back['RateSweepOptions']['StartValue'] = startvalue
        back2back['RateSweepOptions']['EndValue'] = endvalue

    def set_topology_blocks(self):
        """
        Set the test topology to a West to East config for half duplex flow with
        port 0 as the sender and port 1 as the receiver.
        :return: None
        """
        self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'BLOCKS'
        self.json_data['TestOptions']['TopologyConfig'][
            'Direction'] = 'WEST_EAST'
        self.json_data['PortHandler']['EntityList'][0][
            'PortGroup'] = "WEST"
        self.json_data['PortHandler']['EntityList'][1][
            'PortGroup'] = "EAST"

    def set_topology_mesh(self):
        """
        Set the test topology to Mesh for bi directional full duplex flow
        :return: None
        """
        self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'MESH'
        self.json_data['TestOptions']['TopologyConfig']['Direction'] = 'BIDIR'
        self.json_data['PortHandler']['EntityList'][0][
            'PortGroup'] = "UNDEFINED"
        self.json_data['PortHandler']['EntityList'][1][
            'PortGroup'] = "UNDEFINED"

    def write_config(self, path='./2bUsed.x2544'):
        """
        Write the config to out as file
        :param path: Output file to export the json data to
        :return: None
        :raises RuntimeError: if the file could not be written
        """
        if not write_json_file(self.json_data, path):
            raise RuntimeError("Could not write out file, please check config")
def create_segment(header_type, encode_64_string):
    """
    Build one header segment entry for the JSON file.

    Every call generates a fresh random UUID for the segment's ItemID.
    :param header_type: Type of header as string, stored upper-cased
    :param encode_64_string: base64 encoded string value of the header bytes
    :return: segment as dictionary
    """
    segment = {}
    segment["SegmentType"] = header_type.upper()
    segment["SegmentValue"] = encode_64_string
    segment["ItemID"] = str(uuid.uuid4())
    segment["ParentID"] = ""
    segment["Label"] = ""
    return segment
def decode_byte_array(enc_str):
    """ Turn a base64-encoded string back into the bytes it represents
    :param enc_str: The base64-encoded string representing a byte array
    :return: The decoded bytes as a bytearray
    """
    return bytearray(base64.b64decode(enc_str))
def encode_byte_array(byte_arr):
    """ Produce the base64 encoding of a byte array
    :param byte_arr: A bytearray containing the bytes to convert
    :return: The base64 encoding as a bytes object
    """
    raw = bytes(byte_arr)
    return base64.b64encode(raw)
def print_json_report(json_data):
    """
    Print out info from the json data for testing purposes only.

    The report is written with print(). A setting that is missing from the
    data ends the report with an error line naming the missing key instead of
    raising.
    :param json_data: json loaded data from json.loads
    :return: None
    """
    print("<<Xena JSON Config Report>>\n")
    try:
        print("### Chassis Info ###")
        chassis = json_data['ChassisManager']['ChassisList'][0]
        print("Chassis IP: {}".format(chassis['HostName']))
        print("Chassis Password: {}".format(chassis['Password']))

        print("### Port Configuration ###")
        port_entities = json_data['PortHandler']['EntityList']
        port1 = port_entities[0]
        print("Port 1 IPv4:{}/{} gateway:{}".format(
            port1["IpV4Address"],
            port1["IpV4RoutingPrefix"],
            port1["IpV4Gateway"]))
        print("Port 1 IPv6:{}/{} gateway:{}".format(
            port1["IpV6Address"],
            port1["IpV6RoutingPrefix"],
            port1["IpV6Gateway"]))
        port2 = port_entities[1]
        print("Port 2 IPv4:{}/{} gateway:{}".format(
            port2["IpV4Address"],
            port2["IpV4RoutingPrefix"],
            port2["IpV4Gateway"]))
        print("Port 2 IPv6:{}/{} gateway:{}".format(
            port2["IpV6Address"],
            port2["IpV6RoutingPrefix"],
            port2["IpV6Gateway"]))
        print("Port 1: {}/{} group: {}".format(
            port1['PortRef']['ModuleIndex'],
            port1['PortRef']['PortIndex'],
            port1['PortGroup']))
        print("Port 2: {}/{} group: {}".format(
            port2['PortRef']['ModuleIndex'],
            port2['PortRef']['PortIndex'],
            port2['PortGroup']))

        print("### Tests Enabled ###")
        test_types = json_data['TestOptions']['TestTypeOptionMap']
        print("Back2Back Enabled: {}".format(
            test_types['Back2Back']['Enabled']))
        throughput = test_types['Throughput']
        print("Throughput Enabled: {}".format(throughput['Enabled']))

        print("### Test Options ###")
        options = json_data['TestOptions']
        print("Test topology: {}/{}".format(
            options['TopologyConfig']['Topology'],
            options['TopologyConfig']['Direction']))
        print("Packet Sizes: {}".format(
            options['PacketSizes']['CustomPacketSizes']))
        print("Test duration: {}".format(throughput['Duration']))
        print("Acceptable loss rate: {}".format(
            throughput['RateIterationOptions']['AcceptableLoss']))
        print("Micro TPLD enabled: {}".format(
            options['FlowCreationOptions']['UseMicroTpldOnDemand']))
        print("Test iterations: {}".format(throughput['Iterations']))

        # Stream details are reported only once the first stream profile
        # carries a stream config.
        profiles = json_data['StreamProfileHandler']['EntityList']
        if 'StreamConfig' in profiles[0]:
            print("### Header segments ###")
            for profile in profiles:
                for segment in profile['StreamConfig']['HeaderSegments']:
                    print("Type: {}".format(segment['SegmentType']))
                    print("Value: {}".format(
                        decode_byte_array(segment['SegmentValue'])))
            print("### Multi Stream config ###")
            for profile in profiles:
                for modifier in profile['StreamConfig']['HwModifiers']:
                    print(modifier)
    except KeyError as exc:
        print("Error setting not found in JSON data: {}".format(exc))
def read_json_file(json_file):
    """
    Load a JSON file and hand back its content as a dictionary.

    The file is opened as text using the module's locale encoding. Failures
    are logged and then re-raised to the caller.
    :param json_file: path to json file
    :return: dictionary of json data
    """
    try:
        with open(json_file, 'r', encoding=_LOCALE) as data_file:
            return json.load(data_file)
    except ValueError as exc:
        # general json exception, Python 3.5 adds new exception type
        _LOGGER.exception("Exception with json read: %s", exc)
        raise
    except IOError as exc:
        _LOGGER.exception(
            'Exception during file open: %s file=%s', exc, json_file)
        raise
def write_json_file(json_data, output_path):
    """
    Serialise the dictionary of data into a json file.

    The output is indented by two spaces with keys sorted. A ValueError or
    IOError is logged and reported through the return value instead of being
    raised.
    :param json_data: dictionary of json data
    :param output_path: file path to write output
    :return: True if the file was written, False otherwise
    """
    try:
        with open(output_path, 'w', encoding=_LOCALE) as fileh:
            json.dump(json_data, fileh, indent=2, sort_keys=True,
                      ensure_ascii=True)
    except ValueError as exc:
        # general json exception, Python 3.5 adds new exception type
        _LOGGER.exception(
            "Exception with json write: %s", exc)
        return False
    except IOError as exc:
        _LOGGER.exception(
            'Exception during file write: %s file=%s', exc, output_path)
        return False
    return True
if __name__ == "__main__":
    # Self-test: load the default profile, apply one of each kind of setting,
    # write the result out and report on it after reading it back.
    print("Running UnitTest for XenaJSON")
    JSON = XenaJSON()
    print_json_report(JSON.json_data)
    JSON.set_chassis_info('192.168.0.5', 'vsperf')
    # Both ports live on module 1; the port number equals the entry index.
    for _index in (0, 1):
        JSON.set_port(_index, 1, _index)
    for _index, _addr in enumerate(('192.168.240.10', '192.168.240.11')):
        JSON.set_port_ip_v4(_index, _addr, 32, '192.168.240.1')
    _V6_SETTINGS = (
        ('a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:a8a8',
         'a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:1111'),
        ('b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:b8b8',
         'b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:1111'),
    )
    for _index, (_addr, _gateway) in enumerate(_V6_SETTINGS):
        JSON.set_port_ip_v6(_index, _addr, 128, _gateway)
    JSON.set_header_layer2(dst_mac='dd:dd:dd:dd:dd:dd',
                           src_mac='ee:ee:ee:ee:ee:ee')
    JSON.set_header_vlan(vlan_id=5)
    JSON.set_header_layer3(src_ip='192.168.100.2', dst_ip='192.168.100.3',
                           protocol='udp')
    JSON.set_header_layer4_udp(source_port=3000, destination_port=3001)
    JSON.set_test_options_tput(packet_sizes=[64], duration=10, iterations=1,
                               loss_rate=0.0, micro_tpld=True)
    JSON.add_header_segments(flows=4000, multistream_layer='L4')
    JSON.set_topology_blocks()
    write_json_file(JSON.json_data, './testthis.x2544')
    JSON = XenaJSON('./testthis.x2544')
    print_json_report(JSON.json_data)