# Copyright 2016 Red Hat Inc & Xena Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#   Dan Amzulescu, Xena Networks
#   Christian Trautman, Red Hat Inc.
#
# Usage can be seen below in the unit test. This implementation is designed
# for one-module, two-port Xena chassis runs only.
import base64
from collections import OrderedDict
import json
import locale
import logging
import uuid

import scapy.layers.inet as inet
# Module-wide logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Text encoding used when reading and writing the JSON profiles.
# locale.getlocale() reports (None, None) when no encoding can be determined
# (for example under the plain "C" locale). A None encoding is accepted by
# open() but makes bytes.decode(_LOCALE) raise TypeError, so fall back to the
# preferred encoding -- the same one open() would pick for encoding=None --
# to guarantee that _LOCALE is always a usable codec name.
_LOCALE = locale.getlocale()[1] or locale.getpreferredencoding(False)
class XenaJSON(object):
    """
    Class to modify and read Xena JSON configuration files.

    ``json_data`` holds the loaded profile as nested dicts/lists and is edited
    in place by the ``set_*``/``enable_*``/``disable_*`` methods.
    ``packet_data`` holds, per header layer, a two element list of scapy
    objects (index 0 for the first port, index 1 for the second port).
    """

    # (packet_data key, Xena segment type, multistream layer number or None),
    # listed in the order the headers are stacked in the packet. VLAN has no
    # multistream field names, hence None.
    _HEADER_LAYERS = (
        ('layer2', 'ETHERNET', 2),
        ('vlan', 'VLAN', None),
        ('layer3', 'IP', 3),
        ('layer4', 'UDP', 4),
    )

    def __init__(self, json_path='./profiles/baseconfig.x2544'):
        """
        Constructor
        :param json_path: path to JSON file to read. Expected files must have
         two module ports with each port having its own stream config profile.
        :return: XenaJSON object
        """
        self.json_data = read_json_file(json_path)

        # Insertion order matters: _create_packet_header treats the first
        # entry as the base layer and stacks the remaining ones on top of it.
        self.packet_data = OrderedDict()
        self.packet_data['layer2'] = None
        self.packet_data['vlan'] = None
        self.packet_data['layer3'] = None
        self.packet_data['layer4'] = None

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _stream_config(self, entity):
        """
        Return the StreamConfig dictionary of a stream profile entity
        :param entity: index into the stream profile entity list
        :return: the StreamConfig dictionary (the live object, not a copy)
        """
        return self.json_data['StreamProfileHandler']['EntityList'][entity][
            'StreamConfig']

    def _port_entity(self, index):
        """
        Return the port handler entity for a port
        :param index: index into the port handler entity list
        :return: the port entity dictionary (the live object, not a copy)
        """
        return self.json_data['PortHandler']['EntityList'][index]

    def _set_test_enabled(self, test_name, enabled):
        """
        Switch one of the rfc2544 test types on or off
        :param test_name: key inside TestTypeOptionMap, e.g. 'Throughput'
        :param enabled: boolean, True to enable the test
        :return: None
        """
        # NOTE(review): the right hand side of these assignments was not
        # visible in the reviewed text. The lower case strings mirror how
        # set_test_options writes 'UseMicroTpldOnDemand' -- confirm.
        self.json_data['TestOptions']['TestTypeOptionMap'][test_name][
            'Enabled'] = 'true' if enabled else 'false'

    def _set_topology(self, topology, direction, port_groups):
        """
        Write the topology settings and the port group of both ports
        :param topology: value for TopologyConfig/Topology
        :param direction: value for TopologyConfig/Direction
        :param port_groups: port group names for port 0 and port 1
        :return: None
        """
        config = self.json_data['TestOptions']['TopologyConfig']
        config['Topology'] = topology
        config['Direction'] = direction
        for index, group in enumerate(port_groups):
            self._port_entity(index)['PortGroup'] = group

    def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
        """
        Add the multi stream layers to the json file based on the layer provided
        :param entity: Entity to append the segment to in entity list
        :param seg_uuid: The UUID to attach the multistream layer to
        :param stop_value: The number of flows to configure
        :param layer: the layer that the multistream will be attached to,
         2, 3 or 4 (anything int() accepts); other values raise KeyError
        :return: None
        """
        field_name = {
            2: ('Dst MAC addr', 'Src MAC addr'),
            3: ('Dest IP Addr', 'Src IP Addr'),
            4: ('Dest Port', 'Src Port')
        }
        # One modifier per field (destination first, then source).
        # NOTE(review): only Mask, StopValue, SegmentId and FieldName were
        # visible in the reviewed text; Offset, Action, StartValue, StepValue
        # and RepeatCount are reconstructed -- confirm against a known-good
        # .x2544 profile.
        segments = [
            {
                "Offset": 0,
                "Mask": "//8=",  # mask of 255/255
                "Action": "INC",
                "StartValue": 0,
                "StopValue": stop_value,
                "StepValue": 1,
                "RepeatCount": 1,
                "SegmentId": seg_uuid,
                "FieldName": name
            }
            for name in field_name[int(layer)]
        ]

        # Store the modifiers as a flat list. The previous code used
        # HwModifiers.append(segments), which nested the whole list as a
        # single element. Assigning also drops modifiers left over from an
        # earlier call, whose SegmentId would point at header segments that
        # add_header_segments has since replaced with new ItemIDs.
        self._stream_config(entity)['HwModifiers'] = segments

    def _create_packet_header(self):
        """
        Create the scapy packet header based on what has been built in this
        instance using the set header methods. Return tuple of the two byte
        arrays, one for each port.
        :return: Scapy packet headers as bytearrays
        """
        if not self.packet_data['layer2']:
            _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
            self.set_header_layer2()
        packet1, packet2 = self.packet_data['layer2']
        # Stack every configured layer after layer 2; layers that were never
        # set are still None and are skipped.
        for packet_header in list(self.packet_data.values())[1:]:
            if packet_header:
                packet1 /= packet_header[0]
                packet2 /= packet_header[1]
        return (bytes(packet1), bytes(packet2))

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def add_header_segments(self, flows=0, multistream_layer=None):
        """
        Build the header segments to write to the JSON file.
        :param flows: Number of flows to configure for multistream if enabled
        :param multistream_layer: layer to set multistream flows as string.
         Acceptable values are L2, L3 or L4
        :return: None
        """
        packet = self._create_packet_header()
        segment1 = []
        segment2 = []
        port_segments = (segment1, segment2)
        header_pos = 0

        for key, seg_type, layer_num in self._HEADER_LAYERS:
            headers = self.packet_data[key]
            if not headers:
                continue
            multistream = (layer_num is not None and flows > 0 and
                           multistream_layer == 'L{}'.format(layer_num))
            consumed = 0
            # entity 0 is the first port, entity 1 the second port whose
            # headers were built by the set_header_* methods.
            for entity, segments in enumerate(port_segments):
                # slice this layer's bytes out of the packet header bytes
                consumed = len(headers[entity])
                raw = packet[entity][header_pos: header_pos + consumed]
                # base64 output is plain ASCII, so decode it as such; the
                # locale encoding used before may be None, which makes
                # bytes.decode() raise TypeError.
                seg = create_segment(
                    seg_type, encode_byte_array(raw).decode('ascii'))
                if multistream:
                    self._add_multistream_layer(
                        entity=entity, seg_uuid=seg['ItemID'],
                        stop_value=flows, layer=layer_num)
                segments.append(seg)
            header_pos += consumed

        self._stream_config(0)['HeaderSegments'] = segment1
        self._stream_config(1)['HeaderSegments'] = segment2

    def disable_back2back_test(self):
        """
        Disable the rfc2544 back to back test
        :return: None
        """
        self._set_test_enabled('Back2Back', False)

    def disable_throughput_test(self):
        """
        Disable the rfc2544 throughput test
        :return: None
        """
        self._set_test_enabled('Throughput', False)

    def enable_back2back_test(self):
        """
        Enable the rfc2544 back to back test
        :return: None
        """
        self._set_test_enabled('Back2Back', True)

    def enable_throughput_test(self):
        """
        Enable the rfc2544 throughput test
        :return: None
        """
        self._set_test_enabled('Throughput', True)

    def set_chassis_info(self, hostname, pwd):
        """
        Set the chassis info
        :param hostname: hostname as string of ip
        :param pwd: password to chassis as string
        :return: None
        """
        chassis = self.json_data['ChassisManager']['ChassisList'][0]
        chassis['HostName'] = hostname
        chassis['Password'] = pwd

    def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
                          src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
        """
        Build a scapy Ethernet L2 objects inside instance packet_data structure
        :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
        :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
        :param kwargs: Extra params per scapy usage.
        :return: None
        """
        # second entry is the other port: source and destination swapped
        self.packet_data['layer2'] = [
            inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
            inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]

    def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
                          protocol='UDP', **kwargs):
        """
        Build scapy IPV4 L3 objects inside instance packet_data structure
        :param src_ip: source IP as string in dot notation format
        :param dst_ip: destination IP as string in dot notation format
        :param protocol: protocol for l4
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        proto = protocol.lower()
        # second entry is the other port: source and destination swapped
        self.packet_data['layer3'] = [
            inet.IP(src=src_ip, dst=dst_ip, proto=proto, **kwargs),
            inet.IP(src=dst_ip, dst=src_ip, proto=proto, **kwargs)]

    def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
        """
        Build scapy UDP L4 objects inside instance packet_data structure
        :param source_port: Source port as int
        :param destination_port: Destination port as int
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        # NOTE(review): unlike layer 2 and layer 3, the second port's header
        # is NOT built with source and destination swapped. Behaviour is kept
        # as is -- confirm whether the ports should be reversed as well.
        self.packet_data['layer4'] = [
            inet.UDP(sport=source_port, dport=destination_port, **kwargs),
            inet.UDP(sport=source_port, dport=destination_port, **kwargs)]

    def set_header_vlan(self, vlan_id=1, **kwargs):
        """
        Build a Dot1Q scapy object inside instance packet_data structure
        :param vlan_id: The VLAN ID
        :param kwargs: Extra params per scapy usage
        :return: None
        """
        self.packet_data['vlan'] = [
            inet.Dot1Q(vlan=vlan_id, **kwargs),
            inet.Dot1Q(vlan=vlan_id, **kwargs)]

    def set_port(self, index, module, port):
        """
        Set the module and port for the port at the given index to use with
        the test
        :param index: Index of port to set, 0 = port1, 1=port2, etc..
        :param module: module location as int
        :param port: port location in module as int
        :return: None
        """
        port_ref = self._port_entity(index)['PortRef']
        port_ref['ModuleIndex'] = module
        port_ref['PortIndex'] = port

    # NOTE(review): the end of this signature was not visible in the reviewed
    # text; micro_tpld is given a default of False so that both positional
    # and keyword callers keep working -- confirm the original default.
    def set_test_options(self, packet_sizes, duration, iterations, loss_rate,
                         micro_tpld=False):
        """
        Set the tests options
        :param packet_sizes: List of packet sizes to test, single int entry is
         acceptable for one packet size testing
        :param duration: time for each test in seconds as int
        :param iterations: number of iterations of testing as int
        :param loss_rate: acceptable loss rate as float
        :param micro_tpld: boolean if micro_tpld should be enabled or disabled
        :return: None
        """
        if isinstance(packet_sizes, int):
            packet_sizes = [packet_sizes]
        options = self.json_data['TestOptions']
        throughput = options['TestTypeOptionMap']['Throughput']

        options['PacketSizes']['CustomPacketSizes'] = packet_sizes
        throughput['Duration'] = duration
        throughput['RateIterationOptions']['AcceptableLoss'] = loss_rate
        # stored as the strings 'true'/'false', not as JSON booleans
        options['FlowCreationOptions'][
            'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
        throughput['Iterations'] = iterations

    def set_topology_blocks(self):
        """
        Set the test topology to a West to East config for half duplex flow with
        port 0 as the sender and port 1 as the receiver.
        :return: None
        """
        self._set_topology('BLOCKS', 'WEST_EAST', ("WEST", "EAST"))

    def set_topology_mesh(self):
        """
        Set the test topology to Mesh for bi directional full duplex flow
        :return: None
        """
        self._set_topology('MESH', 'BIDIR', ("UNDEFINED", "UNDEFINED"))

    def write_config(self, path='./2bUsed.x2544'):
        """
        Write the config to out as file
        :param path: Output file to export the json data to
        :return: None
        :raises RuntimeError: if write_json_file reports a failure
        """
        if not write_json_file(self.json_data, path):
            raise RuntimeError("Could not write out file, please check config")
def create_segment(header_type, encode_64_string):
    """
    Create segment for JSON file
    :param header_type: Type of header as string
    :param encode_64_string: base64 encoded string value of the header bytes
    :return: segment as dictionary; 'ItemID' is a freshly generated random
     UUID string, so every call yields a distinct segment id
    """
    # NOTE(review): only SegmentType, SegmentValue and ItemID were visible in
    # the reviewed text; the trailing ParentID/Label entries (empty strings)
    # are reconstructed -- confirm against a known-good .x2544 profile.
    return {
        "SegmentType": header_type.upper(),
        "SegmentValue": encode_64_string,
        "ItemID": str(uuid.uuid4()),
        "ParentID": "",
        "Label": ""}
def decode_byte_array(enc_str):
    """ Decodes the base64-encoded string to a byte array
    :param enc_str: The base64-encoded string representing a byte array
    :return: The decoded byte array (a new, mutable bytearray)
    """
    return bytearray(base64.b64decode(enc_str))
def encode_byte_array(byte_arr):
    """ Encodes the byte array as base64
    :param byte_arr: A bytearray containing the bytes to convert
    :return: The base64 encoding as a bytes object (callers decode it when
     they need text)
    """
    return base64.b64encode(bytes(byte_arr))
def print_json_report(json_data):
    """
    Print out info from the json data for testing purposes only.

    Printing stops at the first setting that is missing from the data; a
    one line error naming the missing key is printed instead of raising.
    :param json_data: json loaded data from json.loads
    :return: None
    """
    print("<<Xena JSON Config Report>>\n")
    try:
        print("### Chassis Info ###")
        chassis = json_data['ChassisManager']['ChassisList'][0]
        print("Chassis IP: {}".format(chassis['HostName']))
        print("Chassis Password: {}".format(chassis['Password']))

        print("### Port Configuration ###")
        for index in (0, 1):
            port = json_data['PortHandler']['EntityList'][index]
            print("Port {}: {}/{} group: {}".format(
                index + 1,
                port['PortRef']['ModuleIndex'],
                port['PortRef']['PortIndex'],
                port['PortGroup']))

        print("### Tests Enabled ###")
        test_types = json_data['TestOptions']['TestTypeOptionMap']
        print("Back2Back Enabled: {}".format(
            test_types['Back2Back']['Enabled']))
        print("Throughput Enabled: {}".format(
            test_types['Throughput']['Enabled']))

        print("### Test Options ###")
        options = json_data['TestOptions']
        throughput = test_types['Throughput']
        print("Test topology: {}/{}".format(
            options['TopologyConfig']['Topology'],
            options['TopologyConfig']['Direction']))
        print("Packet Sizes: {}".format(
            options['PacketSizes']['CustomPacketSizes']))
        print("Test duration: {}".format(throughput['Duration']))
        print("Acceptable loss rate: {}".format(
            throughput['RateIterationOptions']['AcceptableLoss']))
        print("Micro TPLD enabled: {}".format(
            options['FlowCreationOptions']['UseMicroTpldOnDemand']))
        print("Test iterations: {}".format(throughput['Iterations']))

        entities = json_data['StreamProfileHandler']['EntityList']
        # NOTE(review): indentation was lost in the reviewed text; the multi
        # stream listing is assumed to sit under this same check -- confirm.
        if 'StreamConfig' in entities[0]:
            print("### Header segments ###")
            for seg in entities:
                for header in seg['StreamConfig']['HeaderSegments']:
                    print("Type: {}".format(
                        header['SegmentType']))
                    print("Value: {}".format(decode_byte_array(
                        header['SegmentValue'])))
            print("### Multi Stream config ###")
            for seg in entities:
                for header in seg['StreamConfig']['HwModifiers']:
                    print(header)
    except KeyError as exc:
        print("Error setting not found in JSON data: {}".format(exc))
def read_json_file(json_file):
    """
    Read the json file path and return a dictionary of the data
    :param json_file: path to json file
    :return: dictionary of json data
    :raises ValueError: if the file content cannot be parsed as JSON
    :raises IOError: if the file cannot be opened or read
    """
    # NOTE(review): the statements that end each handler were not visible in
    # the reviewed text. Re-raising after logging is assumed, since there is
    # no data to return once reading has failed -- confirm.
    try:
        with open(json_file, 'r', encoding=_LOCALE) as data_file:
            file_data = json.load(data_file)
    except ValueError as exc:
        # general json exception, Python 3.5 adds new exception type
        _LOGGER.exception("Exception with json read: %s", exc)
        raise
    except IOError as exc:
        _LOGGER.exception(
            'Exception during file open: %s file=%s', exc, json_file)
        raise
    return file_data
def write_json_file(json_data, output_path):
    """
    Write out the dictionary of data to a json file
    :param json_data: dictionary of json data
    :param output_path: file path to write output
    :return: Boolean if success; failures are logged, not raised
    """
    # NOTE(review): the last argument of json.dump and the return statements
    # were not visible in the reviewed text. ensure_ascii=True (the json
    # default) and True/False results per the docstring are assumed --
    # confirm.
    try:
        with open(output_path, 'w', encoding=_LOCALE) as fileh:
            json.dump(json_data, fileh, indent=2, sort_keys=True,
                      ensure_ascii=True)
    except ValueError as exc:
        # general json exception, Python 3.5 adds new exception type
        _LOGGER.exception(
            "Exception with json write: %s", exc)
        return False
    except IOError as exc:
        _LOGGER.exception(
            'Exception during file write: %s file=%s', exc, output_path)
        return False
    return True
if __name__ == "__main__":
    # Self test: load the default profile, change every setting, write the
    # result out, then read it back and print it again for comparison.
    print("Running UnitTest for XenaJSON")
    # NOTE(review): this construction line was not visible in the reviewed
    # text; the default profile path is assumed -- confirm.
    JSON = XenaJSON()
    print_json_report(JSON.json_data)
    JSON.set_chassis_info('192.168.0.5', 'vsperf')
    JSON.set_port(0, 1, 0)
    JSON.set_port(1, 1, 1)
    JSON.set_header_layer2(dst_mac='dd:dd:dd:dd:dd:dd',
                           src_mac='ee:ee:ee:ee:ee:ee')
    JSON.set_header_vlan(vlan_id=5)
    # NOTE(review): the end of this call was not visible in the reviewed
    # text; protocol='UDP' (the method's default) is assumed -- confirm.
    JSON.set_header_layer3(src_ip='192.168.100.2', dst_ip='192.168.100.3',
                           protocol='UDP')
    JSON.set_header_layer4_udp(source_port=3000, destination_port=3001)
    JSON.set_test_options(packet_sizes=[64], duration=10, iterations=1,
                          loss_rate=0.0, micro_tpld=True)
    JSON.add_header_segments(flows=4000, multistream_layer='L4')
    JSON.set_topology_blocks()
    write_json_file(JSON.json_data, './testthis.x2544')
    JSON = XenaJSON('./testthis.x2544')
    print_json_report(JSON.json_data)