1 # Copyright 2016-2017 Red Hat Inc & Xena Networks.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # This is a port of code by Xena and Flavios ported to python 3 compatibility.
16 # Credit given to Xena and Flavio for providing most of the logic of this code.
17 # The code has changes for PEP 8 and python 3 conversion. Added Stat classes
18 # for better scaling of future requirements. Also added calculation functions
19 # for line rate to align within VSPerf project.
20 # Flavios xena libraries available at https://github.com/fleitner/XenaPythonLib
23 # Flavio Leitner, Red Hat Inc.
24 # Dan Amzulescu, Xena Networks
25 # Christian Trautman, Red Hat Inc.
28 Xena Socket API Driver module for communicating directly with Xena system
29 through socket commands and returning different statistics.
38 # pylint: disable=too-many-lines
39 # Xena Socket Commands
40 CMD_CLEAR_RX_STATS = 'pr_clear'
41 CMD_CLEAR_TX_STATS = 'pt_clear'
43 CMD_CREATE_STREAM = 'ps_create'
44 CMD_DELETE_STREAM = 'ps_delete'
45 CMD_GET_PORT_SPEED = 'p_speed ?'
46 CMD_GET_PORT_SPEED_REDUCTION = 'p_speedreduction ?'
47 CMD_GET_RX_STATS_PER_TID = 'pr_tpldtraffic'
48 CMD_GET_STREAM_DATA = 'pt_stream'
49 CMD_GET_STREAMS_PER_PORT = 'ps_indices'
50 CMD_GET_TID_PER_STREAM = 'ps_tpldid'
51 CMD_GET_TX_STATS_PER_STREAM = 'pt_stream'
52 CMD_GET_RX_STATS = 'pr_all ?'
53 CMD_GET_TX_STATS = 'pt_all ?'
54 CMD_INTERFRAME_GAP = 'p_interframegap'
56 CMD_LOGOFF = 'c_logoff'
59 CMD_PORT_IP = 'p_ipaddress'
60 CMD_PORT_LEARNING = 'p_autotrain'
61 CMD_RESERVE = 'p_reservation reserve'
62 CMD_RELEASE = 'p_reservation release'
63 CMD_RELINQUISH = 'p_reservation relinquish'
65 CMD_SET_PORT_ARP_REPLY = 'p_arpreply'
66 CMD_SET_PORT_ARP_V6_REPLY = 'p_arpv6reply'
67 CMD_SET_PORT_PING_REPLY = 'p_pingreply'
68 CMD_SET_PORT_PING_V6_REPLY = 'p_pingv6reply'
69 CMD_SET_PORT_TIME_LIMIT = 'p_txtimelimit'
70 CMD_SET_STREAM_HEADER_PROTOCOL = 'ps_headerprotocol'
71 CMD_SET_STREAM_ON_OFF = 'ps_enable'
72 CMD_SET_STREAM_PACKET_HEADER = 'ps_packetheader'
73 CMD_SET_STREAM_PACKET_LENGTH = 'ps_packetlength'
74 CMD_SET_STREAM_PACKET_LIMIT = 'ps_packetlimit'
75 CMD_SET_STREAM_PACKET_PAYLOAD = 'ps_payload'
76 CMD_SET_STREAM_RATE_FRACTION = 'ps_ratefraction'
77 CMD_SET_STREAM_TEST_PAYLOAD_ID = 'ps_tpldid'
78 CMD_SET_TPLD_MODE = 'p_tpldmode'
79 CMD_START_TRAFFIC = 'p_traffic on'
80 CMD_STOP_TRAFFIC = 'p_traffic off'
81 CMD_STREAM_MODIFIER = 'ps_modifier'
82 CMD_STREAM_MODIFIER_COUNT = 'ps_modifiercount'
83 CMD_STREAM_MODIFIER_RANGE = 'ps_modifierrange'
84 CMD_VERSION = 'c_versionno ?'
86 _LOCALE = locale.getlocale()[1]
87 _LOGGER = logging.getLogger(__name__)
90 class SimpleSocket(object):
94 def __init__(self, hostname, port=5025, timeout=1):
96 :param hostname: hostname or ip as string
97 :param port: port number to use for socket as int
98 :param timeout: socket timeout as int
99 :return: SimpleSocket object
101 self.hostname = hostname
103 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
104 self.sock.settimeout(timeout)
105 self.sock.connect((hostname, port))
106 except socket.error as msg:
108 "Cannot connect to Xena Socket at %s", hostname)
109 _LOGGER.error("Exception : %s", msg)
119 """ Send the command over the socket
120 :param cmd: cmd as string
121 :return: byte utf encoded return value from socket
125 self.sock.send(cmd.encode('utf-8'))
126 return self.sock.recv(1024)
130 def read_reply(self):
131 """ Get the response from the socket
132 :return: Return the reply
134 reply = self.sock.recv(1024)
135 if reply.find("---^".encode('utf-8')) != -1:
136 # read again the syntax error msg
137 reply = self.sock.recv(1024)
140 def send_command(self, cmd):
141 """ Send the command specified over the socket
142 :param cmd: Command to send as string
146 self.sock.send(cmd.encode('utf-8'))
148 def set_keep_alive(self):
149 """ Set the keep alive for the socket
152 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
155 class KeepAliveThread(threading.Thread):
157 Keep alive socket class
161 def __init__(self, connection, interval=10):
163 :param connection: Socket for keep alive
164 :param interval: interval in seconds to send keep alive
165 :return: KeepAliveThread object
167 threading.Thread.__init__(self)
168 self.connection = connection
169 self.interval = interval
170 self.finished = threading.Event()
173 'Xena Socket keep alive thread initiated, interval ' +
174 '{} seconds'.format(self.interval))
177 """ Thread stop. See python thread docs for more info
184 """ Thread start. See python thread docs for more info
187 while not self.finished.isSet():
188 self.finished.wait(self.interval)
189 self.connection.ask(self.message)
192 class XenaSocketDriver(SimpleSocket):
198 def __init__(self, hostname, port=22611):
200 :param hostname: Hostname or ip as string
201 :param port: port to use as int
202 :return: XenaSocketDriver object
204 SimpleSocket.__init__(self, hostname=hostname, port=port)
205 SimpleSocket.set_keep_alive(self)
206 self.access_semaphor = threading.Semaphore(1)
209 """ Send the command over the socket in a thread safe manner
210 :param cmd: Command to send
211 :return: reply from socket
213 self.access_semaphor.acquire()
214 reply = SimpleSocket.ask(self, cmd)
215 self.access_semaphor.release()
218 def ask_verify(self, cmd):
219 """ Send the command over the socket in a thread safe manner and
220 verify the response is good.
221 :param cmd: Command to send
222 :return: Boolean True if command response is good, False otherwise
224 resp = self.ask(cmd).decode(_LOCALE).strip('\n')
225 _LOGGER.info('[ask_verify] %s', resp)
226 if resp == self.reply_ok:
230 def disconnect(self):
232 Close the socket connection
237 def send_command(self, cmd):
238 """ Send the command over the socket with no return
239 :param cmd: Command to send
242 self.access_semaphor.acquire()
243 SimpleSocket.send_command(self, cmd)
244 self.access_semaphor.release()
246 def send_query_replies(self, cmd):
247 """ Send the command over the socket and wait for all replies and return
249 :param cmd: Command to send
250 :return: Response from command as list
252 # send the command followed by cmd SYNC to find out
253 # when the last reply arrives.
254 self.send_command(cmd.strip('\n'))
255 self.send_command('SYNC')
257 self.access_semaphor.acquire()
258 msg = SimpleSocket.read_reply(self).decode(_LOCALE)
262 (reply, msgleft) = msg.split('\n', 1)
263 # check for syntax problems
264 if reply.rfind('Syntax') != -1:
265 self.access_semaphor.release()
268 if reply.rfind('<SYNC>') == 0:
270 self.access_semaphor.release()
273 replies.append(reply + '\n')
277 msgnew = SimpleSocket.read_reply(self).decode(_LOCALE)
278 msg = msgleft + msgnew
281 class XenaManager(object):
283 Manager class for port and socket functions
285 def __init__(self, socketDriver, user='', password='xena'):
288 Establish a connection to Xena using a ``driver`` with the ``password``
292 :param socketDriver: XenaSocketDriver connection object
293 :param password: Password to the Xena traffic generator
294 :returns: XenaManager object
296 self.driver = socketDriver
298 self.keep_alive_thread = KeepAliveThread(self.driver)
300 if self.logon(password):
301 _LOGGER.info('Connected to Xena at %s', self.driver.hostname)
303 _LOGGER.error('Failed to logon to Xena at %s', self.driver.hostname)
308 def disconnect(self):
309 """ Release ports and disconnect from chassis.
311 for module_port in self.ports:
312 module_port.release_port()
315 self.keep_alive_thread.stop()
317 def add_module_port(self, module, port):
318 """Factory for Xena Ports
320 :param module: String or int of module
321 :param port: String or int of port
322 :return: XenaPort object if success, None if port already added
324 xenaport = XenaPort(self, module, port)
325 if xenaport in self.ports:
328 self.ports.append(xenaport)
331 def get_module_port(self, module, port):
332 """Return the Xena Port object if available
333 :param module: module number as int or str
334 :param port: port number as int or str
335 :return: XenaPort object or None if not found
337 for por in self.ports:
338 if por.port == str(port) and por.module == str(module):
342 def get_version(self):
344 Get the version from the chassis
345 :return: versions of server and driver as string
347 res = self.driver.ask(make_manager_command(
348 CMD_VERSION, '')).decode(_LOCALE)
349 res = res.rstrip('\n').split()
350 return "Server: {} Driver: {}".format(res[1], res[2])
354 Logoff from the Xena chassis
355 :return: Boolean True if response OK, False if error.
357 return self.driver.ask_verify(make_manager_command(CMD_LOGOFF))
359 def logon(self, password):
360 """Login to the Xena traffic generator using the ``password`` supplied.
362 :param password: string of password
363 :return: Boolean True if response OK, False if error.
365 self.keep_alive_thread.start()
366 return self.driver.ask_verify(make_manager_command(CMD_LOGIN, password))
368 def set_owner(self, username):
369 """Set the ports owner.
370 :return: Boolean True if response OK, False if error.
372 return self.driver.ask_verify(make_manager_command(CMD_OWNER, username))
374 # pylint: disable=too-many-public-methods
375 class XenaPort(object):
377 Xena Port emulator class
379 def __init__(self, manager, module, port):
382 :param manager: XenaManager object
383 :param module: Module as string or int of module to use
384 :param port: Port as string or int of port to use
385 :return: XenaPort object
387 self._manager = manager
388 self._module = str(module)
389 self._port = str(port)
390 self._streams = list()
394 """Property for manager attribute
395 :return: manager object
401 """Property for module attribute
402 :return: module value as string
408 """Property for port attribute
409 :return: port value as string
413 def port_string(self):
414 """String builder with attributes
415 :return: String of module port for command sequence
417 stringify = "{}/{}".format(self._module, self._port)
420 def add_stream(self):
421 """Add a stream to the port.
422 :return: XenaStream object, None if failure
424 identifier = len(self._streams)
425 stream = XenaStream(self, identifier)
426 if self._manager.driver.ask_verify(make_stream_command(
427 CMD_CREATE_STREAM, '', stream)):
428 self._streams.append(stream)
431 _LOGGER.error("Error during stream creation")
434 def clear_stats(self, rx_clear=True, tx_clear=True):
435 """Clear the port stats
437 :param rx_clear: Boolean if rx stats are to be cleared
438 :param tx_clear: Boolean if tx stats are to be cleared
439 :return: Boolean True if response OK, False if error.
441 command = make_port_command(CMD_CLEAR_RX_STATS, self)
442 res1 = self._manager.driver.ask_verify(command) if rx_clear else True
443 command = make_port_command(CMD_CLEAR_TX_STATS, self)
444 res2 = self._manager.driver.ask_verify(command) if tx_clear else True
445 return all([res1, res2])
447 def get_effective_speed(self):
449 Get the effective speed on the port
450 :return: effective speed as float
452 port_speed = self.get_port_speed()
453 reduction = self.get_port_speed_reduction()
454 effective_speed = port_speed * (1.0 - reduction / 1000000.0)
455 return effective_speed
457 def get_inter_frame_gap(self):
459 Get the interframe gap and return it as string
460 :return: integer of interframe gap
462 command = make_port_command(CMD_INTERFRAME_GAP + '?', self)
463 res = self._manager.driver.ask(command).decode(_LOCALE)
464 res = int(res.rstrip('\n').split(' ')[-1])
467 def get_port_speed(self):
469 Get the port speed as bits from port and return it as a int.
470 :return: Int of port speed
472 command = make_port_command(CMD_GET_PORT_SPEED, self)
473 res = self._manager.driver.ask(command).decode(_LOCALE)
474 port_speed = res.split(' ')[-1].rstrip('\n')
475 return int(port_speed) * 1000000
477 def get_port_speed_reduction(self):
479 Get the port speed reduction value as int
480 :return: Integer of port speed reduction value
482 command = make_port_command(CMD_GET_PORT_SPEED_REDUCTION, self)
483 res = self._manager.driver.ask(command).decode(_LOCALE)
484 res = int(res.rstrip('\n').split(' ')[-1])
487 def get_rx_stats(self):
488 """Get the rx stats and return the data as a dict.
489 :return: Receive stats as dictionary
491 command = make_port_command(CMD_GET_RX_STATS, self)
492 rx_data = self._manager.driver.send_query_replies(command)
493 data = XenaRXStats(rx_data, time.time())
496 def get_tx_stats(self):
497 """Get the tx stats and return the data as a dict.
498 :return: Receive stats as dictionary
500 command = make_port_command(CMD_GET_TX_STATS, self)
501 tx_data = self._manager.driver.send_query_replies(command)
502 data = XenaTXStats(tx_data, time.time())
505 def micro_tpld_disable(self):
506 """Disable micro TPLD and return to standard payload size
507 :return: Boolean if response OK, False if error
509 command = make_port_command(CMD_SET_TPLD_MODE + ' normal', self)
510 return self._manager.driver.ask_verify(command)
512 def micro_tpld_enable(self):
513 """Enable micro TPLD 6 byte payloads.
514 :Return Boolean if response OK, False if error
516 command = make_port_command(CMD_SET_TPLD_MODE + ' micro', self)
517 return self._manager.driver.ask_verify(command)
519 def release_port(self):
521 :return: Boolean True if response OK, False if error.
523 command = make_port_command(CMD_RELEASE, self)
524 return self._manager.driver.ask_verify(command)
526 def reserve_port(self):
528 :return: Boolean True if response OK, False if error.
530 command = make_port_command(CMD_RESERVE, self)
531 return self._manager.driver.ask_verify(command)
533 def reset_port(self):
535 :return: Boolean True if response OK, False if error.
537 command = make_port_command(CMD_RESET, self)
538 return self._manager.driver.ask_verify(command)
540 def set_port_arp_reply(self, is_on=True, ipv6=False):
542 Set the port arpreply value
543 :param on: Enable or disable the arp reply on the port
544 :param v6: set the value on the ip v6, disabled will set at ip v4
545 :return: Boolean True if response OK, False if error
547 command = make_port_command('{} {}'.format(
548 CMD_SET_PORT_ARP_V6_REPLY if ipv6 else CMD_SET_PORT_ARP_REPLY,
549 "on" if is_on else "off"), self)
550 return self._manager.driver.ask_verify(command)
552 def set_port_ping_reply(self, is_on=True, ipv6=False):
554 Set the port ping reply value
555 :param on: Enable or disable the ping reply on the port
556 :param v6: set the value on the ip v6, disabled will set at ip v4
557 :return: Boolean True if response OK, False if error
559 command = make_port_command('{} {}'.format(
560 CMD_SET_PORT_PING_V6_REPLY if ipv6 else CMD_SET_PORT_PING_REPLY,
561 "on" if is_on else "off"), self)
562 return self._manager.driver.ask_verify(command)
564 def set_port_learning(self, interval):
565 """Start port learning with the interval in seconds specified. 0 disables port learning
566 :param: interval as int
567 :return: Boolean True if response OK, False if error.
569 command = make_port_command('{} {}'.format(CMD_PORT_LEARNING, interval), self)
570 return self._manager.driver.ask_verify(command)
572 def set_port_ip(self, ip_addr, cidr, gateway, wild='255'):
574 Set the port ip address of the specific port
575 :param ip_addr: IP address to set to port
576 :param cidr: cidr number for the subnet
577 :param gateway: Gateway ip for port
578 :param wild: wildcard used for ARP and PING replies
579 :return: Boolean True if response OK, False if error
581 # convert the cidr to a dot notation subnet address
582 subnet = socket.inet_ntoa(
583 struct.pack(">I", (0xffffffff << (32 - cidr)) & 0xffffffff))
585 command = make_port_command('{} {} {} {} 0.0.0.{}'.format(
586 CMD_PORT_IP, ip_addr, subnet, gateway, wild), self)
587 return self._manager.driver.ask_verify(command)
589 def set_port_time_limit(self, micro_seconds):
590 """Set the port time limit in ms
591 :param micro_seconds: ms for port time limit
592 :return: Boolean True if response OK, False if error.
594 command = make_port_command('{} {}'.format(
595 CMD_SET_PORT_TIME_LIMIT, micro_seconds), self)
596 return self._manager.driver.ask_verify(command)
598 def traffic_off(self):
600 :return: Boolean True if response OK, False if error.
602 command = make_port_command(CMD_STOP_TRAFFIC, self)
603 return self._manager.driver.ask_verify(command)
605 def traffic_on(self):
607 :return: Boolean True if response OK, False if error.
609 command = make_port_command(CMD_START_TRAFFIC, self)
610 return self._manager.driver.ask_verify(command)
613 class XenaStream(object):
615 Xena stream emulator class
617 def __init__(self, xenaPort, streamID):
620 :param xenaPort: XenaPort object
621 :param streamID: Stream ID as int or string
622 :return: XenaStream object
624 self._xena_port = xenaPort
625 self._stream_id = str(streamID)
626 self._manager = self._xena_port.manager
627 self._header_protocol = None
631 """Property for port attribute
632 :return: XenaPort object
634 return self._xena_port
638 """Property for streamID attribute
639 :return: streamID value as string
641 return self._stream_id
643 def enable_multistream(self, flows, layer):
645 Basic implementation of multi stream. Enable multi stream by setting
646 modifiers on the stream
647 :param flows: Numbers of flows or end range
648 :param layer: layer to enable multi stream as str. Acceptable values
650 :return: True if success False otherwise
652 if not self._header_protocol:
654 "Please set a protocol header before calling this method.")
656 # byte offsets for setting the modifier
659 'L3': [32, 36] if 'VLAN' in self._header_protocol else [28, 32],
660 'L4': [38, 40] if 'VLAN' in self._header_protocol else [34, 36]
664 if layer in offsets.keys() and flows > 0:
665 command = make_port_command(
666 CMD_STREAM_MODIFIER_COUNT + ' [{}]'.format(self._stream_id) +
667 ' 2', self._xena_port)
668 responses.append(self._manager.driver.ask_verify(command))
669 command = make_port_command(
670 CMD_STREAM_MODIFIER + ' [{},0] {} 0xFFFF0000 INC 1'.format(
671 self._stream_id, offsets[layer][0]), self._xena_port)
672 responses.append(self._manager.driver.ask_verify(command))
673 command = make_port_command(
674 CMD_STREAM_MODIFIER_RANGE + ' [{},0] 0 1 {}'.format(
675 self._stream_id, flows), self._xena_port)
676 responses.append(self._manager.driver.ask_verify(command))
677 command = make_port_command(
678 CMD_STREAM_MODIFIER + ' [{},1] {} 0xFFFF0000 INC 1'.format(
679 self._stream_id, offsets[layer][1]), self._xena_port)
680 responses.append(self._manager.driver.ask_verify(command))
681 command = make_port_command(
682 CMD_STREAM_MODIFIER_RANGE + ' [{},1] 0 1 {}'.format(
683 self._stream_id, flows), self._xena_port)
684 responses.append(self._manager.driver.ask_verify(command))
685 return all(responses) # return True if they all worked
688 'No flows specified in enable multistream. Bypassing...')
691 raise NotImplementedError(
692 "Non-implemented stream layer in method enable multistream ",
695 def get_stream_data(self):
697 Get the response for stream data
698 :return: String of response for stream data info
700 command = make_stream_command(CMD_GET_STREAM_DATA, '?', self)
701 res = self._manager.driver.ask(command).decode(_LOCALE)
704 def set_header_protocol(self, protocol_header):
705 """Set the header info for the packet header hex.
706 If the packet header contains just Ethernet and IP info then call this
707 method with ETHERNET IP as the protocol header.
709 :param protocol_header: protocol header argument
710 :return: Boolean True if success, False if error
712 command = make_stream_command(
713 CMD_SET_STREAM_HEADER_PROTOCOL,
714 protocol_header, self)
715 if self._manager.driver.ask_verify(command):
716 self._header_protocol = protocol_header
722 """Set the stream to off
723 :return: Boolean True if success, False if error
725 return self._manager.driver.ask_verify(make_stream_command(
726 CMD_SET_STREAM_ON_OFF, 'off', self))
729 """Set the stream to on
730 :return: Boolean True if success, False if error
732 return self._manager.driver.ask_verify(make_stream_command(
733 CMD_SET_STREAM_ON_OFF, 'on', self))
735 def set_packet_header(self, header):
736 """Set the stream packet header
738 :param header: packet header as hex bytes
739 :return: Boolean True if success, False if error
741 return self._manager.driver.ask_verify(make_stream_command(
742 CMD_SET_STREAM_PACKET_HEADER, header, self))
744 def set_packet_length(self, pattern_type, minimum, maximum):
745 """Set the pattern length with min and max values based on the pattern
748 :param pattern_type: String of pattern type, valid entries [ fixed,
749 butterfly, random, mix, incrementing ]
750 :param minimum: integer of minimum byte value
751 :param maximum: integer of maximum byte value
752 :return: Boolean True if success, False if error
754 return self._manager.driver.ask_verify(make_stream_command(
755 CMD_SET_STREAM_PACKET_LENGTH, '{} {} {}'.format(
756 pattern_type, minimum, maximum), self))
758 def set_packet_limit(self, limit):
759 """Set the packet limit
761 :param limit: number of packets that will be sent, use -1 to disable
762 :return: Boolean True if success, False if error
764 return self._manager.driver.ask_verify(make_stream_command(
765 CMD_SET_STREAM_PACKET_LIMIT, limit, self))
767 def set_packet_payload(self, payload_type, hex_value):
768 """Set the payload to the hex value based on the payload type
770 :param payload_type: string of the payload type, valid entries [ pattern,
772 :param hex_value: hex string of valid hex
773 :return: Boolean True if success, False if error
775 return self._manager.driver.ask_verify(make_stream_command(
776 CMD_SET_STREAM_PACKET_PAYLOAD, '{} {}'.format(
777 payload_type, hex_value), self))
779 def set_rate_fraction(self, fraction):
780 """Set the rate fraction
782 :param fraction: fraction for the stream
783 :return: Boolean True if success, False if error
785 return self._manager.driver.ask_verify(make_stream_command(
786 CMD_SET_STREAM_RATE_FRACTION, fraction, self))
788 def set_payload_id(self, identifier):
789 """ Set the test payload ID
790 :param identifier: ID as int or string
791 :return: Boolean True if success, False if error
793 return self._manager.driver.ask_verify(make_stream_command(
794 CMD_SET_STREAM_TEST_PAYLOAD_ID, identifier, self))
797 class XenaRXStats(object):
801 def __init__(self, stats, epoc):
803 :param stats: Stats from pr all command as list
804 :param epoc: Current time in epoc
805 :return: XenaRXStats object
809 self.data = self.parse_stats()
813 def _pack_stats(param, start, fields=None):
814 """ Pack up the list of stats in a dictionary
815 :param param: The list of params to process
816 :param start: What element to start at
817 :param fields: The field names to pack as keys
818 :return: Dictionary of data where fields match up to the params
821 fields = ['bps', 'pps', 'bytes', 'packets']
824 for column in fields:
825 data[column] = int(param[start + i])
831 def _pack_tplds_stats(param, start):
832 """ Pack up the tplds stats
833 :param param: List of params to pack
834 :param start: What element to start at
835 :return: Dictionary of stats
839 for val in range(start, len(param) - start):
840 data[i] = int(param[val])
844 def _pack_rxextra_stats(self, param, start):
845 """ Pack up the extra stats
846 :param param: List of params to pack
847 :param start: What element to start at
848 :return: Dictionary of stats
850 fields = ['fcserrors', 'pauseframes', 'arprequests', 'arpreplies',
851 'pingrequests', 'pingreplies', 'gapcount', 'gapduration']
852 return self._pack_stats(param, start, fields)
854 def _pack_tplderrors_stats(self, param, start):
855 """ Pack up tlpd errors
856 :param param: List of params to pack
857 :param start: What element to start at
858 :return: Dictionary of stats
860 fields = ['dummy', 'seq', 'mis', 'pld']
861 return self._pack_stats(param, start, fields)
863 def _pack_tpldlatency_stats(self, param, start):
864 """ Pack up the tpld latency stats
865 :param param: List of params to pack
866 :param start: What element to start at
867 :return: Dictionary of stats
869 fields = ['min', 'avg', 'max', '1sec']
870 return self._pack_stats(param, start, fields)
872 def _pack_tpldjitter_stats(self, param, start):
873 """ Pack up the tpld jitter stats
874 :param param: List of params to pack
875 :param start: What element to start at
876 :return: Dictionary of stats
878 fields = ['min', 'avg', 'max', '1sec']
879 return self._pack_stats(param, start, fields)
884 :return: Time as String of epoc of when stats were collected
888 # pylint: disable=too-many-branches
889 def parse_stats(self):
890 """ Parse the stats from pr all command
891 :return: Dictionary of all stats
894 for line in self._stats:
896 if param[1] == 'PR_TOTAL':
897 statdict['pr_total'] = self._pack_stats(param, 2)
898 elif param[1] == 'PR_NOTPLD':
899 statdict['pr_notpld'] = self._pack_stats(param, 2,)
900 elif param[1] == 'PR_EXTRA':
901 statdict['pr_extra'] = self._pack_rxextra_stats(param, 2)
902 elif param[1] == 'PT_STREAM':
903 entry_id = "pt_stream_%s" % param[2].strip('[]')
904 statdict[entry_id] = self._pack_stats(param, 3)
905 elif param[1] == 'PR_TPLDS':
906 tid_list = self._pack_tplds_stats(param, 2)
908 statdict['pr_tplds'] = tid_list
909 elif param[1] == 'PR_TPLDTRAFFIC':
910 if 'pr_tpldstraffic' in statdict:
911 data = statdict['pr_tpldstraffic']
914 entry_id = param[2].strip('[]')
915 data[entry_id] = self._pack_stats(param, 3)
916 statdict['pr_tpldstraffic'] = data
917 elif param[1] == 'PR_TPLDERRORS':
918 if 'pr_tplderrors' in statdict:
919 data = statdict['pr_tplderrors']
922 entry_id = param[2].strip('[]')
923 data[entry_id] = self._pack_tplderrors_stats(param, 3)
924 statdict['pr_tplderrors'] = data
925 elif param[1] == 'PR_TPLDLATENCY':
926 if 'pr_tpldlatency' in statdict:
927 data = statdict['pr_tpldlatency']
930 entry_id = param[2].strip('[]')
931 data[entry_id] = self._pack_tpldlatency_stats(param, 3)
932 statdict['pr_tpldlatency'] = data
933 elif param[1] == 'PR_TPLDJITTER':
934 if 'pr_tpldjitter' in statdict:
935 data = statdict['pr_tpldjitter']
938 entry_id = param[2].strip('[]')
939 data[entry_id] = self._pack_tpldjitter_stats(param, 3)
940 statdict['pr_pldjitter'] = data
941 elif param[1] == 'PR_FILTER':
942 if 'pr_filter' in statdict:
943 data = statdict['pr_filter']
946 entry_id = param[2].strip('[]')
947 data[entry_id] = self._pack_stats(param, 3)
948 statdict['pr_filter'] = data
949 elif param[1] == 'P_RECEIVESYNC':
950 if param[2] == 'IN_SYNC':
951 statdict['p_receivesync'] = {'IN SYNC': 'True'}
953 statdict['p_receivesync'] = {'IN SYNC': 'False'}
955 logging.warning("XenaPort: unknown stats: %s", param[1])
961 class XenaTXStats(object):
963 Xena transmit stat class
965 def __init__(self, stats, epoc):
967 :param stats: Stats from pt all command as list
968 :param epoc: Current time in epoc
969 :return: XenaTXStats object
973 self._ptstreamkeys = list()
974 self.data = self.parse_stats()
978 def _pack_stats(params, start, fields=None):
979 """ Pack up the list of stats in a dictionary
980 :param params: The list of params to process
981 :param start: What element to start at
982 :param fields: The field names to pack as keys
983 :return: Dictionary of data where fields match up to the params
986 fields = ['bps', 'pps', 'bytes', 'packets']
989 for column in fields:
990 data[column] = int(params[start + i])
995 def _pack_txextra_stats(self, params, start):
996 """ Pack up the tx extra stats
997 :param params: List of params to pack
998 :param start: What element to start at
999 :return: Dictionary of stats
1001 fields = ['arprequests', 'arpreplies', 'pingrequests', 'pingreplies',
1002 'injectedfcs', 'injectedseq', 'injectedmis', 'injectedint',
1003 'injectedtid', 'training']
1004 return self._pack_stats(params, start, fields)
1007 def pt_stream_keys(self):
1009 :return: Return a list of pt_stream_x stream key ids
1011 return self._ptstreamkeys
1016 :return: Time as String of epoc of when stats were collected
1020 def parse_stats(self):
1021 """ Parse the stats from pr all command
1022 :return: Dictionary of all stats
1025 for line in self._stats:
1026 param = line.split()
1027 if param[1] == 'PT_TOTAL':
1028 statdict['pt_total'] = self._pack_stats(param, 2)
1029 elif param[1] == 'PT_NOTPLD':
1030 statdict['pt_notpld'] = self._pack_stats(param, 2,)
1031 elif param[1] == 'PT_EXTRA':
1032 statdict['pt_extra'] = self._pack_txextra_stats(param, 2)
1033 elif param[1] == 'PT_STREAM':
1034 entry_id = "pt_stream_%s" % param[2].strip('[]')
1035 self._ptstreamkeys.append(entry_id)
1036 statdict[entry_id] = self._pack_stats(param, 3)
1038 logging.warning("XenaPort: unknown stats: %s", param[1])
1042 def aggregate_stats(stat1, stat2):
1044 Judge whether stat1 and stat2 both have same key, if both have same key,
1045 call the aggregate fuction, else use the stat1's value
1048 for keys in stat1.keys():
1049 if keys in stat2 and isinstance(stat1[keys], dict):
1050 newstat[keys] = aggregate(stat1[keys], stat2[keys])
1052 newstat[keys] = stat1[keys]
1055 def aggregate(stat1, stat2):
1057 Recursive function to aggregate two sets of statistics. This is used when
1058 bi directional traffic is done and statistics need to be calculated based
1059 on two sets of statistics.
1060 :param stat1: One set of dictionary stats from RX or TX stats
1061 :param stat2: Second set of dictionary stats from RX or TX stats
1062 :return: stats for data entry in RX or TX Stats instance
1065 for (keys1, keys2) in zip(stat1.keys(), stat2.keys()):
1066 if isinstance(stat1[keys1], dict):
1067 newstat[keys1] = aggregate(stat1[keys1], stat2[keys2])
1069 if not isinstance(stat1[keys1], int) and not isinstance(
1071 # its some value we don't need to aggregate
1073 # for latency stats do the appropriate calculation
1075 newstat[keys1] = max(stat1[keys1], stat2[keys2])
1076 elif keys1 == 'min':
1077 newstat[keys1] = min(stat1[keys1], stat2[keys2])
1078 elif keys1 == 'avg':
1079 newstat[keys1] = (stat1[keys1] + stat2[keys2]) / 2
1081 newstat[keys1] = (stat1[keys1] + stat2[keys2])
1085 def line_percentage(port, stats, time_active, packet_size):
1087 Calculate the line percentage rate from the duration, port object and stat
1089 :param port: XenaPort object
1090 :param stats: Xena RXStat or TXStat object
1091 :param time_active: time the stream was active in secs as int
1092 :param packet_size: packet size as int
1093 :return: line percentage as float
1095 # this is ugly, but its prettier than calling the get method 3 times...
1097 packets = stats.data['pr_total']['packets']
1100 packets = stats.data['pt_total']['packets']
1103 'Could not calculate line rate because packet stat not found.')
1105 ifg = port.get_inter_frame_gap()
1106 pps = packets_per_second(packets, time_active)
1107 l2br = l2_bit_rate(packet_size, stats.preamble, pps)
1108 l1br = l1_bit_rate(l2br, pps, ifg, stats.preamble)
1109 return 100.0 * l1br / port.get_effective_speed()
1112 def l2_bit_rate(packet_size, preamble, pps):
1114 Return the l2 bit rate
1115 :param packet_size: packet size on the line in bytes
1116 :param preamble: preamble size of the packet header in bytes
1117 :param pps: packets per second
1118 :return: l2 bit rate as float
1120 return (packet_size * preamble) * pps
1123 def l1_bit_rate(l2br, pps, ifg, preamble):
1125 Return the l1 bit rate
1126 :param l2br: l2 bit rate int bits per second
1127 :param pps: packets per second
1128 :param ifg: the inter frame gap
1129 :param preamble: preamble size of the packet header in bytes
1130 :return: l1 bit rate as float
1132 return l2br + (pps * ifg * preamble)
1135 def make_manager_command(cmd, argument=None):
1136 """ String builder for Xena socket commands
1138 :param cmd: Command to send
1139 :param argument: Arguments for command to send
1140 :return: String of command
1142 command = '{} "{}"'.format(cmd, argument) if argument else cmd
1143 _LOGGER.info("[Command Sent] : %s", command)
1147 def make_port_command(cmd, xena_port):
1148 """ String builder for Xena port commands
1150 :param cmd: Command to send
1151 :param xena_port: XenaPort object
1152 :return: String of command
1154 command = "{} {}".format(xena_port.port_string(), cmd)
1155 _LOGGER.info("[Command Sent] : %s", command)
1159 def make_stream_command(cmd, args, xena_stream):
1160 """ String builder for Xena port commands
1162 :param cmd: Command to send
1163 :param xena_stream: XenaStream object
1164 :return: String of command
1166 command = "{} {} [{}] {}".format(xena_stream.xena_port.port_string(), cmd,
1167 xena_stream.stream_id, args)
1168 _LOGGER.info("[Command Sent] : %s", command)
1172 def packets_per_second(packets, time_in_sec):
1174 Return the pps as float
1175 :param packets: total packets
1176 :param time_in_sec: time in seconds
1177 :return: float of pps
1179 return packets / time_in_sec