1 # Copyright 2016-2017 Red Hat Inc & Xena Networks.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # This is a port of code by Xena and Flavios ported to python 3 compatibility.
16 # Credit given to Xena and Flavio for providing most of the logic of this code.
17 # The code has changes for PEP 8 and python 3 conversion. Added Stat classes
18 # for better scaling of future requirements. Also added calculation functions
19 # for line rate to align within VSPerf project.
20 # Flavios xena libraries available at https://github.com/fleitner/XenaPythonLib
23 # Flavio Leitner, Red Hat Inc.
24 # Dan Amzulescu, Xena Networks
25 # Christian Trautman, Red Hat Inc.
28 Xena Socket API Driver module for communicating directly with Xena system
29 through socket commands and returning different statistics.
39 # pylint: disable=too-many-lines
# Xena Socket Commands
# Each value is the command keyword sent over the socket. The module/port
# prefix, stream index and arguments are added by the make_*_command helpers.
# Values that end in ' ?' are already complete queries.
CMD_CLEAR_RX_STATS = 'pr_clear'
CMD_CLEAR_TX_STATS = 'pt_clear'
CMD_CREATE_STREAM = 'ps_create'
CMD_DELETE_STREAM = 'ps_delete'
CMD_GET_PORT_SPEED = 'p_speed ?'
CMD_GET_PORT_SPEED_REDUCTION = 'p_speedreduction ?'
CMD_GET_RX_STATS_PER_TID = 'pr_tpldtraffic'
CMD_GET_STREAM_DATA = 'pt_stream'  # same keyword as CMD_GET_TX_STATS_PER_STREAM
CMD_GET_STREAMS_PER_PORT = 'ps_indices'
CMD_GET_TID_PER_STREAM = 'ps_tpldid'  # same keyword as CMD_SET_STREAM_TEST_PAYLOAD_ID
CMD_GET_TX_STATS_PER_STREAM = 'pt_stream'
CMD_GET_RX_STATS = 'pr_all ?'
CMD_GET_TX_STATS = 'pt_all ?'
CMD_INTERFRAME_GAP = 'p_interframegap'
CMD_LOGOFF = 'c_logoff'
CMD_PORT_IP = 'p_ipaddress'
CMD_PORT_LEARNING = 'p_autotrain'
CMD_RESERVE = 'p_reservation reserve'
CMD_RELEASE = 'p_reservation release'
CMD_RELINQUISH = 'p_reservation relinquish'
CMD_SET_PORT_ARP_REPLY = 'p_arpreply'
CMD_SET_PORT_ARP_V6_REPLY = 'p_arpv6reply'
CMD_SET_PORT_PING_REPLY = 'p_pingreply'
CMD_SET_PORT_PING_V6_REPLY = 'p_pingv6reply'
CMD_SET_PORT_TIME_LIMIT = 'p_txtimelimit'
CMD_SET_STREAM_HEADER_PROTOCOL = 'ps_headerprotocol'
CMD_SET_STREAM_ON_OFF = 'ps_enable'
CMD_SET_STREAM_PACKET_HEADER = 'ps_packetheader'
CMD_SET_STREAM_PACKET_LENGTH = 'ps_packetlength'
CMD_SET_STREAM_PACKET_LIMIT = 'ps_packetlimit'
CMD_SET_STREAM_PACKET_PAYLOAD = 'ps_payload'
CMD_SET_STREAM_RATE_FRACTION = 'ps_ratefraction'
CMD_SET_STREAM_TEST_PAYLOAD_ID = 'ps_tpldid'
CMD_SET_TPLD_MODE = 'p_tpldmode'
CMD_START_TRAFFIC = 'p_traffic on'
CMD_STOP_TRAFFIC = 'p_traffic off'
CMD_STREAM_MODIFIER = 'ps_modifier'
CMD_STREAM_MODIFIER_COUNT = 'ps_modifiercount'
CMD_STREAM_MODIFIER_RANGE = 'ps_modifierrange'
CMD_VERSION = 'c_versionno ?'
# Encoding used to decode every reply read from the Xena socket.
# locale.getlocale() may report no encoding (None); bytes.decode(None) raises
# TypeError, so fall back to UTF-8, which is also the encoding used for every
# command this module sends.
_LOCALE = locale.getlocale()[1] or 'utf-8'
# Module level logger shared by all classes and helpers in this file.
_LOGGER = logging.getLogger(__name__)
92 Mod set attribute tracker
def __init__(self, **kwargs):
    """ Constructor
    Every mod flag starts out as False.
    :param kwargs: Any attribute that already exists on the object can be
        overridden here; unknown names are silently ignored.
    """
    mod_flags = ('mod_src_mac', 'mod_dst_mac', 'mod_src_ip',
                 'mod_dst_ip', 'mod_src_port', 'mod_dst_port')
    for flag in mod_flags:
        setattr(self, flag, False)

    # only names that resolve to an existing attribute are applied
    for name in kwargs:
        if hasattr(self, name):
            setattr(self, name, kwargs[name])
111 class SimpleSocket(object):
115 def __init__(self, hostname, port=5025, timeout=1):
117 :param hostname: hostname or ip as string
118 :param port: port number to use for socket as int
119 :param timeout: socket timeout as int
120 :return: SimpleSocket object
122 self.hostname = hostname
124 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
125 self.sock.settimeout(timeout)
126 self.sock.connect((hostname, port))
127 except socket.error as msg:
129 "Cannot connect to Xena Socket at %s", hostname)
130 _LOGGER.error("Exception : %s", msg)
140 """ Send the command over the socket
141 :param cmd: cmd as string
142 :return: byte utf encoded return value from socket
146 self.sock.send(cmd.encode('utf-8'))
147 return self.sock.recv(1024)
def read_reply(self):
    """ Read one reply from the socket
    When the data read contains the marker '---^', it is discarded and the
    next read, which carries the syntax error message, is returned instead.
    :return: The reply as bytes (at most 1024 bytes per read)
    """
    marker = "---^".encode('utf-8')
    data = self.sock.recv(1024)
    if marker in data:
        # the marker only points at the error, the message follows
        data = self.sock.recv(1024)
    return data
161 def send_command(self, cmd):
162 """ Send the command specified over the socket
163 :param cmd: Command to send as string
167 self.sock.send(cmd.encode('utf-8'))
def set_keep_alive(self):
    """ Enable the SO_KEEPALIVE option on the socket
    :return: None
    """
    level, option = socket.SOL_SOCKET, socket.SO_KEEPALIVE
    self.sock.setsockopt(level, option, 1)
176 class KeepAliveThread(threading.Thread):
178 Keep alive socket class
182 def __init__(self, connection, interval=10):
184 :param connection: Socket for keep alive
185 :param interval: interval in seconds to send keep alive
186 :return: KeepAliveThread object
188 threading.Thread.__init__(self)
189 self.connection = connection
190 self.interval = interval
191 self.finished = threading.Event()
194 'Xena Socket keep alive thread initiated, interval %s seconds', self.interval)
197 """ Thread stop. See python thread docs for more info
204 """ Thread start. See python thread docs for more info
207 while not self.finished.isSet():
208 self.finished.wait(self.interval)
209 self.connection.ask(self.message)
212 class XenaSocketDriver(SimpleSocket):
def __init__(self, hostname, port=22611):
    """Constructor
    Opens the connection through SimpleSocket, switches keep alive on and
    creates the semaphore that serialises access to the socket.
    :param hostname: Hostname or ip as string
    :param port: port to use as int
    :return: XenaSocketDriver object
    """
    SimpleSocket.__init__(self, hostname, port)
    SimpleSocket.set_keep_alive(self)
    # one holder at a time: acts as a lock around socket traffic
    self.access_semaphor = threading.Semaphore(value=1)
229 """ Send the command over the socket in a thread safe manner
230 :param cmd: Command to send
231 :return: reply from socket
233 self.access_semaphor.acquire()
234 reply = SimpleSocket.ask(self, cmd)
235 self.access_semaphor.release()
def ask_verify(self, cmd):
    """ Send the command through ask() and check the reply.
    The reply is decoded, stripped of newlines at both ends, logged and
    compared with the driver's reply_ok value.
    :param cmd: Command to send
    :return: Boolean True if command response is good, False otherwise
    """
    raw_reply = self.ask(cmd)
    text = raw_reply.decode(_LOCALE).strip('\n')
    _LOGGER.info('[ask_verify] %s', text)
    return text == self.reply_ok
250 def disconnect(self):
252 Close the socket connection
def send_command(self, cmd):
    """ Send the command over the socket with no return, one caller at a
    time.
    :param cmd: Command to send
    :return: None
    """
    # The with block releases the semaphore even when the underlying send
    # raises. With a bare acquire()/release() pair a single failed send
    # would leave the semaphore held and block every later command.
    with self.access_semaphor:
        SimpleSocket.send_command(self, cmd)
266 def send_query_replies(self, cmd):
267 """ Send the command over the socket and wait for all replies and return
269 :param cmd: Command to send
270 :return: Response from command as list
272 # send the command followed by cmd SYNC to find out
273 # when the last reply arrives.
274 self.send_command(cmd.strip('\n'))
275 self.send_command('SYNC')
277 self.access_semaphor.acquire()
278 msg = SimpleSocket.read_reply(self).decode(_LOCALE)
282 (reply, msgleft) = msg.split('\n', 1)
283 # check for syntax problems
284 if reply.rfind('Syntax') != -1:
285 self.access_semaphor.release()
288 if reply.rfind('<SYNC>') == 0:
290 self.access_semaphor.release()
293 replies.append(reply + '\n')
297 msgnew = SimpleSocket.read_reply(self).decode(_LOCALE)
298 msg = msgleft + msgnew
301 class XenaManager(object):
303 Manager class for port and socket functions
305 def __init__(self, socketDriver, user='', password='xena'):
308 Establish a connection to Xena using a ``driver`` with the ``password``
312 :param socketDriver: XenaSocketDriver connection object
313 :param password: Password to the Xena traffic generator
314 :returns: XenaManager object
316 self.driver = socketDriver
318 self.keep_alive_thread = KeepAliveThread(self.driver)
320 if self.logon(password):
321 _LOGGER.info('Connected to Xena at %s', self.driver.hostname)
323 _LOGGER.error('Failed to logon to Xena at %s', self.driver.hostname)
328 def disconnect(self):
329 """ Release ports and disconnect from chassis.
331 for module_port in self.ports:
332 module_port.release_port()
335 self.keep_alive_thread.stop()
def add_module_port(self, module, port):
    """Factory for Xena Ports

    :param module: String or int of module
    :param port: String or int of port
    :return: XenaPort object if success, None if port already added
    """
    # Detect duplicates by comparing module/port values. Testing a freshly
    # built XenaPort with 'in' compares object identity (no equality method
    # is visible on XenaPort), so it could never match an earlier port.
    module_id, port_id = str(module), str(port)
    for known in self.ports:
        if known.module == module_id and known.port == port_id:
            return None
    xenaport = XenaPort(self, module, port)
    self.ports.append(xenaport)
    return xenaport
def get_module_port(self, module, port):
    """Look up an already added Xena Port
    :param module: module number as int or str
    :param port: port number as int or str
    :return: First XenaPort object whose port and module match, or None if
        not found
    """
    port_id, module_id = str(port), str(module)
    matches = (candidate for candidate in self.ports
               if candidate.port == port_id
               and candidate.module == module_id)
    return next(matches, None)
def get_version(self):
    """
    Ask the chassis for its version numbers.
    The second and third whitespace separated fields of the reply are
    reported as server and driver version.
    :return: versions of server and driver as string
    """
    query = make_manager_command(CMD_VERSION, '')
    reply = self.driver.ask(query).decode(_LOCALE)
    fields = reply.rstrip('\n').split()
    server, driver = fields[1], fields[2]
    return "Server: {} Driver: {}".format(server, driver)
374 Logoff from the Xena chassis
375 :return: Boolean True if response OK, False if error.
377 return self.driver.ask_verify(make_manager_command(CMD_LOGOFF))
def logon(self, password):
    """Login to the Xena traffic generator using the ``password`` supplied.
    The keep alive thread is started before the login command is sent.

    :param password: string of password
    :return: Boolean True if response OK, False if error.
    """
    self.keep_alive_thread.start()
    login_command = make_manager_command(CMD_LOGIN, password)
    return self.driver.ask_verify(login_command)
def set_owner(self, username):
    """Set the ports owner.
    :param username: owner name passed as the command argument
    :return: Boolean True if response OK, False if error.
    """
    owner_command = make_manager_command(CMD_OWNER, username)
    return self.driver.ask_verify(owner_command)
394 # pylint: disable=too-many-public-methods
395 class XenaPort(object):
397 Xena Port emulator class
def __init__(self, manager, module, port):
    """Constructor
    Module and port are stored as strings; the stream list starts empty.

    :param manager: XenaManager object
    :param module: Module as string or int of module to use
    :param port: Port as string or int of port to use
    :return: XenaPort object
    """
    self._manager = manager
    self._module, self._port = str(module), str(port)
    self._streams = []
414 """Property for manager attribute
415 :return: manager object
421 """Property for module attribute
422 :return: module value as string
428 """Property for port attribute
429 :return: port value as string
def port_string(self):
    """Build the 'module/port' prefix used in command sequences
    :return: String of module port for command sequence
    """
    return "{}/{}".format(self._module, self._port)
def add_stream(self):
    """Create a new stream on the port.
    The stream id is the number of streams already tracked by this object.
    :return: XenaStream object, None if failure
    """
    new_stream = XenaStream(self, len(self._streams))
    create_command = make_stream_command(CMD_CREATE_STREAM, '', new_stream)
    if not self._manager.driver.ask_verify(create_command):
        _LOGGER.error("Error during stream creation")
        return None
    # only streams the chassis accepted are remembered
    self._streams.append(new_stream)
    return new_stream
def clear_stats(self, rx_clear=True, tx_clear=True):
    """Clear the rx and/or tx statistics of the port

    :param rx_clear: Boolean if rx stats are to be cleared
    :param tx_clear: Boolean if tx stats are to be cleared
    :return: Boolean True if response OK, False if error.
    """
    outcomes = []
    requests = ((rx_clear, CMD_CLEAR_RX_STATS),
                (tx_clear, CMD_CLEAR_TX_STATS))
    for wanted, keyword in requests:
        # the command string is always built, but only sent when wanted
        command = make_port_command(keyword, self)
        if wanted:
            outcomes.append(self._manager.driver.ask_verify(command))
        else:
            outcomes.append(True)
    return all(outcomes)
def get_effective_speed(self):
    """
    Port speed scaled down by the port's speed reduction value.
    The reduction value is divided by 1000000 to get the fraction removed.
    :return: effective speed as float
    """
    speed = self.get_port_speed()
    reduction = self.get_port_speed_reduction()
    remaining_fraction = 1.0 - reduction / 1000000.0
    return speed * remaining_fraction
def get_inter_frame_gap(self):
    """
    Query the port for its interframe gap.
    :return: integer of interframe gap, taken from the last space
        separated field of the reply
    :raises ValueError: if that field is not an integer
    """
    # Queries in this module use the form '<keyword> ?' (see 'p_speed ?');
    # the separating space was missing here.
    command = make_port_command(CMD_INTERFRAME_GAP + ' ?', self)
    res = self._manager.driver.ask(command).decode(_LOCALE)
    return int(res.rstrip('\n').split(' ')[-1])
def get_port_speed(self):
    """
    Query the port speed and scale the reported value by 1000000.
    NOTE(review): presumably the chassis reports Mbit/s, making the result
    bits per second -- confirm.
    :return: Int of port speed
    """
    query = make_port_command(CMD_GET_PORT_SPEED, self)
    reply = self._manager.driver.ask(query).decode(_LOCALE)
    last_field = reply.split(' ')[-1]
    reported = last_field.rstrip('\n')
    return int(reported) * 1000000
def get_port_speed_reduction(self):
    """
    Query the port speed reduction value.
    :return: Integer of port speed reduction value, taken from the last
        space separated field of the reply
    """
    query = make_port_command(CMD_GET_PORT_SPEED_REDUCTION, self)
    reply = self._manager.driver.ask(query).decode(_LOCALE)
    fields = reply.rstrip('\n').split(' ')
    return int(fields[-1])
507 def get_rx_stats(self):
508 """Get the rx stats and return the data as a dict.
509 :return: Receive stats as dictionary
511 command = make_port_command(CMD_GET_RX_STATS, self)
512 rx_data = self._manager.driver.send_query_replies(command)
513 data = XenaRXStats(rx_data, time.time())
516 def get_tx_stats(self):
517 """Get the tx stats and return the data as a dict.
518 :return: Receive stats as dictionary
520 command = make_port_command(CMD_GET_TX_STATS, self)
521 tx_data = self._manager.driver.send_query_replies(command)
522 data = XenaTXStats(tx_data, time.time())
def micro_tpld_disable(self):
    """Switch the port TPLD mode to 'normal' (standard payload size)
    :return: Boolean True if response OK, False if error
    """
    mode_setting = CMD_SET_TPLD_MODE + ' normal'
    return self._manager.driver.ask_verify(
        make_port_command(mode_setting, self))
def micro_tpld_enable(self):
    """Switch the port TPLD mode to 'micro' (6 byte payloads)
    :return: Boolean True if response OK, False if error
    """
    mode_setting = CMD_SET_TPLD_MODE + ' micro'
    return self._manager.driver.ask_verify(
        make_port_command(mode_setting, self))
def release_port(self):
    """Send the reservation release command for this port
    :return: Boolean True if response OK, False if error.
    """
    return self._manager.driver.ask_verify(
        make_port_command(CMD_RELEASE, self))
def reserve_port(self):
    """Send the reservation reserve command for this port
    :return: Boolean True if response OK, False if error.
    """
    return self._manager.driver.ask_verify(
        make_port_command(CMD_RESERVE, self))
def reset_port(self):
    """Send the reset command for this port
    :return: Boolean True if response OK, False if error.
    """
    return self._manager.driver.ask_verify(
        make_port_command(CMD_RESET, self))
def set_port_arp_reply(self, is_on=True, ipv6=False):
    """
    Set the port arpreply value
    :param is_on: Enable or disable the arp reply on the port
    :param ipv6: set the value on the ip v6, disabled will set at ip v4
    :return: Boolean True if response OK, False if error
    """
    if ipv6:
        keyword = CMD_SET_PORT_ARP_V6_REPLY
    else:
        keyword = CMD_SET_PORT_ARP_REPLY
    state = "on" if is_on else "off"
    setting = '{} {}'.format(keyword, state)
    return self._manager.driver.ask_verify(make_port_command(setting, self))
def set_port_ping_reply(self, is_on=True, ipv6=False):
    """
    Set the port ping reply value
    :param is_on: Enable or disable the ping reply on the port
    :param ipv6: set the value on the ip v6, disabled will set at ip v4
    :return: Boolean True if response OK, False if error
    """
    if ipv6:
        keyword = CMD_SET_PORT_PING_V6_REPLY
    else:
        keyword = CMD_SET_PORT_PING_REPLY
    state = "on" if is_on else "off"
    setting = '{} {}'.format(keyword, state)
    return self._manager.driver.ask_verify(make_port_command(setting, self))
def set_port_learning(self, interval):
    """Start port learning with the interval in seconds specified. 0 disables port learning
    :param interval: interval as int
    :return: Boolean True if response OK, False if error.
    """
    setting = '{} {}'.format(CMD_PORT_LEARNING, interval)
    return self._manager.driver.ask_verify(make_port_command(setting, self))
def set_port_ip(self, ip_addr, cidr, gateway, wild='255'):
    """
    Set the ip address, subnet, gateway and wildcard of the port
    :param ip_addr: IP address to set to port
    :param cidr: cidr number for the subnet (used in integer arithmetic)
    :param gateway: Gateway ip for port
    :param wild: wildcard used for ARP and PING replies, sent as the last
        octet of 0.0.0.x
    :return: Boolean True if response OK, False if error
    """
    # cidr prefix length -> 32 bit netmask -> dotted notation
    mask_bits = (0xffffffff << (32 - cidr)) & 0xffffffff
    subnet = socket.inet_ntoa(struct.pack(">I", mask_bits))

    setting = '{} {} {} {} 0.0.0.{}'.format(
        CMD_PORT_IP, ip_addr, subnet, gateway, wild)
    return self._manager.driver.ask_verify(make_port_command(setting, self))
def set_port_time_limit(self, micro_seconds):
    """Set the port transmit time limit
    :param micro_seconds: value for the port time limit, passed to the
        chassis unchanged
    :return: Boolean True if response OK, False if error.
    """
    setting = '{} {}'.format(CMD_SET_PORT_TIME_LIMIT, micro_seconds)
    return self._manager.driver.ask_verify(make_port_command(setting, self))
def traffic_off(self):
    """Send the stop traffic command for this port
    :return: Boolean True if response OK, False if error.
    """
    return self._manager.driver.ask_verify(
        make_port_command(CMD_STOP_TRAFFIC, self))
def traffic_on(self):
    """Send the start traffic command for this port
    :return: Boolean True if response OK, False if error.
    """
    return self._manager.driver.ask_verify(
        make_port_command(CMD_START_TRAFFIC, self))
633 class XenaStream(object):
635 Xena stream emulator class
def __init__(self, xenaPort, streamID):
    """Constructor
    The stream id is stored as a string and the manager is taken from the
    port. No header protocol is set yet.

    :param xenaPort: XenaPort object
    :param streamID: Stream ID as int or string
    :return: XenaStream object
    """
    self._xena_port = xenaPort
    self._stream_id = str(streamID)
    self._manager = xenaPort.manager
    self._header_protocol = None
651 """Property for port attribute
652 :return: XenaPort object
654 return self._xena_port
658 """Property for streamID attribute
659 :return: streamID value as string
661 return self._stream_id
663 def enable_multistream(self, flows, mod_class):
665 Implementation of multi stream. Enable multi stream by setting
666 modifiers on the stream. If no mods are selected, src_ip mod will be used.
667 :param flows: Numbers of flows, Values greater than 65535 will square rooted
668 to the closest value. Xena mods are limited to 4 bytes.
669 :param mod_class: ModSet object
670 :return: True if success False otherwise
672 if not self._header_protocol:
674 "Please set a protocol header before calling this method.")
675 # maximum value for a Xena modifier is 65535 (unsigned int). If flows
676 # is greater than 65535 we have to do two mods getting as close as we
677 # can with square rooting the flow count.
678 if flows > 4294836225:
679 _LOGGER.debug('Flow mods exceeds highest value, changing to 4294836225')
685 mod1, mod2 = int(math.sqrt(flows)), int(math.sqrt(flows))
686 _LOGGER.debug('Flow count modified to %s', mod1*mod2)
688 if not any([mod_class.mod_src_mac, mod_class.mod_dst_mac, mod_class.mod_src_ip,
689 mod_class.mod_dst_ip, mod_class.mod_src_port, mod_class.mod_dst_port]):
690 # no mods were selected, default to src ip only
691 mod_class.mod_src_ip = True
692 if mod_class.mod_src_mac:
693 offset_list.append(3)
694 if mod_class.mod_dst_mac:
695 offset_list.append(9)
696 if mod_class.mod_src_ip:
697 offset_list.append(32 if 'VLAN' in self._header_protocol else 28)
698 if mod_class.mod_dst_ip:
699 offset_list.append(36 if 'VLAN' in self._header_protocol else 32)
700 if mod_class.mod_src_port:
701 offset_list.append(38 if 'VLAN' in self._header_protocol else 34)
702 if mod_class.mod_dst_port:
703 offset_list.append(40 if 'VLAN' in self._header_protocol else 36)
704 # calculate how many mods we have to do
705 countertotal = len(offset_list)
707 # to handle flows greater than 65535 we will need more mods for
709 for mod in [mod_class.mod_src_mac, mod_class.mod_dst_mac,
710 mod_class.mod_src_ip, mod_class.mod_dst_ip]:
713 command = make_port_command(
714 CMD_STREAM_MODIFIER_COUNT + ' [{}]'.format(self._stream_id) +
715 ' {}'.format(countertotal), self._xena_port)
717 responses.append(self._manager.driver.ask_verify(command))
719 for offset in offset_list:
720 if (mod_class.mod_dst_port or mod_class.mod_src_port) and \
721 (offset >= 38 if 'VLAN' in self._header_protocol else 34):
722 # only do a 1 mod for udp ports at max 65535
723 newmod1 = 65535 if flows >= 65535 else flows
724 command = make_port_command(
725 CMD_STREAM_MODIFIER + ' [{},{}] {} 0xFFFF0000 INC 1'.format(
726 self._stream_id, modcounter, offset), self._xena_port)
727 responses.append(self._manager.driver.ask_verify(command))
728 command = make_port_command(
729 CMD_STREAM_MODIFIER_RANGE + ' [{},{}] 0 1 {}'.format(
730 self._stream_id, modcounter, newmod1 - 1), self._xena_port)
731 responses.append(self._manager.driver.ask_verify(command))
733 command = make_port_command(
734 CMD_STREAM_MODIFIER + ' [{},{}] {} 0xFFFF0000 INC 1'.format(
735 self._stream_id, modcounter, offset), self._xena_port)
736 responses.append(self._manager.driver.ask_verify(command))
737 command = make_port_command(
738 CMD_STREAM_MODIFIER_RANGE + ' [{},{}] 0 1 {}'.format(
739 self._stream_id, modcounter, mod1 - 1), self._xena_port)
740 responses.append(self._manager.driver.ask_verify(command))
741 # if we have a second modifier set the modifier to mod2 and to
742 # incremement once every full rotation of mod 1
745 command = make_port_command(
746 CMD_STREAM_MODIFIER + ' [{},{}] {} 0xFFFF0000 INC {}'.format(
747 self._stream_id, modcounter, offset-2, mod1), self._xena_port)
748 responses.append(self._manager.driver.ask_verify(command))
749 command = make_port_command(
750 CMD_STREAM_MODIFIER_RANGE + ' [{},{}] 0 1 {}'.format(
751 self._stream_id, modcounter, mod2), self._xena_port)
752 responses.append(self._manager.driver.ask_verify(command))
754 return all(responses) # return True if they all worked
def get_stream_data(self):
    """
    Query the stream data for this stream.
    :return: String of response for stream data info
    """
    query = make_stream_command(CMD_GET_STREAM_DATA, '?', self)
    return self._manager.driver.ask(query).decode(_LOCALE)
def set_header_protocol(self, protocol_header):
    """Set the header info for the packet header hex.
    If the packet header contains just Ethernet and IP info then call this
    method with ETHERNET IP as the protocol header. The value is remembered
    on the stream only when the chassis accepts it.

    :param protocol_header: protocol header argument
    :return: Boolean True if success, False if error
    """
    command = make_stream_command(
        CMD_SET_STREAM_HEADER_PROTOCOL, protocol_header, self)
    if not self._manager.driver.ask_verify(command):
        return False
    self._header_protocol = protocol_header
    return True
783 """Set the stream to off
784 :return: Boolean True if success, False if error
786 return self._manager.driver.ask_verify(make_stream_command(
787 CMD_SET_STREAM_ON_OFF, 'off', self))
790 """Set the stream to on
791 :return: Boolean True if success, False if error
793 return self._manager.driver.ask_verify(make_stream_command(
794 CMD_SET_STREAM_ON_OFF, 'on', self))
def set_packet_header(self, header):
    """Set the packet header of the stream

    :param header: packet header as hex bytes
    :return: Boolean True if success, False if error
    """
    command = make_stream_command(CMD_SET_STREAM_PACKET_HEADER, header, self)
    return self._manager.driver.ask_verify(command)
def set_packet_length(self, pattern_type, minimum, maximum):
    """Set the packet length pattern with its min and max values

    :param pattern_type: String of pattern type, valid entries [ fixed,
     butterfly, random, mix, incrementing ]
    :param minimum: integer of minimum byte value
    :param maximum: integer of maximum byte value
    :return: Boolean True if success, False if error
    """
    length_args = '{} {} {}'.format(pattern_type, minimum, maximum)
    command = make_stream_command(
        CMD_SET_STREAM_PACKET_LENGTH, length_args, self)
    return self._manager.driver.ask_verify(command)
def set_packet_limit(self, limit):
    """Set the packet limit of the stream

    :param limit: number of packets that will be sent, use -1 to disable
    :return: Boolean True if success, False if error
    """
    command = make_stream_command(CMD_SET_STREAM_PACKET_LIMIT, limit, self)
    return self._manager.driver.ask_verify(command)
def set_packet_payload(self, payload_type, hex_value):
    """Set the payload to the hex value based on the payload type

    :param payload_type: string of the payload type, e.g. pattern
    :param hex_value: hex string of valid hex
    :return: Boolean True if success, False if error
    """
    payload_args = '{} {}'.format(payload_type, hex_value)
    command = make_stream_command(
        CMD_SET_STREAM_PACKET_PAYLOAD, payload_args, self)
    return self._manager.driver.ask_verify(command)
def set_rate_fraction(self, fraction):
    """Set the rate fraction of the stream

    :param fraction: fraction for the stream
    :return: Boolean True if success, False if error
    """
    command = make_stream_command(
        CMD_SET_STREAM_RATE_FRACTION, fraction, self)
    return self._manager.driver.ask_verify(command)
def set_payload_id(self, identifier):
    """ Set the test payload ID of the stream
    :param identifier: ID as int or string
    :return: Boolean True if success, False if error
    """
    command = make_stream_command(
        CMD_SET_STREAM_TEST_PAYLOAD_ID, identifier, self)
    return self._manager.driver.ask_verify(command)
858 class XenaRXStats(object):
862 def __init__(self, stats, epoc):
864 :param stats: Stats from pr all command as list
865 :param epoc: Current time in epoc
866 :return: XenaRXStats object
870 self.data = self.parse_stats()
874 def _pack_stats(param, start, fields=None):
875 """ Pack up the list of stats in a dictionary
876 :param param: The list of params to process
877 :param start: What element to start at
878 :param fields: The field names to pack as keys
879 :return: Dictionary of data where fields match up to the params
882 fields = ['bps', 'pps', 'bytes', 'packets']
885 for column in fields:
886 data[column] = int(param[start + i])
892 def _pack_tplds_stats(param, start):
893 """ Pack up the tplds stats
894 :param param: List of params to pack
895 :param start: What element to start at
896 :return: Dictionary of stats
900 for val in range(start, len(param) - start):
901 data[i] = int(param[val])
def _pack_rxextra_stats(self, param, start):
    """ Pack the rx extra stats using their eight field names
    :param param: List of params to pack
    :param start: What element to start at
    :return: Dictionary of stats as returned by _pack_stats
    """
    names = ('fcserrors pauseframes arprequests arpreplies '
             'pingrequests pingreplies gapcount gapduration').split()
    return self._pack_stats(param, start, names)
def _pack_tplderrors_stats(self, param, start):
    """ Pack the tpld error stats using their four field names
    :param param: List of params to pack
    :param start: What element to start at
    :return: Dictionary of stats as returned by _pack_stats
    """
    names = 'dummy seq mis pld'.split()
    return self._pack_stats(param, start, names)
def _pack_tpldlatency_stats(self, param, start):
    """ Pack the tpld latency stats using their four field names
    :param param: List of params to pack
    :param start: What element to start at
    :return: Dictionary of stats as returned by _pack_stats
    """
    names = 'min avg max 1sec'.split()
    return self._pack_stats(param, start, names)
def _pack_tpldjitter_stats(self, param, start):
    """ Pack the tpld jitter stats using their four field names
    :param param: List of params to pack
    :param start: What element to start at
    :return: Dictionary of stats as returned by _pack_stats
    """
    names = 'min avg max 1sec'.split()
    return self._pack_stats(param, start, names)
945 :return: Time as String of epoc of when stats were collected
949 # pylint: disable=too-many-branches
950 def parse_stats(self):
951 """ Parse the stats from pr all command
952 :return: Dictionary of all stats
955 for line in self._stats:
957 if param[1] == 'PR_TOTAL':
958 statdict['pr_total'] = self._pack_stats(param, 2)
959 elif param[1] == 'PR_NOTPLD':
960 statdict['pr_notpld'] = self._pack_stats(param, 2,)
961 elif param[1] == 'PR_EXTRA':
962 statdict['pr_extra'] = self._pack_rxextra_stats(param, 2)
963 elif param[1] == 'PT_STREAM':
964 entry_id = "pt_stream_%s" % param[2].strip('[]')
965 statdict[entry_id] = self._pack_stats(param, 3)
966 elif param[1] == 'PR_TPLDS':
967 tid_list = self._pack_tplds_stats(param, 2)
969 statdict['pr_tplds'] = tid_list
970 elif param[1] == 'PR_TPLDTRAFFIC':
971 if 'pr_tpldstraffic' in statdict:
972 data = statdict['pr_tpldstraffic']
975 entry_id = param[2].strip('[]')
976 data[entry_id] = self._pack_stats(param, 3)
977 statdict['pr_tpldstraffic'] = data
978 elif param[1] == 'PR_TPLDERRORS':
979 if 'pr_tplderrors' in statdict:
980 data = statdict['pr_tplderrors']
983 entry_id = param[2].strip('[]')
984 data[entry_id] = self._pack_tplderrors_stats(param, 3)
985 statdict['pr_tplderrors'] = data
986 elif param[1] == 'PR_TPLDLATENCY':
987 if 'pr_tpldlatency' in statdict:
988 data = statdict['pr_tpldlatency']
991 entry_id = param[2].strip('[]')
992 data[entry_id] = self._pack_tpldlatency_stats(param, 3)
993 statdict['pr_tpldlatency'] = data
994 elif param[1] == 'PR_TPLDJITTER':
995 if 'pr_tpldjitter' in statdict:
996 data = statdict['pr_tpldjitter']
999 entry_id = param[2].strip('[]')
1000 data[entry_id] = self._pack_tpldjitter_stats(param, 3)
1001 statdict['pr_pldjitter'] = data
1002 elif param[1] == 'PR_FILTER':
1003 if 'pr_filter' in statdict:
1004 data = statdict['pr_filter']
1007 entry_id = param[2].strip('[]')
1008 data[entry_id] = self._pack_stats(param, 3)
1009 statdict['pr_filter'] = data
1010 elif param[1] == 'P_RECEIVESYNC':
1011 if param[2] == 'IN_SYNC':
1012 statdict['p_receivesync'] = {'IN SYNC': 'True'}
1014 statdict['p_receivesync'] = {'IN SYNC': 'False'}
1016 logging.warning("XenaPort: unknown stats: %s", param[1])
1022 class XenaTXStats(object):
1024 Xena transmit stat class
1026 def __init__(self, stats, epoc):
1028 :param stats: Stats from pt all command as list
1029 :param epoc: Current time in epoc
1030 :return: XenaTXStats object
1034 self._ptstreamkeys = list()
1035 self.data = self.parse_stats()
1039 def _pack_stats(params, start, fields=None):
1040 """ Pack up the list of stats in a dictionary
1041 :param params: The list of params to process
1042 :param start: What element to start at
1043 :param fields: The field names to pack as keys
1044 :return: Dictionary of data where fields match up to the params
1047 fields = ['bps', 'pps', 'bytes', 'packets']
1050 for column in fields:
1051 data[column] = int(params[start + i])
def _pack_txextra_stats(self, params, start):
    """ Pack the tx extra stats using their ten field names
    :param params: List of params to pack
    :param start: What element to start at
    :return: Dictionary of stats as returned by _pack_stats
    """
    names = ('arprequests arpreplies pingrequests pingreplies '
             'injectedfcs injectedseq injectedmis injectedint '
             'injectedtid training').split()
    return self._pack_stats(params, start, names)
1068 def pt_stream_keys(self):
1070 :return: Return a list of pt_stream_x stream key ids
1072 return self._ptstreamkeys
1077 :return: Time as String of epoc of when stats were collected
def parse_stats(self):
    """ Parse the stat lines of the pt all command
    Each line is split on whitespace; the second field names the stat.
    PT_STREAM entries are stored under 'pt_stream_<id>' and their keys are
    also recorded in the pt stream key list.
    :return: Dictionary of all stats
    """
    parsed = {}
    for line in self._stats:
        param = line.split()
        kind = param[1]
        if kind in ('PT_TOTAL', 'PT_NOTPLD'):
            # dictionary key is the lower cased stat name
            parsed[kind.lower()] = self._pack_stats(param, 2)
        elif kind == 'PT_EXTRA':
            parsed['pt_extra'] = self._pack_txextra_stats(param, 2)
        elif kind == 'PT_STREAM':
            entry_id = "pt_stream_%s" % param[2].strip('[]')
            self._ptstreamkeys.append(entry_id)
            parsed[entry_id] = self._pack_stats(param, 3)
        else:
            logging.warning("XenaPort: unknown stats: %s", kind)
    return parsed
def aggregate_stats(stat1, stat2):
    """
    Combine two stat dictionaries key by key. A key of stat1 that is also
    in stat2 and holds a dictionary is passed to the aggregate function;
    every other key keeps stat1's value.
    :param stat1: First dictionary of stats
    :param stat2: Second dictionary of stats
    :return: New dictionary holding every key of stat1
    """
    combined = {}
    for name, value in stat1.items():
        shared_dict = name in stat2 and isinstance(value, dict)
        combined[name] = aggregate(value, stat2[name]) if shared_dict else value
    return combined
1116 def aggregate(stat1, stat2):
1118 Recursive function to aggregate two sets of statistics. This is used when
1119 bi directional traffic is done and statistics need to be calculated based
1120 on two sets of statistics.
1121 :param stat1: One set of dictionary stats from RX or TX stats
1122 :param stat2: Second set of dictionary stats from RX or TX stats
1123 :return: stats for data entry in RX or TX Stats instance
1126 for (keys1, keys2) in zip(stat1.keys(), stat2.keys()):
1127 if isinstance(stat1[keys1], dict):
1128 newstat[keys1] = aggregate(stat1[keys1], stat2[keys2])
1130 if not isinstance(stat1[keys1], int) and not isinstance(
1132 # its some value we don't need to aggregate
1134 # for latency stats do the appropriate calculation
1136 newstat[keys1] = max(stat1[keys1], stat2[keys2])
1137 elif keys1 == 'min':
1138 newstat[keys1] = min(stat1[keys1], stat2[keys2])
1139 elif keys1 == 'avg':
1140 newstat[keys1] = (stat1[keys1] + stat2[keys2]) / 2
1142 newstat[keys1] = (stat1[keys1] + stat2[keys2])
1146 def line_percentage(port, stats, time_active, packet_size):
1148 Calculate the line percentage rate from the duration, port object and stat
1150 :param port: XenaPort object
1151 :param stats: Xena RXStat or TXStat object
1152 :param time_active: time the stream was active in secs as int
1153 :param packet_size: packet size as int
1154 :return: line percentage as float
1156 # this is ugly, but its prettier than calling the get method 3 times...
1158 packets = stats.data['pr_total']['packets']
1161 packets = stats.data['pt_total']['packets']
1164 'Could not calculate line rate because packet stat not found.')
1166 ifg = port.get_inter_frame_gap()
1167 pps = packets_per_second(packets, time_active)
1168 l2br = l2_bit_rate(packet_size, stats.preamble, pps)
1169 l1br = l1_bit_rate(l2br, pps, ifg, stats.preamble)
1170 return 100.0 * l1br / port.get_effective_speed()
def l2_bit_rate(packet_size, preamble, pps):
    """
    Return the l2 bit rate, computed as packet_size * preamble * pps.
    NOTE(review): with packet_size in bytes the result is only in bits when
    preamble equals 8 -- confirm the value callers pass.
    :param packet_size: packet size on the line in bytes
    :param preamble: preamble size of the packet header in bytes
    :param pps: packets per second
    :return: l2 bit rate as float
    """
    per_packet = packet_size * preamble
    return per_packet * pps
def l1_bit_rate(l2br, pps, ifg, preamble):
    """
    Return the l1 bit rate: the l2 bit rate plus pps * ifg * preamble.
    :param l2br: l2 bit rate int bits per second
    :param pps: packets per second
    :param ifg: the inter frame gap
    :param preamble: preamble size of the packet header in bytes
    :return: l1 bit rate as float
    """
    gap_overhead = pps * ifg * preamble
    return l2br + gap_overhead
def make_manager_command(cmd, argument=None):
    """ String builder for Xena socket commands
    A truthy argument is appended in double quotes; otherwise the command
    is used as is.
    NOTE(review): the complete command, including any argument such as the
    logon password, is written to the log at info level.

    :param cmd: Command to send
    :param argument: Arguments for command to send
    :return: String of command
    """
    if argument:
        command = '{} "{}"'.format(cmd, argument)
    else:
        command = cmd
    _LOGGER.info("[Command Sent] : %s", command)
    return command
def make_port_command(cmd, xena_port):
    """ String builder for Xena port commands
    The command is prefixed with the port's module/port string and logged.

    :param cmd: Command to send
    :param xena_port: XenaPort object
    :return: String of command
    """
    prefix = xena_port.port_string()
    command = "{} {}".format(prefix, cmd)
    _LOGGER.info("[Command Sent] : %s", command)
    return command
def make_stream_command(cmd, args, xena_stream):
    """ String builder for Xena stream commands
    Layout: '<module/port> <cmd> [<stream id>] <args>'; the result is logged.

    :param cmd: Command to send
    :param args: Arguments appended after the stream id
    :param xena_stream: XenaStream object
    :return: String of command
    """
    prefix = xena_stream.xena_port.port_string()
    stream_index = xena_stream.stream_id
    command = "{} {} [{}] {}".format(prefix, cmd, stream_index, args)
    _LOGGER.info("[Command Sent] : %s", command)
    return command
def packets_per_second(packets, time_in_sec):
    """
    Return the pps as float
    :param packets: total packets
    :param time_in_sec: time in seconds, must not be zero
    :return: float of pps
    :raises ZeroDivisionError: if time_in_sec is 0
    """
    rate = packets / time_in_sec
    return rate