1 # Copyright 2016 Red Hat Inc & Xena Networks.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # Rick Alongi, Red Hat Inc.
17 # Amit Supugade, Red Hat Inc.
18 # Dan Amzulescu, Xena Networks
19 # Christian Trautman, Red Hat Inc.
22 Xena Traffic Generator Model
30 from time import sleep
31 import xml.etree.ElementTree as ET
32 from collections import OrderedDict
35 from conf import settings
36 from core.results.results_constants import ResultsConstants
37 from tools.pkt_gen.trafficgen.trafficgenhelper import (
40 from tools.pkt_gen.trafficgen.trafficgen import ITrafficGenerator
43 from tools.pkt_gen.xena.xena_json import XenaJSON
44 from tools.pkt_gen.xena.XenaDriver import (
52 import scapy.layers.inet as inet
55 class Xena(ITrafficGenerator):
57 Xena Traffic generator wrapper class
59 _traffic_defaults = TRAFFIC_DEFAULTS.copy()
60 _logger = logging.getLogger(__name__)
72 def traffic_defaults(self):
73 """Default traffic values.
75 These can be expected to be constant across traffic generators,
76 so no setter is provided. Changes to the structure or contents
77 will likely break traffic generator implementations or tests
80 return self._traffic_defaults
83 def _create_throughput_result(root):
85 Create the results based off the output xml file from the Xena2544.exe
87 :param root: root dictionary from xml import
88 :return: Results Ordered dictionary based off ResultsConstants
90 # get the test type from the report file
91 test_type = root[0][1].get('TestType')
92 # set the version from the report file
93 settings.setValue('XENA_VERSION', root[0][0][1].get('GeneratedBy'))
95 if test_type == 'Throughput':
96 results = OrderedDict()
97 results[ResultsConstants.THROUGHPUT_RX_FPS] = int(
98 root[0][1][0][0].get('PortRxPps')) + int(
99 root[0][1][0][1].get('PortRxPps'))
100 results[ResultsConstants.THROUGHPUT_RX_MBPS] = (int(
101 root[0][1][0][0].get('PortRxBpsL1')) + int(
102 root[0][1][0][1].get('PortRxBpsL1')))/ 1000000
103 results[ResultsConstants.THROUGHPUT_RX_PERCENT] = (
104 100 - int(root[0][1][0].get('TotalLossRatioPcnt'))) * float(
105 root[0][1][0].get('TotalTxRatePcnt'))/100
106 results[ResultsConstants.TX_RATE_FPS] = root[0][1][0].get(
108 results[ResultsConstants.TX_RATE_MBPS] = float(
109 root[0][1][0].get('TotalTxRateBpsL1')) / 1000000
110 results[ResultsConstants.TX_RATE_PERCENT] = root[0][1][0].get(
113 results[ResultsConstants.MIN_LATENCY_NS] = float(
114 root[0][1][0][0].get('MinLatency')) * 1000
116 # Stats for latency returned as N/A so just post them
117 results[ResultsConstants.MIN_LATENCY_NS] = root[0][1][0][0].get(
120 results[ResultsConstants.MAX_LATENCY_NS] = float(
121 root[0][1][0][0].get('MaxLatency')) * 1000
123 # Stats for latency returned as N/A so just post them
124 results[ResultsConstants.MAX_LATENCY_NS] = root[0][1][0][0].get(
127 results[ResultsConstants.AVG_LATENCY_NS] = float(
128 root[0][1][0][0].get('AvgLatency')) * 1000
130 # Stats for latency returned as N/A so just post them
131 results[ResultsConstants.AVG_LATENCY_NS] = root[0][1][0][0].get(
133 elif test_type == 'Back2Back':
134 results = OrderedDict()
136 # Just mimic what Ixia does and only return the b2b frame count.
137 # This may change later once its decided the common results stats
138 # to be returned should be.
139 results[ResultsConstants.B2B_FRAMES] = root[0][1][0][0].get(
140 'TotalTxBurstFrames')
142 raise NotImplementedError('Unknown test type in report file.')
146 def _build_packet_header(self, reverse=False):
148 Build a packet header based on traffic profile using scapy external
150 :param reverse: Swap source and destination info when building header
151 :return: packet header in hex
153 srcmac = self._params['traffic']['l2'][
154 'srcmac'] if not reverse else self._params['traffic']['l2'][
156 dstmac = self._params['traffic']['l2'][
157 'dstmac'] if not reverse else self._params['traffic']['l2'][
159 srcip = self._params['traffic']['l3'][
160 'srcip'] if not reverse else self._params['traffic']['l3']['dstip']
161 dstip = self._params['traffic']['l3'][
162 'dstip'] if not reverse else self._params['traffic']['l3']['srcip']
163 layer2 = inet.Ether(src=srcmac, dst=dstmac)
164 layer3 = inet.IP(src=srcip, dst=dstip,
165 proto=self._params['traffic']['l3']['proto'])
166 layer4 = inet.UDP(sport=self._params['traffic']['l4']['srcport'],
167 dport=self._params['traffic']['l4']['dstport'])
168 if self._params['traffic']['vlan']['enabled']:
169 vlan = inet.Dot1Q(vlan=self._params['traffic']['vlan']['id'],
170 prio=self._params['traffic']['vlan']['priority'],
171 id=self._params['traffic']['vlan']['cfi'])
174 packet = layer2/vlan/layer3/layer4 if vlan else layer2/layer3/layer4
175 packet_bytes = bytes(packet)
176 packet_hex = '0x' + binascii.hexlify(packet_bytes).decode('utf-8')
179 def _create_api_result(self):
181 Create result dictionary per trafficgen specifications from socket API
182 stats. If stats are not available return values of 0.
183 :return: ResultsConstants as dictionary
185 # Handle each case of statistics based on if the data is available.
186 # This prevents uncaught exceptions when the stats aren't available.
187 result_dict = OrderedDict()
188 if self.tx_stats.data.get(self.tx_stats.pt_stream_keys[0]):
189 result_dict[ResultsConstants.TX_FRAMES] = self.tx_stats.data[
190 self.tx_stats.pt_stream_keys[0]]['packets']
191 result_dict[ResultsConstants.TX_RATE_FPS] = self.tx_stats.data[
192 self.tx_stats.pt_stream_keys[0]]['pps']
193 result_dict[ResultsConstants.TX_RATE_MBPS] = self.tx_stats.data[
194 self.tx_stats.pt_stream_keys[0]]['bps'] / 1000000
195 result_dict[ResultsConstants.TX_BYTES] = self.tx_stats.data[
196 self.tx_stats.pt_stream_keys[0]]['bytes']
197 # tx rate percent may need to be halved if bi directional
198 result_dict[ResultsConstants.TX_RATE_PERCENT] = line_percentage(
199 self.xmanager.ports[0], self.tx_stats, self._duration,
200 self._params['traffic']['l2']['framesize']) if \
201 self._params['traffic']['bidir'] == 'False' else\
203 self.xmanager.ports[0], self.tx_stats, self._duration,
204 self._params['traffic']['l2']['framesize']) / 2
206 self._logger.error('Transmit stats not available.')
207 result_dict[ResultsConstants.TX_FRAMES] = 0
208 result_dict[ResultsConstants.TX_RATE_FPS] = 0
209 result_dict[ResultsConstants.TX_RATE_MBPS] = 0
210 result_dict[ResultsConstants.TX_BYTES] = 0
211 result_dict[ResultsConstants.TX_RATE_PERCENT] = 0
213 if self.rx_stats.data.get('pr_tpldstraffic'):
214 result_dict[ResultsConstants.RX_FRAMES] = self.rx_stats.data[
215 'pr_tpldstraffic']['0']['packets']
217 ResultsConstants.THROUGHPUT_RX_FPS] = self.rx_stats.data[
218 'pr_tpldstraffic']['0']['pps']
220 ResultsConstants.THROUGHPUT_RX_MBPS] = self.rx_stats.data[
221 'pr_tpldstraffic']['0']['bps'] / 1000000
222 result_dict[ResultsConstants.RX_BYTES] = self.rx_stats.data[
223 'pr_tpldstraffic']['0']['bytes']
224 # throughput percent may need to be halved if bi directional
226 ResultsConstants.THROUGHPUT_RX_PERCENT] = line_percentage(
227 self.xmanager.ports[1], self.rx_stats, self._duration,
228 self._params['traffic']['l2']['framesize']) if \
229 self._params['traffic']['bidir'] == 'False' else \
231 self.xmanager.ports[1], self.rx_stats, self._duration,
232 self._params['traffic']['l2']['framesize']) / 2
235 self._logger.error('Receive stats not available.')
236 result_dict[ResultsConstants.RX_FRAMES] = 0
237 result_dict[ResultsConstants.THROUGHPUT_RX_FPS] = 0
238 result_dict[ResultsConstants.THROUGHPUT_RX_MBPS] = 0
239 result_dict[ResultsConstants.RX_BYTES] = 0
240 result_dict[ResultsConstants.THROUGHPUT_RX_PERCENT] = 0
242 if self.rx_stats.data.get('pr_tplderrors'):
243 result_dict[ResultsConstants.PAYLOAD_ERR] = self.rx_stats.data[
244 'pr_tplderrors']['0']['pld']
245 result_dict[ResultsConstants.SEQ_ERR] = self.rx_stats.data[
246 'pr_tplderrors']['0']['seq']
248 result_dict[ResultsConstants.PAYLOAD_ERR] = 0
249 result_dict[ResultsConstants.SEQ_ERR] = 0
251 if self.rx_stats.data.get('pr_tpldlatency'):
252 result_dict[ResultsConstants.MIN_LATENCY_NS] = self.rx_stats.data[
253 'pr_tpldlatency']['0']['min']
254 result_dict[ResultsConstants.MAX_LATENCY_NS] = self.rx_stats.data[
255 'pr_tpldlatency']['0']['max']
256 result_dict[ResultsConstants.AVG_LATENCY_NS] = self.rx_stats.data[
257 'pr_tpldlatency']['0']['avg']
259 result_dict[ResultsConstants.MIN_LATENCY_NS] = 0
260 result_dict[ResultsConstants.MAX_LATENCY_NS] = 0
261 result_dict[ResultsConstants.AVG_LATENCY_NS] = 0
265 def _setup_json_config(self, trials, loss_rate, testtype=None):
267 Create a 2bUsed json file that will be used for xena2544.exe execution.
268 :param trials: Number of trials
269 :param loss_rate: The acceptable loss rate as float
270 :param testtype: Either '2544_b2b' or '2544_throughput' as string
274 j_file = XenaJSON('./tools/pkt_gen/xena/profiles/baseconfig.x2544')
275 j_file.set_chassis_info(
276 settings.getValue('TRAFFICGEN_XENA_IP'),
277 settings.getValue('TRAFFICGEN_XENA_PASSWORD')
279 j_file.set_port(0, settings.getValue('TRAFFICGEN_XENA_MODULE1'),
280 settings.getValue('TRAFFICGEN_XENA_PORT1'))
281 j_file.set_port(1, settings.getValue('TRAFFICGEN_XENA_MODULE2'),
282 settings.getValue('TRAFFICGEN_XENA_PORT2'))
283 j_file.set_port_ip_v4(
284 0, settings.getValue("TRAFFICGEN_XENA_PORT0_IP"),
285 settings.getValue("TRAFFICGEN_XENA_PORT0_CIDR"),
286 settings.getValue("TRAFFICGEN_XENA_PORT0_GATEWAY"))
287 j_file.set_port_ip_v4(
288 1, settings.getValue("TRAFFICGEN_XENA_PORT1_IP"),
289 settings.getValue("TRAFFICGEN_XENA_PORT1_CIDR"),
290 settings.getValue("TRAFFICGEN_XENA_PORT1_GATEWAY"))
291 j_file.set_test_options(
292 packet_sizes=self._params['traffic']['l2']['framesize'],
293 iterations=trials, loss_rate=loss_rate,
294 duration=self._duration, micro_tpld=True if self._params[
295 'traffic']['l2']['framesize'] == 64 else False)
296 if testtype == '2544_throughput':
297 j_file.enable_throughput_test()
298 elif testtype == '2544_b2b':
299 j_file.enable_back2back_test()
301 j_file.set_header_layer2(
302 dst_mac=self._params['traffic']['l2']['dstmac'],
303 src_mac=self._params['traffic']['l2']['srcmac'])
304 j_file.set_header_layer3(
305 src_ip=self._params['traffic']['l3']['srcip'],
306 dst_ip=self._params['traffic']['l3']['dstip'],
307 protocol=self._params['traffic']['l3']['proto'])
308 j_file.set_header_layer4_udp(
309 source_port=self._params['traffic']['l4']['srcport'],
310 destination_port=self._params['traffic']['l4']['dstport'])
311 if self._params['traffic']['vlan']['enabled']:
312 j_file.set_header_vlan(
313 vlan_id=self._params['traffic']['vlan']['id'],
314 id=self._params['traffic']['vlan']['cfi'],
315 prio=self._params['traffic']['vlan']['priority'])
316 j_file.add_header_segments(
317 flows=self._params['traffic']['multistream'],
318 multistream_layer=self._params['traffic']['stream_type'])
320 if self._params['traffic']['bidir'] == "True":
321 j_file.set_topology_mesh()
323 j_file.set_topology_blocks()
325 j_file.write_config('./tools/pkt_gen/xena/profiles/2bUsed.x2544')
326 except Exception as exc:
327 self._logger.exception("Error during Xena JSON setup: %s", exc)
330 def _start_traffic_api(self, packet_limit):
332 Start the Xena traffic using the socket API driver
333 :param packet_limit: packet limit for stream, set to -1 for no limit
336 if not self.xmanager:
337 self._xsocket = XenaSocketDriver(
338 settings.getValue('TRAFFICGEN_XENA_IP'))
339 self.xmanager = XenaManager(
340 self._xsocket, settings.getValue('TRAFFICGEN_XENA_USER'),
341 settings.getValue('TRAFFICGEN_XENA_PASSWORD'))
343 # for the report file version info ask the chassis directly for its
345 settings.setValue('XENA_VERSION', 'XENA Socket API - {}'.format(
346 self.xmanager.get_version()))
348 if not len(self.xmanager.ports):
349 self.xmanager.ports[0] = self.xmanager.add_module_port(
350 settings.getValue('TRAFFICGEN_XENA_MODULE1'),
351 settings.getValue('TRAFFICGEN_XENA_PORT1'))
352 if not self.xmanager.ports[0].reserve_port():
354 'Unable to reserve port 0. Please release Xena Port')
356 if len(self.xmanager.ports) < 2:
357 self.xmanager.ports[1] = self.xmanager.add_module_port(
358 settings.getValue('TRAFFICGEN_XENA_MODULE2'),
359 settings.getValue('TRAFFICGEN_XENA_PORT2'))
360 if not self.xmanager.ports[1].reserve_port():
362 'Unable to reserve port 1. Please release Xena Port')
364 # Clear port configuration for a clean start
365 self.xmanager.ports[0].reset_port()
366 self.xmanager.ports[1].reset_port()
367 self.xmanager.ports[0].clear_stats()
368 self.xmanager.ports[1].clear_stats()
370 # set the port IP from the conf file
371 self.xmanager.ports[0].set_port_ip(
372 settings.getValue('TRAFFICGEN_XENA_PORT0_IP'),
373 settings.getValue('TRAFFICGEN_XENA_PORT0_CIDR'),
374 settings.getValue('TRAFFICGEN_XENA_PORT0_GATEWAY'))
375 self.xmanager.ports[1].set_port_ip(
376 settings.getValue('TRAFFICGEN_XENA_PORT1_IP'),
377 settings.getValue('TRAFFICGEN_XENA_PORT1_CIDR'),
378 settings.getValue('TRAFFICGEN_XENA_PORT1_GATEWAY'))
380 def setup_stream(stream, port, payload_id, flip_addr=False):
382 Helper function to configure streams.
383 :param stream: Stream object from XenaDriver module
384 :param port: Port object from XenaDriver module
385 :param payload_id: payload ID as int
386 :param flip_addr: Boolean if the source and destination addresses
391 stream.set_packet_limit(packet_limit)
393 stream.set_rate_fraction(
394 10000 * self._params['traffic']['frame_rate'])
395 stream.set_packet_header(self._build_packet_header(
397 stream.set_header_protocol(
398 'ETHERNET VLAN IP UDP' if self._params['traffic']['vlan'][
399 'enabled'] else 'ETHERNET IP UDP')
400 stream.set_packet_length(
401 'fixed', self._params['traffic']['l2']['framesize'], 16383)
402 stream.set_packet_payload('incrementing', '0x00')
403 stream.set_payload_id(payload_id)
404 port.set_port_time_limit(self._duration * 1000000)
406 if self._params['traffic']['l2']['framesize'] == 64:
408 port.micro_tpld_enable()
410 if self._params['traffic']['multistream']:
411 stream.enable_multistream(
412 flows=self._params['traffic']['multistream'],
413 layer=self._params['traffic']['stream_type'])
415 s1_p0 = self.xmanager.ports[0].add_stream()
416 setup_stream(s1_p0, self.xmanager.ports[0], 0)
418 if self._params['traffic']['bidir'] == 'True':
419 s1_p1 = self.xmanager.ports[1].add_stream()
420 setup_stream(s1_p1, self.xmanager.ports[1], 1, flip_addr=True)
422 if not self.xmanager.ports[0].traffic_on():
424 "Failure to start port 0. Check settings and retry.")
425 if self._params['traffic']['bidir'] == 'True':
426 if not self.xmanager.ports[1].traffic_on():
428 "Failure to start port 1. Check settings and retry.")
429 sleep(self._duration)
431 if self._params['traffic']['bidir'] == 'True':
432 # need to aggregate out both ports stats and assign that data
433 self.rx_stats = self.xmanager.ports[1].get_rx_stats()
434 self.tx_stats = self.xmanager.ports[0].get_tx_stats()
435 self.tx_stats.data = aggregate_stats(
437 self.xmanager.ports[1].get_tx_stats().data)
438 self.rx_stats.data = aggregate_stats(
440 self.xmanager.ports[0].get_rx_stats().data)
442 # no need to aggregate, just grab the appropriate port stats
443 self.tx_stats = self.xmanager.ports[0].get_tx_stats()
444 self.rx_stats = self.xmanager.ports[1].get_rx_stats()
447 def _stop_api_traffic(self):
449 Stop traffic through the socket API
450 :return: Return results from _create_api_result method
452 self.xmanager.ports[0].traffic_off()
453 if self._params['traffic']['bidir'] == 'True':
454 self.xmanager.ports[1].traffic_off()
457 stat = self._create_api_result()
462 self._logger.debug('Connect')
465 def disconnect(self):
466 """Disconnect from the traffic generator.
468 As with :func:`connect`, this function is optional.
471 Where implemented, this function should raise an exception on
476 self._logger.debug('disconnect')
478 self.xmanager.disconnect()
482 self._xsocket.disconnect()
485 def send_burst_traffic(self, traffic=None, numpkts=100, duration=20):
486 """Send a burst of traffic.
488 See ITrafficGenerator for description
490 self._duration = duration
493 self._params['traffic'] = self.traffic_defaults.copy()
495 self._params['traffic'] = merge_spec(self._params['traffic'],
498 self._start_traffic_api(numpkts)
499 return self._stop_api_traffic()
501 def send_cont_traffic(self, traffic=None, duration=20):
502 """Send a continuous flow of traffic.
504 See ITrafficGenerator for description
506 self._duration = duration
509 self._params['traffic'] = self.traffic_defaults.copy()
511 self._params['traffic'] = merge_spec(self._params['traffic'],
514 self._start_traffic_api(-1)
515 return self._stop_api_traffic()
517 def start_cont_traffic(self, traffic=None, duration=20):
518 """Non-blocking version of 'send_cont_traffic'.
520 See ITrafficGenerator for description
522 self._duration = duration
525 self._params['traffic'] = self.traffic_defaults.copy()
527 self._params['traffic'] = merge_spec(self._params['traffic'],
530 self._start_traffic_api(-1)
532 def stop_cont_traffic(self):
533 """Stop continuous transmission and return results.
535 return self._stop_api_traffic()
537 def send_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
539 """Send traffic per RFC2544 throughput test specifications.
541 See ITrafficGenerator for description
543 self._duration = duration
546 self._params['traffic'] = self.traffic_defaults.copy()
548 self._params['traffic'] = merge_spec(self._params['traffic'],
551 self._setup_json_config(trials, lossrate, '2544_throughput')
553 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
554 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
555 "./tools/pkt_gen/xena", "-u",
556 settings.getValue('TRAFFICGEN_XENA_USER')]
557 self.mono_pipe = subprocess.Popen(args, stdout=sys.stdout)
558 self.mono_pipe.communicate()
559 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
560 return Xena._create_throughput_result(root)
562 def start_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
564 """Non-blocking version of 'send_rfc2544_throughput'.
566 See ITrafficGenerator for description
568 self._duration = duration
570 self._params['traffic'] = self.traffic_defaults.copy()
572 self._params['traffic'] = merge_spec(self._params['traffic'],
575 self._setup_json_config(trials, lossrate, '2544_throughput')
577 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
578 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
579 "./tools/pkt_gen/xena", "-u",
580 settings.getValue('TRAFFICGEN_XENA_USER')]
581 self.mono_pipe = subprocess.Popen(args, stdout=sys.stdout)
583 def wait_rfc2544_throughput(self):
584 """Wait for and return results of RFC2544 test.
586 See ITrafficGenerator for description
588 self.mono_pipe.communicate()
590 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
591 return Xena._create_throughput_result(root)
593 def send_rfc2544_back2back(self, traffic=None, trials=1, duration=20,
595 """Send traffic per RFC2544 back2back test specifications.
597 See ITrafficGenerator for description
599 self._duration = duration
602 self._params['traffic'] = self.traffic_defaults.copy()
604 self._params['traffic'] = merge_spec(self._params['traffic'],
607 self._setup_json_config(trials, lossrate, '2544_b2b')
609 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
610 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
611 "./tools/pkt_gen/xena", "-u",
612 settings.getValue('TRAFFICGEN_XENA_USER')]
613 self.mono_pipe = subprocess.Popen(
614 args, stdout=sys.stdout)
615 self.mono_pipe.communicate()
616 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
617 return Xena._create_throughput_result(root)
619 def start_rfc2544_back2back(self, traffic=None, trials=1, duration=20,
621 """Non-blocking version of 'send_rfc2544_back2back'.
623 See ITrafficGenerator for description
625 self._duration = duration
628 self._params['traffic'] = self.traffic_defaults.copy()
630 self._params['traffic'] = merge_spec(self._params['traffic'],
633 self._setup_json_config(trials, lossrate, '2544_b2b')
635 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
636 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
637 "./tools/pkt_gen/xena", "-u",
638 settings.getValue('TRAFFICGEN_XENA_USER')]
639 self.mono_pipe = subprocess.Popen(
640 args, stdout=sys.stdout)
642 def wait_rfc2544_back2back(self):
643 """Wait and set results of RFC2544 test.
645 self.mono_pipe.communicate()
647 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
648 return Xena._create_throughput_result(root)
651 if __name__ == "__main__":