Merge "xena_bug: Resolve baseconfig reporting option to work with new versions"
[vswitchperf.git] / tools / pkt_gen / xena / xena_json.py
1 # Copyright 2016 Red Hat Inc & Xena Networks.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # Contributors:
16 #   Dan Amzulescu, Xena Networks
17 #   Christian Trautman, Red Hat Inc.
18 #
19 # Usage can be seen below in unit test. This implementation is designed for one
20 # module two port Xena chassis runs only.
21
22 """
23 Xena JSON module
24 """
25
26 import base64
27 from collections import OrderedDict
28 import json
29 import locale
30 import logging
31 import uuid
32
33 import scapy.layers.inet as inet
34
35 _LOGGER = logging.getLogger(__name__)
36 _LOCALE = locale.getlocale()[1]
37
38
39 class XenaJSON(object):
40     """
41     Class to modify and read Xena JSON configuration files.
42     """
43     def __init__(self, json_path='./profiles/baseconfig.x2544'):
44         """
45         Constructor
46         :param json_path: path to JSON file to read. Expected files must have
47          two module ports with each port having its own stream config profile.
48         :return: XenaJSON object
49         """
50         self.json_data = read_json_file(json_path)
51
52         self.packet_data = OrderedDict()
53         self.packet_data['layer2'] = None
54         self.packet_data['vlan'] = None
55         self.packet_data['layer3'] = None
56         self.packet_data['layer4'] = None
57
58     def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
59         """
60         Add the multi stream layers to the json file based on the layer provided
61         :param entity: Entity to append the segment to in entity list
62         :param seg_uuid: The UUID to attach the multistream layer to
63         :param stop_value: The number of flows to configure
64         :param layer: the layer that the multistream will be attached to
65         :return: None
66         """
67         field_name = {
68             2: ('Dst MAC addr', 'Src MAC addr'),
69             3: ('Dest IP Addr', 'Src IP Addr'),
70             4: ('Dest Port', 'Src Port')
71         }
72         segments = [
73             {
74                 "Offset": 0,
75                 "Mask": "//8=",  # mask of 255/255
76                 "Action": "INC",
77                 "StartValue": 0,
78                 "StopValue": stop_value,
79                 "StepValue": 1,
80                 "RepeatCount": 1,
81                 "SegmentId": seg_uuid,
82                 "FieldName": field_name[int(layer)][0]
83             },
84             {
85                 "Offset": 0,
86                 "Mask": "//8=",  # mask of 255/255
87                 "Action": "INC",
88                 "StartValue": 0,
89                 "StopValue": stop_value,
90                 "StepValue": 1,
91                 "RepeatCount": 1,
92                 "SegmentId": seg_uuid,
93                 "FieldName": field_name[int(layer)][1]
94             }
95         ]
96
97         self.json_data['StreamProfileHandler']['EntityList'][entity][
98             'StreamConfig']['HwModifiers'] = (segments)
99
100     def _create_packet_header(self):
101         """
102         Create the scapy packet header based on what has been built in this
103         instance using the set header methods. Return tuple of the two byte
104         arrays, one for each port.
105         :return: Scapy packet headers as bytearrays
106         """
107         if not self.packet_data['layer2']:
108             _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
109             self.set_header_layer2()
110         packet1, packet2 = (self.packet_data['layer2'][0],
111                             self.packet_data['layer2'][1])
112         for packet_header in list(self.packet_data.copy().values())[1:]:
113             if packet_header:
114                 packet1 /= packet_header[0]
115                 packet2 /= packet_header[1]
116         ret = (bytes(packet1), bytes(packet2))
117         return ret
118
119     def add_header_segments(self, flows=0, multistream_layer=None):
120         """
121         Build the header segments to write to the JSON file.
122         :param flows: Number of flows to configure for multistream if enabled
123         :param multistream_layer: layer to set multistream flows as string.
124         Acceptable values are L2, L3 or L4
125         :return: None
126         """
127         packet = self._create_packet_header()
128         segment1 = list()
129         segment2 = list()
130         header_pos = 0
131         if self.packet_data['layer2']:
132             # slice out the layer 2 bytes from the packet header byte array
133             layer2 = packet[0][header_pos: len(self.packet_data['layer2'][0])]
134             seg = create_segment(
135                 "ETHERNET", encode_byte_array(layer2).decode(_LOCALE))
136             if multistream_layer == 'L2' and flows > 0:
137                 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
138                                             stop_value=flows, layer=2)
139             segment1.append(seg)
140             # now do the other port data with reversed src, dst info
141             layer2 = packet[1][header_pos: len(self.packet_data['layer2'][1])]
142             seg = create_segment(
143                 "ETHERNET", encode_byte_array(layer2).decode(_LOCALE))
144             segment2.append(seg)
145             if multistream_layer == 'L2' and flows > 0:
146                 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
147                                             stop_value=flows, layer=2)
148             header_pos = len(layer2)
149         if self.packet_data['vlan']:
150             # slice out the vlan bytes from the packet header byte array
151             vlan = packet[0][header_pos: len(
152                 self.packet_data['vlan'][0]) + header_pos]
153             segment1.append(create_segment(
154                 "VLAN", encode_byte_array(vlan).decode(_LOCALE)))
155             segment2.append(create_segment(
156                 "VLAN", encode_byte_array(vlan).decode(_LOCALE)))
157             header_pos += len(vlan)
158         if self.packet_data['layer3']:
159             # slice out the layer 3 bytes from the packet header byte array
160             layer3 = packet[0][header_pos: len(
161                 self.packet_data['layer3'][0]) + header_pos]
162             seg = create_segment(
163                 "IP", encode_byte_array(layer3).decode(_LOCALE))
164             segment1.append(seg)
165             if multistream_layer == 'L3' and flows > 0:
166                 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
167                                             stop_value=flows, layer=3)
168             # now do the other port data with reversed src, dst info
169             layer3 = packet[1][header_pos: len(
170                 self.packet_data['layer3'][1]) + header_pos]
171             seg = create_segment(
172                 "IP", encode_byte_array(layer3).decode(_LOCALE))
173             segment2.append(seg)
174             if multistream_layer == 'L3' and flows > 0:
175                 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
176                                             stop_value=flows, layer=3)
177             header_pos += len(layer3)
178         if self.packet_data['layer4']:
179             # slice out the layer 4 bytes from the packet header byte array
180             layer4 = packet[0][header_pos: len(
181                 self.packet_data['layer4'][0]) + header_pos]
182             seg = create_segment(
183                 "UDP", encode_byte_array(layer4).decode(_LOCALE))
184             segment1.append(seg)
185             if multistream_layer == 'L4' and flows > 0:
186                 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
187                                             stop_value=flows, layer=4)
188             # now do the other port data with reversed src, dst info
189             layer4 = packet[1][header_pos: len(
190                 self.packet_data['layer4'][1]) + header_pos]
191             seg = create_segment(
192                 "UDP", encode_byte_array(layer4).decode(_LOCALE))
193             segment2.append(seg)
194             if multistream_layer == 'L4' and flows > 0:
195                 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
196                                             stop_value=flows, layer=4)
197             header_pos += len(layer4)
198
199         self.json_data['StreamProfileHandler']['EntityList'][0][
200             'StreamConfig']['HeaderSegments'] = segment1
201         self.json_data['StreamProfileHandler']['EntityList'][1][
202             'StreamConfig']['HeaderSegments'] = segment2
203
204     def disable_back2back_test(self):
205         """
206         Disable the rfc2544 back to back test
207         :return: None
208         """
209         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
210             'Enabled'] = 'false'
211
212     def disable_throughput_test(self):
213         """
214         Disable the rfc2544 throughput test
215         :return: None
216         """
217         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
218             'Enabled'] = 'false'
219
220     def enable_back2back_test(self):
221         """
222         Enable the rfc2544 back to back test
223         :return: None
224         """
225         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
226             'Enabled'] = 'true'
227
228     def enable_throughput_test(self):
229         """
230         Enable the rfc2544 throughput test
231         :return: None
232         """
233         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
234             'Enabled'] = 'true'
235
236     def set_chassis_info(self, hostname, pwd):
237         """
238         Set the chassis info
239         :param hostname: hostname as string of ip
240         :param pwd: password to chassis as string
241         :return: None
242         """
243         self.json_data['ChassisManager']['ChassisList'][0][
244             'HostName'] = hostname
245         self.json_data['ChassisManager']['ChassisList'][0][
246             'Password'] = pwd
247
248     def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
249                           src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
250         """
251         Build a scapy Ethernet L2 objects inside instance packet_data structure
252         :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
253         :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
254         :param kwargs: Extra params per scapy usage.
255         :return: None
256         """
257         self.packet_data['layer2'] = [
258             inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
259             inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]
260
261     def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
262                           protocol='UDP', **kwargs):
263         """
264         Build scapy IPV4 L3 objects inside instance packet_data structure
265         :param src_ip: source IP as string in dot notation format
266         :param dst_ip: destination IP as string in dot notation format
267         :param protocol: protocol for l4
268         :param kwargs: Extra params per scapy usage
269         :return: None
270         """
271         self.packet_data['layer3'] = [
272             inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs),
273             inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)]
274
275     def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
276         """
277         Build scapy UDP L4 objects inside instance packet_data structure
278         :param source_port: Source port as int
279         :param destination_port: Destination port as int
280         :param kwargs: Extra params per scapy usage
281         :return: None
282         """
283         self.packet_data['layer4'] = [
284             inet.UDP(sport=source_port, dport=destination_port, **kwargs),
285             inet.UDP(sport=source_port, dport=destination_port, **kwargs)]
286
287     def set_header_vlan(self, vlan_id=1, **kwargs):
288         """
289         Build a Dot1Q scapy object inside instance packet_data structure
290         :param vlan_id: The VLAN ID
291         :param kwargs: Extra params per scapy usage
292         :return: None
293         """
294         self.packet_data['vlan'] = [
295             inet.Dot1Q(vlan=vlan_id, **kwargs),
296             inet.Dot1Q(vlan=vlan_id, **kwargs)]
297
298     def set_port(self, index, module, port):
299         """
300         Set the module and port for the 0 index port to use with the test
301         :param index: Index of port to set, 0 = port1, 1=port2, etc..
302         :param module: module location as int
303         :param port: port location in module as int
304         :return: None
305         """
306         self.json_data['PortHandler']['EntityList'][index]['PortRef'][
307             'ModuleIndex'] = module
308         self.json_data['PortHandler']['EntityList'][index]['PortRef'][
309             'PortIndex'] = port
310
311     def set_port_ip_v4(self, port, ip_addr, netmask, gateway):
312         """
313         Set the port IP info
314         :param port: port number as int of port to set ip info
315         :param ip_addr: ip address in dot notation format as string
316         :param netmask: cidr number for netmask (ie 24/16/8) as int
317         :param gateway: gateway address in dot notation format
318         :return: None
319         """
320         available_ports = range(len(
321             self.json_data['PortHandler']['EntityList']))
322         if port not in available_ports:
323             raise ValueError("{}{}{}".format(
324                 'Port assignment must be an available port ',
325                 'number in baseconfig file. Port=', port))
326         self.json_data['PortHandler']['EntityList'][
327             port]["IpV4Address"] = ip_addr
328         self.json_data['PortHandler']['EntityList'][
329             port]["IpV4Gateway"] = gateway
330         self.json_data['PortHandler']['EntityList'][
331             port]["IpV4RoutingPrefix"] = int(netmask)
332
333     def set_port_ip_v6(self, port, ip_addr, netmask, gateway):
334         """
335         Set the port IP info
336         :param port: port number as int of port to set ip info
337         :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated
338          by a colon.
339         :param netmask: cidr number for netmask (ie 24/16/8) as int
340         :param gateway: gateway address as string in 8 group of 4 hexadecimal
341                         groups separated by a colon.
342         :return: None
343         """
344         available_ports = range(len(
345             self.json_data['PortHandler']['EntityList']))
346         if port not in available_ports:
347             raise ValueError("{}{}{}".format(
348                 'Port assignment must be an available port ',
349                 'number in baseconfig file. Port=', port))
350         self.json_data['PortHandler']['EntityList'][
351             port]["IpV6Address"] = ip_addr
352         self.json_data['PortHandler']['EntityList'][
353             port]["IpV6Gateway"] = gateway
354         self.json_data['PortHandler']['EntityList'][
355             port]["IpV6RoutingPrefix"] = int(netmask)
356
357     def set_test_options_tput(self, packet_sizes, duration, iterations,
358                               loss_rate, micro_tpld=False):
359         """
360         Set the tput test options
361         :param packet_sizes: List of packet sizes to test, single int entry is
362          acceptable for one packet size testing
363         :param duration: time for each test in seconds as int
364         :param iterations: number of iterations of testing as int
365         :param loss_rate: acceptable loss rate as float
366         :param micro_tpld: boolean if micro_tpld should be enabled or disabled
367         :return: None
368         """
369         if isinstance(packet_sizes, int):
370             packet_sizes = [packet_sizes]
371         self.json_data['TestOptions']['PacketSizes'][
372             'CustomPacketSizes'] = packet_sizes
373         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
374             'Duration'] = duration
375         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
376             'RateIterationOptions']['AcceptableLoss'] = loss_rate
377         self.json_data['TestOptions']['FlowCreationOptions'][
378             'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
379         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
380             'Iterations'] = iterations
381
382     def set_test_options_back2back(self, packet_sizes, duration,
383                                    iterations, startvalue, endvalue,
384                                    micro_tpld=False):
385         """
386         Set the back2back test options
387         :param packet_sizes: List of packet sizes to test, single int entry is
388          acceptable for one packet size testing
389         :param duration: time for each test in seconds as int
390         :param iterations: number of iterations of testing as int
391         :param micro_tpld: boolean if micro_tpld should be enabled or disabled
392         :param StartValue: start value
393         :param EndValue: end value
394         :return: None
395         """
396         if isinstance(packet_sizes, int):
397             packet_sizes = [packet_sizes]
398         self.json_data['TestOptions']['PacketSizes'][
399             'CustomPacketSizes'] = packet_sizes
400         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
401             'Duration'] = duration
402         self.json_data['TestOptions']['FlowCreationOptions'][
403             'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
404         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
405             'Iterations'] = iterations
406         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
407             'RateSweepOptions']['StartValue'] = startvalue
408         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
409             'RateSweepOptions']['EndValue'] = endvalue
410
411     def set_topology_blocks(self):
412         """
413         Set the test topology to a West to East config for half duplex flow with
414         port 0 as the sender and port 1 as the receiver.
415         :return: None
416         """
417         self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'BLOCKS'
418         self.json_data['TestOptions']['TopologyConfig'][
419             'Direction'] = 'WEST_EAST'
420         self.json_data['PortHandler']['EntityList'][0][
421             'PortGroup'] = "WEST"
422         self.json_data['PortHandler']['EntityList'][1][
423             'PortGroup'] = "EAST"
424
425     def set_topology_mesh(self):
426         """
427         Set the test topology to Mesh for bi directional full duplex flow
428         :return: None
429         """
430         self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'MESH'
431         self.json_data['TestOptions']['TopologyConfig']['Direction'] = 'BIDIR'
432         self.json_data['PortHandler']['EntityList'][0][
433             'PortGroup'] = "UNDEFINED"
434         self.json_data['PortHandler']['EntityList'][1][
435             'PortGroup'] = "UNDEFINED"
436
437     def write_config(self, path='./2bUsed.x2544'):
438         """
439         Write the config to out as file
440         :param path: Output file to export the json data to
441         :return: None
442         """
443         if not write_json_file(self.json_data, path):
444             raise RuntimeError("Could not write out file, please check config")
445
446
447 def create_segment(header_type, encode_64_string):
448     """
449     Create segment for JSON file
450     :param header_type: Type of header as string
451     :param encode_64_string: 64 byte encoded string value of the hex bytes
452     :return: segment as dictionary
453     """
454     return {
455         "SegmentType": header_type.upper(),
456         "SegmentValue": encode_64_string,
457         "ItemID": str(uuid.uuid4()),
458         "ParentID": "",
459         "Label": ""}
460
461
462 def decode_byte_array(enc_str):
463     """ Decodes the base64-encoded string to a byte array
464     :param enc_str: The base64-encoded string representing a byte array
465     :return: The decoded byte array
466     """
467     dec_string = base64.b64decode(enc_str)
468     barray = bytearray()
469     barray.extend(dec_string)
470     return barray
471
472
473 def encode_byte_array(byte_arr):
474     """ Encodes the byte array as a base64-encoded string
475     :param byte_arr: A bytearray containing the bytes to convert
476     :return: A base64 encoded string
477     """
478     enc_string = base64.b64encode(bytes(byte_arr))
479     return enc_string
480
481
482 def print_json_report(json_data):
483     """
484     Print out info from the json data for testing purposes only.
485     :param json_data: json loaded data from json.loads
486     :return: None
487     """
488     print("<<Xena JSON Config Report>>\n")
489     try:
490         print("### Chassis Info ###")
491         print("Chassis IP: {}".format(json_data['ChassisManager'][
492             'ChassisList'][0]['HostName']))
493         print("Chassis Password: {}".format(json_data['ChassisManager'][
494             'ChassisList'][0]['Password']))
495         print("### Port Configuration ###")
496         print("Port 1 IPv4:{}/{} gateway:{}".format(
497             json_data['PortHandler']['EntityList'][0]["IpV4Address"],
498             json_data['PortHandler']['EntityList'][0]["IpV4RoutingPrefix"],
499             json_data['PortHandler']['EntityList'][0]["IpV4Gateway"]))
500         print("Port 1 IPv6:{}/{} gateway:{}".format(
501             json_data['PortHandler']['EntityList'][0]["IpV6Address"],
502             json_data['PortHandler']['EntityList'][0]["IpV6RoutingPrefix"],
503             json_data['PortHandler']['EntityList'][0]["IpV6Gateway"]))
504         print("Port 2 IPv4:{}/{} gateway:{}".format(
505             json_data['PortHandler']['EntityList'][1]["IpV4Address"],
506             json_data['PortHandler']['EntityList'][1]["IpV4RoutingPrefix"],
507             json_data['PortHandler']['EntityList'][1]["IpV4Gateway"]))
508         print("Port 2 IPv6:{}/{} gateway:{}".format(
509             json_data['PortHandler']['EntityList'][1]["IpV6Address"],
510             json_data['PortHandler']['EntityList'][1]["IpV6RoutingPrefix"],
511             json_data['PortHandler']['EntityList'][1]["IpV6Gateway"]))
512         print("Port 1: {}/{} group: {}".format(
513             json_data['PortHandler']['EntityList'][0]['PortRef']['ModuleIndex'],
514             json_data['PortHandler']['EntityList'][0]['PortRef']['PortIndex'],
515             json_data['PortHandler']['EntityList'][0]['PortGroup']))
516         print("Port 2: {}/{} group: {}".format(
517             json_data['PortHandler']['EntityList'][1]['PortRef']['ModuleIndex'],
518             json_data['PortHandler']['EntityList'][1]['PortRef']['PortIndex'],
519             json_data['PortHandler']['EntityList'][1]['PortGroup']))
520         print("### Tests Enabled ###")
521         print("Back2Back Enabled: {}".format(json_data['TestOptions'][
522             'TestTypeOptionMap']['Back2Back']['Enabled']))
523         print("Throughput Enabled: {}".format(json_data['TestOptions'][
524             'TestTypeOptionMap']['Throughput']['Enabled']))
525         print("### Test Options ###")
526         print("Test topology: {}/{}".format(
527             json_data['TestOptions']['TopologyConfig']['Topology'],
528             json_data['TestOptions']['TopologyConfig']['Direction']))
529         print("Packet Sizes: {}".format(json_data['TestOptions'][
530             'PacketSizes']['CustomPacketSizes']))
531         print("Test duration: {}".format(json_data['TestOptions'][
532             'TestTypeOptionMap']['Throughput']['Duration']))
533         print("Acceptable loss rate: {}".format(json_data['TestOptions'][
534             'TestTypeOptionMap']['Throughput']['RateIterationOptions'][
535                 'AcceptableLoss']))
536         print("Micro TPLD enabled: {}".format(json_data['TestOptions'][
537             'FlowCreationOptions']['UseMicroTpldOnDemand']))
538         print("Test iterations: {}".format(json_data['TestOptions'][
539             'TestTypeOptionMap']['Throughput']['Iterations']))
540         if 'StreamConfig' in json_data['StreamProfileHandler']['EntityList'][0]:
541             print("### Header segments ###")
542             for seg in json_data['StreamProfileHandler']['EntityList']:
543                 for header in seg['StreamConfig']['HeaderSegments']:
544                     print("Type: {}".format(
545                         header['SegmentType']))
546                     print("Value: {}".format(decode_byte_array(
547                         header['SegmentValue'])))
548             print("### Multi Stream config ###")
549             for seg in json_data['StreamProfileHandler']['EntityList']:
550                 for header in seg['StreamConfig']['HwModifiers']:
551                     print(header)
552     except KeyError as exc:
553         print("Error setting not found in JSON data: {}".format(exc))
554
555
556 def read_json_file(json_file):
557     """
558     Read the json file path and return a dictionary of the data
559     :param json_file: path to json file
560     :return: dictionary of json data
561     """
562     try:
563         with open(json_file, 'r', encoding=_LOCALE) as data_file:
564             file_data = json.loads(data_file.read())
565     except ValueError as exc:
566         # general json exception, Python 3.5 adds new exception type
567         _LOGGER.exception("Exception with json read: %s", exc)
568         raise
569     except IOError as exc:
570         _LOGGER.exception(
571             'Exception during file open: %s file=%s', exc, json_file)
572         raise
573     return file_data
574
575
576 def write_json_file(json_data, output_path):
577     """
578     Write out the dictionary of data to a json file
579     :param json_data: dictionary of json data
580     :param output_path: file path to write output
581     :return: Boolean if success
582     """
583     try:
584         with open(output_path, 'w', encoding=_LOCALE) as fileh:
585             json.dump(json_data, fileh, indent=2, sort_keys=True,
586                       ensure_ascii=True)
587         return True
588     except ValueError as exc:
589         # general json exception, Python 3.5 adds new exception type
590         _LOGGER.exception(
591             "Exception with json write: %s", exc)
592         return False
593     except IOError as exc:
594         _LOGGER.exception(
595             'Exception during file write: %s file=%s', exc, output_path)
596         return False
597
598
599 if __name__ == "__main__":
600     print("Running UnitTest for XenaJSON")
601     JSON = XenaJSON()
602     print_json_report(JSON.json_data)
603     JSON.set_chassis_info('192.168.0.5', 'vsperf')
604     JSON.set_port(0, 1, 0)
605     JSON.set_port(1, 1, 1)
606     JSON.set_port_ip_v4(0, '192.168.240.10', 32, '192.168.240.1')
607     JSON.set_port_ip_v4(1, '192.168.240.11', 32, '192.168.240.1')
608     JSON.set_port_ip_v6(0, 'a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:a8a8', 128,
609                         'a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:1111')
610     JSON.set_port_ip_v6(1, 'b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:b8b8', 128,
611                         'b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:1111')
612     JSON.set_header_layer2(dst_mac='dd:dd:dd:dd:dd:dd',
613                            src_mac='ee:ee:ee:ee:ee:ee')
614     JSON.set_header_vlan(vlan_id=5)
615     JSON.set_header_layer3(src_ip='192.168.100.2', dst_ip='192.168.100.3',
616                            protocol='udp')
617     JSON.set_header_layer4_udp(source_port=3000, destination_port=3001)
618     JSON.set_test_options_tput(packet_sizes=[64], duration=10, iterations=1,
619                                loss_rate=0.0, micro_tpld=True)
620     JSON.add_header_segments(flows=4000, multistream_layer='L4')
621     JSON.set_topology_blocks()
622     write_json_file(JSON.json_data, './testthis.x2544')
623     JSON = XenaJSON('./testthis.x2544')
624     print_json_report(JSON.json_data)
625