1 # Copyright 2016 Red Hat Inc & Xena Networks.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # Rick Alongi, Red Hat Inc.
17 # Amit Supugade, Red Hat Inc.
18 # Dan Amzulescu, Xena Networks
19 # Christian Trautman, Red Hat Inc.
22 Xena Traffic Generator Model
30 from time import sleep
31 import xml.etree.ElementTree as ET
32 from collections import OrderedDict
35 from conf import settings
36 from core.results.results_constants import ResultsConstants
37 from tools.pkt_gen.trafficgen.trafficgenhelper import (
40 from tools.pkt_gen.trafficgen.trafficgen import ITrafficGenerator
43 from tools.pkt_gen.xena.xena_json import XenaJSON
44 from tools.pkt_gen.xena.XenaDriver import (
52 import scapy.layers.inet as inet
55 class Xena(ITrafficGenerator):
57 Xena Traffic generator wrapper class
59 _traffic_defaults = TRAFFIC_DEFAULTS.copy()
60 _logger = logging.getLogger(__name__)
72 def traffic_defaults(self):
73 """Default traffic values.
75 These can be expected to be constant across traffic generators,
76 so no setter is provided. Changes to the structure or contents
77 will likely break traffic generator implementations or tests
80 return self._traffic_defaults
83 def _create_throughput_result(root):
85 Create the results based off the output xml file from the Xena2544.exe
87 :param root: root dictionary from xml import
88 :return: Results Ordered dictionary based off ResultsConstants
90 # get the test type from the report file
91 test_type = root[0][1].get('TestType')
92 # set the version from the report file
93 settings.setValue('XENA_VERSION', root[0][0][1].get('GeneratedBy'))
95 if test_type == 'Throughput':
96 results = OrderedDict()
97 results[ResultsConstants.THROUGHPUT_RX_FPS] = int(
98 root[0][1][0][1].get('PortRxPps'))
99 results[ResultsConstants.THROUGHPUT_RX_MBPS] = int(
100 root[0][1][0][1].get('PortRxBpsL1')) / 1000000
101 results[ResultsConstants.THROUGHPUT_RX_PERCENT] = (
102 100 - int(root[0][1][0].get('TotalLossRatioPcnt'))) * float(
103 root[0][1][0].get('TotalTxRatePcnt'))/100
104 results[ResultsConstants.TX_RATE_FPS] = root[0][1][0].get(
106 results[ResultsConstants.TX_RATE_MBPS] = float(
107 root[0][1][0].get('TotalTxRateBpsL1')) / 1000000
108 results[ResultsConstants.TX_RATE_PERCENT] = root[0][1][0].get(
111 results[ResultsConstants.MIN_LATENCY_NS] = float(
112 root[0][1][0][0].get('MinLatency')) * 1000
114 # Stats for latency returned as N/A so just post them
115 results[ResultsConstants.MIN_LATENCY_NS] = root[0][1][0][0].get(
118 results[ResultsConstants.MAX_LATENCY_NS] = float(
119 root[0][1][0][0].get('MaxLatency')) * 1000
121 # Stats for latency returned as N/A so just post them
122 results[ResultsConstants.MAX_LATENCY_NS] = root[0][1][0][0].get(
125 results[ResultsConstants.AVG_LATENCY_NS] = float(
126 root[0][1][0][0].get('AvgLatency')) * 1000
128 # Stats for latency returned as N/A so just post them
129 results[ResultsConstants.AVG_LATENCY_NS] = root[0][1][0][0].get(
131 elif test_type == 'Back2Back':
132 results = OrderedDict()
134 # Just mimic what Ixia does and only return the b2b frame count.
135 # This may change later once its decided the common results stats
136 # to be returned should be.
137 results[ResultsConstants.B2B_FRAMES] = root[0][1][0][0].get(
138 'TotalTxBurstFrames')
140 raise NotImplementedError('Unknown test type in report file.')
144 def _build_packet_header(self, reverse=False):
146 Build a packet header based on traffic profile using scapy external
148 :param reverse: Swap source and destination info when building header
149 :return: packet header in hex
151 srcmac = self._params['traffic']['l2'][
152 'srcmac'] if not reverse else self._params['traffic']['l2'][
154 dstmac = self._params['traffic']['l2'][
155 'dstmac'] if not reverse else self._params['traffic']['l2'][
157 srcip = self._params['traffic']['l3'][
158 'srcip'] if not reverse else self._params['traffic']['l3']['dstip']
159 dstip = self._params['traffic']['l3'][
160 'dstip'] if not reverse else self._params['traffic']['l3']['srcip']
161 layer2 = inet.Ether(src=srcmac, dst=dstmac)
162 layer3 = inet.IP(src=srcip, dst=dstip,
163 proto=self._params['traffic']['l3']['proto'])
164 layer4 = inet.UDP(sport=self._params['traffic']['l4']['srcport'],
165 dport=self._params['traffic']['l4']['dstport'])
166 if self._params['traffic']['vlan']['enabled']:
167 vlan = inet.Dot1Q(vlan=self._params['traffic']['vlan']['id'],
168 prio=self._params['traffic']['vlan']['priority'],
169 id=self._params['traffic']['vlan']['cfi'])
172 packet = layer2/vlan/layer3/layer4 if vlan else layer2/layer3/layer4
173 packet_bytes = bytes(packet)
174 packet_hex = '0x' + binascii.hexlify(packet_bytes).decode('utf-8')
177 def _create_api_result(self):
179 Create result dictionary per trafficgen specifications from socket API
180 stats. If stats are not available return values of 0.
181 :return: ResultsConstants as dictionary
183 # Handle each case of statistics based on if the data is available.
184 # This prevents uncaught exceptions when the stats aren't available.
185 result_dict = OrderedDict()
186 if self.tx_stats.data.get(self.tx_stats.pt_stream_keys[0]):
187 result_dict[ResultsConstants.TX_FRAMES] = self.tx_stats.data[
188 self.tx_stats.pt_stream_keys[0]]['packets']
189 result_dict[ResultsConstants.TX_RATE_FPS] = self.tx_stats.data[
190 self.tx_stats.pt_stream_keys[0]]['pps']
191 result_dict[ResultsConstants.TX_RATE_MBPS] = self.tx_stats.data[
192 self.tx_stats.pt_stream_keys[0]]['bps'] / 1000000
193 result_dict[ResultsConstants.TX_BYTES] = self.tx_stats.data[
194 self.tx_stats.pt_stream_keys[0]]['bytes']
195 # tx rate percent may need to be halved if bi directional
196 result_dict[ResultsConstants.TX_RATE_PERCENT] = line_percentage(
197 self.xmanager.ports[0], self.tx_stats, self._duration,
198 self._params['traffic']['l2']['framesize']) if \
199 self._params['traffic']['bidir'] == 'False' else\
201 self.xmanager.ports[0], self.tx_stats, self._duration,
202 self._params['traffic']['l2']['framesize']) / 2
204 self._logger.error('Transmit stats not available.')
205 result_dict[ResultsConstants.TX_FRAMES] = 0
206 result_dict[ResultsConstants.TX_RATE_FPS] = 0
207 result_dict[ResultsConstants.TX_RATE_MBPS] = 0
208 result_dict[ResultsConstants.TX_BYTES] = 0
209 result_dict[ResultsConstants.TX_RATE_PERCENT] = 0
211 if self.rx_stats.data.get('pr_tpldstraffic'):
212 result_dict[ResultsConstants.RX_FRAMES] = self.rx_stats.data[
213 'pr_tpldstraffic']['0']['packets']
215 ResultsConstants.THROUGHPUT_RX_FPS] = self.rx_stats.data[
216 'pr_tpldstraffic']['0']['pps']
218 ResultsConstants.THROUGHPUT_RX_MBPS] = self.rx_stats.data[
219 'pr_tpldstraffic']['0']['bps'] / 1000000
220 result_dict[ResultsConstants.RX_BYTES] = self.rx_stats.data[
221 'pr_tpldstraffic']['0']['bytes']
222 # throughput percent may need to be halved if bi directional
224 ResultsConstants.THROUGHPUT_RX_PERCENT] = line_percentage(
225 self.xmanager.ports[1], self.rx_stats, self._duration,
226 self._params['traffic']['l2']['framesize']) if \
227 self._params['traffic']['bidir'] == 'False' else \
229 self.xmanager.ports[1], self.rx_stats, self._duration,
230 self._params['traffic']['l2']['framesize']) / 2
233 self._logger.error('Receive stats not available.')
234 result_dict[ResultsConstants.RX_FRAMES] = 0
235 result_dict[ResultsConstants.THROUGHPUT_RX_FPS] = 0
236 result_dict[ResultsConstants.THROUGHPUT_RX_MBPS] = 0
237 result_dict[ResultsConstants.RX_BYTES] = 0
238 result_dict[ResultsConstants.THROUGHPUT_RX_PERCENT] = 0
240 if self.rx_stats.data.get('pr_tplderrors'):
241 result_dict[ResultsConstants.PAYLOAD_ERR] = self.rx_stats.data[
242 'pr_tplderrors']['0']['pld']
243 result_dict[ResultsConstants.SEQ_ERR] = self.rx_stats.data[
244 'pr_tplderrors']['0']['seq']
246 result_dict[ResultsConstants.PAYLOAD_ERR] = 0
247 result_dict[ResultsConstants.SEQ_ERR] = 0
249 if self.rx_stats.data.get('pr_tpldlatency'):
250 result_dict[ResultsConstants.MIN_LATENCY_NS] = self.rx_stats.data[
251 'pr_tpldlatency']['0']['min']
252 result_dict[ResultsConstants.MAX_LATENCY_NS] = self.rx_stats.data[
253 'pr_tpldlatency']['0']['max']
254 result_dict[ResultsConstants.AVG_LATENCY_NS] = self.rx_stats.data[
255 'pr_tpldlatency']['0']['avg']
257 result_dict[ResultsConstants.MIN_LATENCY_NS] = 0
258 result_dict[ResultsConstants.MAX_LATENCY_NS] = 0
259 result_dict[ResultsConstants.AVG_LATENCY_NS] = 0
263 def _setup_json_config(self, trials, loss_rate, testtype=None):
265 Create a 2bUsed json file that will be used for xena2544.exe execution.
266 :param trials: Number of trials
267 :param loss_rate: The acceptable loss rate as float
268 :param testtype: Either '2544_b2b' or '2544_throughput' as string
272 j_file = XenaJSON('./tools/pkt_gen/xena/profiles/baseconfig.x2544')
273 j_file.set_chassis_info(
274 settings.getValue('TRAFFICGEN_XENA_IP'),
275 settings.getValue('TRAFFICGEN_XENA_PASSWORD')
277 j_file.set_port(0, settings.getValue('TRAFFICGEN_XENA_MODULE1'),
278 settings.getValue('TRAFFICGEN_XENA_PORT1'))
279 j_file.set_port(1, settings.getValue('TRAFFICGEN_XENA_MODULE2'),
280 settings.getValue('TRAFFICGEN_XENA_PORT2'))
281 j_file.set_port_ip_v4(
282 0, settings.getValue("TRAFFICGEN_XENA_PORT0_IP"),
283 settings.getValue("TRAFFICGEN_XENA_PORT0_CIDR"),
284 settings.getValue("TRAFFICGEN_XENA_PORT0_GATEWAY"))
285 j_file.set_port_ip_v4(
286 1, settings.getValue("TRAFFICGEN_XENA_PORT1_IP"),
287 settings.getValue("TRAFFICGEN_XENA_PORT1_CIDR"),
288 settings.getValue("TRAFFICGEN_XENA_PORT1_GATEWAY"))
289 j_file.set_test_options(
290 packet_sizes=self._params['traffic']['l2']['framesize'],
291 iterations=trials, loss_rate=loss_rate,
292 duration=self._duration, micro_tpld=True if self._params[
293 'traffic']['l2']['framesize'] == 64 else False)
294 if testtype == '2544_throughput':
295 j_file.enable_throughput_test()
296 elif testtype == '2544_b2b':
297 j_file.enable_back2back_test()
299 j_file.set_header_layer2(
300 dst_mac=self._params['traffic']['l2']['dstmac'],
301 src_mac=self._params['traffic']['l2']['srcmac'])
302 j_file.set_header_layer3(
303 src_ip=self._params['traffic']['l3']['srcip'],
304 dst_ip=self._params['traffic']['l3']['dstip'],
305 protocol=self._params['traffic']['l3']['proto'])
306 j_file.set_header_layer4_udp(
307 source_port=self._params['traffic']['l4']['srcport'],
308 destination_port=self._params['traffic']['l4']['dstport'])
309 if self._params['traffic']['vlan']['enabled']:
310 j_file.set_header_vlan(
311 vlan_id=self._params['traffic']['vlan']['id'],
312 id=self._params['traffic']['vlan']['cfi'],
313 prio=self._params['traffic']['vlan']['priority'])
314 j_file.add_header_segments(
315 flows=self._params['traffic']['multistream'],
316 multistream_layer=self._params['traffic']['stream_type'])
318 if self._params['traffic']['bidir'] == "True":
319 j_file.set_topology_mesh()
321 j_file.set_topology_blocks()
323 j_file.write_config('./tools/pkt_gen/xena/profiles/2bUsed.x2544')
324 except Exception as exc:
325 self._logger.exception("Error during Xena JSON setup: %s", exc)
328 def _start_traffic_api(self, packet_limit):
330 Start the Xena traffic using the socket API driver
331 :param packet_limit: packet limit for stream, set to -1 for no limit
334 if not self.xmanager:
335 self._xsocket = XenaSocketDriver(
336 settings.getValue('TRAFFICGEN_XENA_IP'))
337 self.xmanager = XenaManager(
338 self._xsocket, settings.getValue('TRAFFICGEN_XENA_USER'),
339 settings.getValue('TRAFFICGEN_XENA_PASSWORD'))
341 # for the report file version info ask the chassis directly for its
343 settings.setValue('XENA_VERSION', 'XENA Socket API - {}'.format(
344 self.xmanager.get_version()))
346 if not len(self.xmanager.ports):
347 self.xmanager.ports[0] = self.xmanager.add_module_port(
348 settings.getValue('TRAFFICGEN_XENA_MODULE1'),
349 settings.getValue('TRAFFICGEN_XENA_PORT1'))
350 if not self.xmanager.ports[0].reserve_port():
352 'Unable to reserve port 0. Please release Xena Port')
354 if len(self.xmanager.ports) < 2:
355 self.xmanager.ports[1] = self.xmanager.add_module_port(
356 settings.getValue('TRAFFICGEN_XENA_MODULE2'),
357 settings.getValue('TRAFFICGEN_XENA_PORT2'))
358 if not self.xmanager.ports[1].reserve_port():
360 'Unable to reserve port 1. Please release Xena Port')
362 # Clear port configuration for a clean start
363 self.xmanager.ports[0].reset_port()
364 self.xmanager.ports[1].reset_port()
365 self.xmanager.ports[0].clear_stats()
366 self.xmanager.ports[1].clear_stats()
368 # set the port IP from the conf file
369 self.xmanager.ports[0].set_port_ip(
370 settings.getValue('TRAFFICGEN_XENA_PORT0_IP'),
371 settings.getValue('TRAFFICGEN_XENA_PORT0_CIDR'),
372 settings.getValue('TRAFFICGEN_XENA_PORT0_GATEWAY'))
373 self.xmanager.ports[1].set_port_ip(
374 settings.getValue('TRAFFICGEN_XENA_PORT1_IP'),
375 settings.getValue('TRAFFICGEN_XENA_PORT1_CIDR'),
376 settings.getValue('TRAFFICGEN_XENA_PORT1_GATEWAY'))
378 def setup_stream(stream, port, payload_id, flip_addr=False):
380 Helper function to configure streams.
381 :param stream: Stream object from XenaDriver module
382 :param port: Port object from XenaDriver module
383 :param payload_id: payload ID as int
384 :param flip_addr: Boolean if the source and destination addresses
389 stream.set_packet_limit(packet_limit)
391 stream.set_rate_fraction(
392 10000 * self._params['traffic']['frame_rate'])
393 stream.set_packet_header(self._build_packet_header(
395 stream.set_header_protocol(
396 'ETHERNET VLAN IP UDP' if self._params['traffic']['vlan'][
397 'enabled'] else 'ETHERNET IP UDP')
398 stream.set_packet_length(
399 'fixed', self._params['traffic']['l2']['framesize'], 16383)
400 stream.set_packet_payload('incrementing', '0x00')
401 stream.set_payload_id(payload_id)
402 port.set_port_time_limit(self._duration * 1000000)
404 if self._params['traffic']['l2']['framesize'] == 64:
406 port.micro_tpld_enable()
408 if self._params['traffic']['multistream']:
409 stream.enable_multistream(
410 flows=self._params['traffic']['multistream'],
411 layer=self._params['traffic']['stream_type'])
413 s1_p0 = self.xmanager.ports[0].add_stream()
414 setup_stream(s1_p0, self.xmanager.ports[0], 0)
416 if self._params['traffic']['bidir'] == 'True':
417 s1_p1 = self.xmanager.ports[1].add_stream()
418 setup_stream(s1_p1, self.xmanager.ports[1], 1, flip_addr=True)
420 if not self.xmanager.ports[0].traffic_on():
422 "Failure to start port 0. Check settings and retry.")
423 if self._params['traffic']['bidir'] == 'True':
424 if not self.xmanager.ports[1].traffic_on():
426 "Failure to start port 1. Check settings and retry.")
427 sleep(self._duration)
429 if self._params['traffic']['bidir'] == 'True':
430 # need to aggregate out both ports stats and assign that data
431 self.rx_stats = self.xmanager.ports[1].get_rx_stats()
432 self.tx_stats = self.xmanager.ports[0].get_tx_stats()
433 self.tx_stats.data = aggregate_stats(
435 self.xmanager.ports[1].get_tx_stats().data)
436 self.rx_stats.data = aggregate_stats(
438 self.xmanager.ports[0].get_rx_stats().data)
440 # no need to aggregate, just grab the appropriate port stats
441 self.tx_stats = self.xmanager.ports[0].get_tx_stats()
442 self.rx_stats = self.xmanager.ports[1].get_rx_stats()
445 def _stop_api_traffic(self):
447 Stop traffic through the socket API
448 :return: Return results from _create_api_result method
450 self.xmanager.ports[0].traffic_off()
451 if self._params['traffic']['bidir'] == 'True':
452 self.xmanager.ports[1].traffic_off()
455 stat = self._create_api_result()
460 self._logger.debug('Connect')
463 def disconnect(self):
464 """Disconnect from the traffic generator.
466 As with :func:`connect`, this function is optional.
469 Where implemented, this function should raise an exception on
474 self._logger.debug('disconnect')
476 self.xmanager.disconnect()
480 self._xsocket.disconnect()
483 def send_burst_traffic(self, traffic=None, numpkts=100, duration=20):
484 """Send a burst of traffic.
486 Send a ``numpkts`` packets of traffic, using ``traffic``
487 configuration, with a timeout of ``time``.
490 :param traffic: Detailed "traffic" spec, i.e. IP address, VLAN tags
491 :param numpkts: Number of packets to send
492 :param duration: Time to wait to receive packets
494 :returns: dictionary of strings with following data:
498 - List of List of Rx Bytes,
499 - Payload Errors and Sequence Errors.
501 raise NotImplementedError('Xena burst traffic not implemented')
503 def send_cont_traffic(self, traffic=None, duration=20):
504 """Send a continuous flow of traffic.
506 See ITrafficGenerator for description
508 self._duration = duration
511 self._params['traffic'] = self.traffic_defaults.copy()
513 self._params['traffic'] = merge_spec(self._params['traffic'],
516 self._start_traffic_api(-1)
517 return self._stop_api_traffic()
519 def start_cont_traffic(self, traffic=None, duration=20):
520 """Non-blocking version of 'send_cont_traffic'.
522 See ITrafficGenerator for description
524 self._duration = duration
527 self._params['traffic'] = self.traffic_defaults.copy()
529 self._params['traffic'] = merge_spec(self._params['traffic'],
532 self._start_traffic_api(-1)
534 def stop_cont_traffic(self):
535 """Stop continuous transmission and return results.
537 return self._stop_api_traffic()
539 def send_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
541 """Send traffic per RFC2544 throughput test specifications.
543 See ITrafficGenerator for description
545 self._duration = duration
548 self._params['traffic'] = self.traffic_defaults.copy()
550 self._params['traffic'] = merge_spec(self._params['traffic'],
553 self._setup_json_config(trials, lossrate, '2544_throughput')
555 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
556 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
557 "./tools/pkt_gen/xena", "-u",
558 settings.getValue('TRAFFICGEN_XENA_USER')]
559 self.mono_pipe = subprocess.Popen(args, stdout=sys.stdout)
560 self.mono_pipe.communicate()
561 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
562 return Xena._create_throughput_result(root)
564 def start_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
566 """Non-blocking version of 'send_rfc2544_throughput'.
568 See ITrafficGenerator for description
570 self._duration = duration
572 self._params['traffic'] = self.traffic_defaults.copy()
574 self._params['traffic'] = merge_spec(self._params['traffic'],
577 self._setup_json_config(trials, lossrate, '2544_throughput')
579 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
580 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
581 "./tools/pkt_gen/xena", "-u",
582 settings.getValue('TRAFFICGEN_XENA_USER')]
583 self.mono_pipe = subprocess.Popen(args, stdout=sys.stdout)
585 def wait_rfc2544_throughput(self):
586 """Wait for and return results of RFC2544 test.
588 See ITrafficGenerator for description
590 self.mono_pipe.communicate()
592 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
593 return Xena._create_throughput_result(root)
595 def send_rfc2544_back2back(self, traffic=None, trials=1, duration=20,
597 """Send traffic per RFC2544 back2back test specifications.
599 See ITrafficGenerator for description
601 self._duration = duration
604 self._params['traffic'] = self.traffic_defaults.copy()
606 self._params['traffic'] = merge_spec(self._params['traffic'],
609 self._setup_json_config(trials, lossrate, '2544_b2b')
611 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
612 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
613 "./tools/pkt_gen/xena", "-u",
614 settings.getValue('TRAFFICGEN_XENA_USER')]
615 self.mono_pipe = subprocess.Popen(
616 args, stdout=sys.stdout)
617 self.mono_pipe.communicate()
618 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
619 return Xena._create_throughput_result(root)
621 def start_rfc2544_back2back(self, traffic=None, trials=1, duration=20,
623 """Non-blocking version of 'send_rfc2544_back2back'.
625 See ITrafficGenerator for description
627 self._duration = duration
630 self._params['traffic'] = self.traffic_defaults.copy()
632 self._params['traffic'] = merge_spec(self._params['traffic'],
635 self._setup_json_config(trials, lossrate, '2544_b2b')
637 args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
638 "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
639 "./tools/pkt_gen/xena", "-u",
640 settings.getValue('TRAFFICGEN_XENA_USER')]
641 self.mono_pipe = subprocess.Popen(
642 args, stdout=sys.stdout)
644 def wait_rfc2544_back2back(self):
645 """Wait and set results of RFC2544 test.
647 self.mono_pipe.communicate()
649 root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
650 return Xena._create_throughput_result(root)
653 if __name__ == "__main__":