# Copyright 2016 Red Hat Inc & Xena Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is a port of code by Xena and Flavio, updated for Python 3
# compatibility.
# Credit given to Xena and Flavio for providing most of the logic of this code.
# The code has changes for PEP 8 and Python 3 conversion. Added Stat classes
# for better scaling of future requirements. Also added calculation functions
# for line rate to align within the VSPerf project.
# Flavio's Xena libraries are available at
# https://github.com/fleitner/XenaPythonLib

# Contributors:
#   Flavio Leitner, Red Hat Inc.
#   Dan Amzulescu, Xena Networks
#   Christian Trautman, Red Hat Inc.
"""
Xena Socket API Driver module for communicating directly with a Xena system
through socket commands and returning different statistics.
"""
# Xena Socket Commands
CMD_CLEAR_RX_STATS = 'pr_clear'
CMD_CLEAR_TX_STATS = 'pt_clear'
CMD_CREATE_STREAM = 'ps_create'
CMD_DELETE_STREAM = 'ps_delete'
CMD_GET_PORT_SPEED = 'p_speed ?'
CMD_GET_PORT_SPEED_REDUCTION = 'p_speedreduction ?'
CMD_GET_RX_STATS_PER_TID = 'pr_tpldtraffic'
CMD_GET_STREAM_DATA = 'pt_stream'
CMD_GET_STREAMS_PER_PORT = 'ps_indices'
CMD_GET_TID_PER_STREAM = 'ps_tpldid'
CMD_GET_TX_STATS_PER_STREAM = 'pt_stream'
CMD_GET_RX_STATS = 'pr_all ?'
CMD_GET_TX_STATS = 'pt_all ?'
CMD_INTERFRAME_GAP = 'p_interframegap'
# Referenced by XenaManager.logon(); NOTE(review): value follows the Xena CLI
# command name -- confirm against the chassis scripting reference.
CMD_LOGIN = 'c_logon'
CMD_LOGOFF = 'c_logoff'
# Referenced by XenaManager.set_owner(); NOTE(review): confirm value.
CMD_OWNER = 'c_owner'
CMD_PORT_IP = 'p_ipaddress'
CMD_RESERVE = 'p_reservation reserve'
CMD_RELEASE = 'p_reservation release'
CMD_RELINQUISH = 'p_reservation relinquish'
# Referenced by XenaPort.reset_port(); NOTE(review): confirm value.
CMD_RESET = 'p_reset'
CMD_SET_PORT_TIME_LIMIT = 'p_txtimelimit'
CMD_SET_STREAM_HEADER_PROTOCOL = 'ps_headerprotocol'
CMD_SET_STREAM_ON_OFF = 'ps_enable'
CMD_SET_STREAM_PACKET_HEADER = 'ps_packetheader'
CMD_SET_STREAM_PACKET_LENGTH = 'ps_packetlength'
CMD_SET_STREAM_PACKET_LIMIT = 'ps_packetlimit'
CMD_SET_STREAM_PACKET_PAYLOAD = 'ps_payload'
CMD_SET_STREAM_RATE_FRACTION = 'ps_ratefraction'
CMD_SET_STREAM_TEST_PAYLOAD_ID = 'ps_tpldid'
CMD_SET_TPLD_MODE = 'p_tpldmode'
CMD_START_TRAFFIC = 'p_traffic on'
CMD_STOP_TRAFFIC = 'p_traffic off'
CMD_STREAM_MODIFIER = 'ps_modifier'
CMD_STREAM_MODIFIER_COUNT = 'ps_modifiercount'
CMD_STREAM_MODIFIER_RANGE = 'ps_modifierrange'
CMD_VERSION = 'c_versionno ?'

# Encoding used to decode socket replies. locale.getlocale() may report no
# encoding (None), and bytes.decode(None) raises TypeError, so fall back to
# UTF-8 in that case.
_LOCALE = locale.getlocale()[1] or 'utf-8'
_LOGGER = logging.getLogger(__name__)
class SimpleSocket(object):
    """
    Socket class

    Thin wrapper around a TCP socket that sends newline-terminated text
    commands and reads raw byte replies.
    """
    def __init__(self, hostname, port=5025, timeout=1):
        """Constructor
        :param hostname: hostname or ip as string
        :param port: port number to use for socket as int
        :param timeout: socket timeout as int
        :return: SimpleSocket object
        """
        self.hostname = hostname
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(timeout)
            self.sock.connect((hostname, port))
        except socket.error as msg:
            _LOGGER.error(
                "Cannot connect to Xena Socket at %s", hostname)
            _LOGGER.error("Exception : %s", msg)
            # NOTE(review): terminating the process on connection failure is
            # kept from the original flow -- confirm callers rely on it.
            sys.exit(1)

    def __del__(self):
        """Deconstructor
        :return: None
        """
        # The attribute is missing when socket creation itself failed.
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def ask(self, cmd):
        """ Send the command over the socket
        :param cmd: cmd as string
        :return: raw bytes read from the socket, empty bytes on socket error
        """
        cmd += '\n'
        try:
            # sendall keeps writing until the whole command is transmitted;
            # plain send() may transmit only part of the buffer.
            self.sock.sendall(cmd.encode('utf-8'))
            return self.sock.recv(1024)
        except OSError:
            # Return bytes (not str) so callers can still call .decode().
            return b''

    def read_reply(self):
        """ Get the response from the socket
        :return: Return the reply as bytes
        """
        reply = self.sock.recv(1024)
        if reply.find("---^".encode('utf-8')) != -1:
            # read again the syntax error msg
            reply = self.sock.recv(1024)
        return reply

    def send_command(self, cmd):
        """ Send the command specified over the socket
        :param cmd: Command to send as string
        :return: None
        """
        cmd += '\n'
        self.sock.sendall(cmd.encode('utf-8'))

    def set_keep_alive(self):
        """ Set the keep alive for the socket
        :return: None
        """
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
class KeepAliveThread(threading.Thread):
    """
    Keep alive socket class

    Background thread that periodically sends ``message`` through the given
    connection's ``ask`` method until ``stop`` is called.
    """
    # Payload sent on every keep-alive tick.
    # NOTE(review): reconstructed as an empty command -- confirm.
    message = ''

    def __init__(self, connection, interval=10):
        """ Constructor
        :param connection: Socket for keep alive
        :param interval: interval in seconds to send keep alive
        :return: KeepAliveThread object
        """
        threading.Thread.__init__(self)
        self.connection = connection
        self.interval = interval
        self.finished = threading.Event()
        # Daemon thread so a forgotten stop() cannot block interpreter exit.
        self.daemon = True
        _LOGGER.debug(
            'Xena Socket keep alive thread initiated, interval %s seconds',
            self.interval)

    def stop(self):
        """ Thread stop. See python thread docs for more info
        :return: None
        """
        self.finished.set()
        # join() raises RuntimeError on a thread that was never started.
        if self.is_alive():
            self.join()

    def run(self):
        """ Thread start. See python thread docs for more info
        :return: None
        """
        # Event.wait() returns True once stop() set the flag, so no further
        # keep-alive is sent after a stop request.
        while not self.finished.wait(self.interval):
            self.connection.ask(self.message)
class XenaSocketDriver(SimpleSocket):
    """
    Xena socket class

    Adds semaphore-serialized access on top of SimpleSocket so that several
    threads can share one connection.
    """
    # Reply text treated as success by ask_verify().
    # NOTE(review): reconstructed value -- confirm against chassis replies.
    reply_ok = '<OK>'

    def __init__(self, hostname, port=22611):
        """ Constructor
        :param hostname: Hostname or ip as string
        :param port: port to use as int
        :return: XenaSocketDriver object
        """
        SimpleSocket.__init__(self, hostname=hostname, port=port)
        SimpleSocket.set_keep_alive(self)
        self.access_semaphor = threading.Semaphore(1)

    @staticmethod
    def _decode(raw):
        """ Decode raw socket bytes to text
        :param raw: bytes (or empty string) read from the socket
        :return: decoded string
        """
        if not raw:
            return ''
        # _LOCALE may be None when the locale reports no encoding.
        return raw.decode(_LOCALE or 'utf-8')

    def ask(self, cmd):
        """ Send the command over the socket in a thread safe manner
        :param cmd: Command to send
        :return: reply from socket
        """
        # "with" releases the semaphore even if the socket call raises.
        with self.access_semaphor:
            return SimpleSocket.ask(self, cmd)

    def ask_verify(self, cmd):
        """ Send the command over the socket in a thread safe manner and
        verify the response is good.
        :param cmd: Command to send
        :return: Boolean True if command response is good, False otherwise
        """
        resp = self._decode(self.ask(cmd)).strip('\n')
        _LOGGER.info('[ask_verify] %s', resp)
        return resp == self.reply_ok

    def disconnect(self):
        """
        Close the socket connection
        :return: None
        """
        self.sock.close()

    def send_command(self, cmd):
        """ Send the command over the socket with no return
        :param cmd: Command to send
        :return: None
        """
        with self.access_semaphor:
            SimpleSocket.send_command(self, cmd)

    def send_query_replies(self, cmd):
        """ Send the command over the socket and wait for all replies and return
        as list
        :param cmd: Command to send
        :return: Response from command as list, empty list on a syntax error
        """
        # send the command followed by cmd SYNC to find out
        # when the last reply arrives.
        self.send_command(cmd.strip('\n'))
        self.send_command('SYNC')

        replies = []
        pending = ''
        with self.access_semaphor:
            while True:
                if '\n' not in pending:
                    # more bytes to come; keep any partial line read so far
                    chunk = SimpleSocket.read_reply(self)
                    if not chunk:
                        # recv() returned nothing: the peer closed the
                        # connection, so the SYNC marker will never arrive.
                        _LOGGER.warning(
                            'Xena socket closed before all replies arrived')
                        return replies
                    pending += self._decode(chunk)
                    continue

                reply, pending = pending.split('\n', 1)
                # check for syntax problems
                if reply.rfind('Syntax') != -1:
                    return []

                # the SYNC marker terminates the reply sequence
                if reply.rfind('<SYNC>') == 0:
                    return replies

                replies.append(reply + '\n')
class XenaManager(object):
    """
    Manager class for port and socket functions
    """
    def __init__(self, socketDriver, user='', password='xena'):
        """Constructor

        Establish a connection to Xena using a ``driver`` with the ``password``
        supplied.

        :param socketDriver: XenaSocketDriver connection object
        :param user: owner name set on the session after a successful logon
        :param password: Password to the Xena traffic generator
        :returns: XenaManager object
        """
        self.driver = socketDriver
        self.ports = list()
        self.keep_alive_thread = KeepAliveThread(self.driver)

        if self.logon(password):
            _LOGGER.info('Connected to Xena at %s', self.driver.hostname)
        else:
            _LOGGER.error('Failed to logon to Xena at %s', self.driver.hostname)
            return

        self.set_owner(user)

    def disconnect(self):
        """ Release ports and disconnect from chassis.
        """
        for module_port in self.ports:
            module_port.release_port()
        self.ports = []
        self.logoff()
        self.keep_alive_thread.stop()

    def add_module_port(self, module, port):
        """Factory for Xena Ports

        :param module: String or int of module
        :param port: String or int of port
        :return: XenaPort object if success, None if port already added
        """
        # Compare by module/port value: a freshly built XenaPort is never
        # "in" the list by identity, so a membership test cannot find
        # duplicates.
        if self.get_module_port(module, port) is not None:
            return None
        xenaport = XenaPort(self, module, port)
        self.ports.append(xenaport)
        return xenaport

    def get_module_port(self, module, port):
        """Return the Xena Port object if available
        :param module: module number as int or str
        :param port: port number as int or str
        :return: XenaPort object or None if not found
        """
        wanted = (str(module), str(port))
        for por in self.ports:
            if (por.module, por.port) == wanted:
                return por
        return None

    def get_version(self):
        """
        Get the version from the chassis
        :return: versions of server and driver as string
        """
        res = self.driver.ask(make_manager_command(
            CMD_VERSION, '')).decode(_LOCALE or 'utf-8')
        res = res.rstrip('\n').split()
        return "Server: {} Driver: {}".format(res[1], res[2])

    def logoff(self):
        """
        Logoff from the Xena chassis
        :return: Boolean True if response OK, False if error.
        """
        return self.driver.ask_verify(make_manager_command(CMD_LOGOFF))

    def logon(self, password):
        """Login to the Xena traffic generator using the ``password`` supplied.

        Also starts the keep alive thread.

        :param password: string of password
        :return: Boolean True if response OK, False if error.
        """
        self.keep_alive_thread.start()
        return self.driver.ask_verify(make_manager_command(CMD_LOGIN, password))

    def set_owner(self, username):
        """Set the ports owner.
        :param username: owner name as string
        :return: Boolean True if response OK, False if error.
        """
        return self.driver.ask_verify(make_manager_command(CMD_OWNER, username))
class XenaPort(object):
    """
    Xena Port emulator class
    """
    def __init__(self, manager, module, port):
        """Constructor

        :param manager: XenaManager object
        :param module: Module as string or int of module to use
        :param port: Port as string or int of port to use
        :return: XenaPort object
        """
        self._manager = manager
        self._module = str(module)
        self._port = str(port)
        self._streams = list()

    @property
    def manager(self):
        """Property for manager attribute
        :return: manager object
        """
        return self._manager

    @property
    def module(self):
        """Property for module attribute
        :return: module value as string
        """
        return self._module

    @property
    def port(self):
        """Property for port attribute
        :return: port value as string
        """
        return self._port

    def port_string(self):
        """String builder with attributes
        :return: String of module port for command sequence
        """
        return "{}/{}".format(self._module, self._port)

    def _verify(self, cmd):
        """Send a port command and check the reply.
        :param cmd: command text without the module/port prefix
        :return: Boolean True if response OK, False if error.
        """
        return self._manager.driver.ask_verify(make_port_command(cmd, self))

    def _query_last_field(self, cmd):
        """Send a port query and return the last space separated reply field.
        :param cmd: query text without the module/port prefix
        :return: last field of the reply as string
        """
        command = make_port_command(cmd, self)
        # _LOCALE may be None when the locale reports no encoding.
        res = self._manager.driver.ask(command).decode(_LOCALE or 'utf-8')
        return res.rstrip('\n').split(' ')[-1]

    @staticmethod
    def _cidr_to_netmask(cidr):
        """Convert a prefix length to a dotted notation subnet mask.
        :param cidr: prefix length 0-32 as int or numeric string
        :return: subnet mask as string, e.g. '255.255.255.0'
        :raises ValueError: if cidr is outside 0-32
        """
        bits = int(cidr)
        if not 0 <= bits <= 32:
            raise ValueError('cidr must be between 0 and 32: {}'.format(cidr))
        return socket.inet_ntoa(
            struct.pack(">I", (0xffffffff << (32 - bits)) & 0xffffffff))

    def add_stream(self):
        """Add a stream to the port.
        :return: XenaStream object, None if failure
        """
        identifier = len(self._streams)
        stream = XenaStream(self, identifier)
        if self._manager.driver.ask_verify(make_stream_command(
                CMD_CREATE_STREAM, '', stream)):
            self._streams.append(stream)
            return stream
        _LOGGER.error("Error during stream creation")
        return None

    def clear_stats(self, rx_clear=True, tx_clear=True):
        """Clear the port stats

        :param rx_clear: Boolean if rx stats are to be cleared
        :param tx_clear: Boolean if tx stats are to be cleared
        :return: Boolean True if response OK, False if error.
        """
        res1 = self._verify(CMD_CLEAR_RX_STATS) if rx_clear else True
        res2 = self._verify(CMD_CLEAR_TX_STATS) if tx_clear else True
        return all([res1, res2])

    def get_effective_speed(self):
        """
        Get the effective speed on the port
        :return: effective speed as float
        """
        port_speed = self.get_port_speed()
        reduction = self.get_port_speed_reduction()
        # the reduction is applied as parts per million
        return port_speed * (1.0 - reduction / 1000000.0)

    def get_inter_frame_gap(self):
        """
        Get the interframe gap
        :return: integer of interframe gap
        """
        # query form "<cmd> ?", matching the other query commands
        return int(self._query_last_field(CMD_INTERFRAME_GAP + ' ?'))

    def get_port_speed(self):
        """
        Get the port speed as bits from port and return it as a int.
        :return: Int of port speed
        """
        # the reported value is multiplied by 1000000 to obtain bits
        return int(self._query_last_field(CMD_GET_PORT_SPEED)) * 1000000

    def get_port_speed_reduction(self):
        """
        Get the port speed reduction value as int
        :return: Integer of port speed reduction value
        """
        return int(self._query_last_field(CMD_GET_PORT_SPEED_REDUCTION))

    def get_rx_stats(self):
        """Get the rx stats.
        :return: XenaRXStats object built from the replies
        """
        command = make_port_command(CMD_GET_RX_STATS, self)
        rx_data = self._manager.driver.send_query_replies(command)
        return XenaRXStats(rx_data, time.time())

    def get_tx_stats(self):
        """Get the tx stats.
        :return: XenaTXStats object built from the replies
        """
        command = make_port_command(CMD_GET_TX_STATS, self)
        tx_data = self._manager.driver.send_query_replies(command)
        return XenaTXStats(tx_data, time.time())

    def micro_tpld_disable(self):
        """Disable micro TPLD and return to standard payload size
        :return: Boolean if response OK, False if error
        """
        return self._verify(CMD_SET_TPLD_MODE + ' normal')

    def micro_tpld_enable(self):
        """Enable micro TPLD 6 byte payloads.
        :return: Boolean if response OK, False if error
        """
        return self._verify(CMD_SET_TPLD_MODE + ' micro')

    def release_port(self):
        """Release the port
        :return: Boolean True if response OK, False if error.
        """
        return self._verify(CMD_RELEASE)

    def reserve_port(self):
        """Reserve the port
        :return: Boolean True if response OK, False if error.
        """
        return self._verify(CMD_RESERVE)

    def reset_port(self):
        """Reset the port
        :return: Boolean True if response OK, False if error.
        """
        return self._verify(CMD_RESET)

    def set_port_ip(self, ip_addr, cidr, gateway, wild='255'):
        """
        Set the port ip address of the specific port
        :param ip_addr: IP address to set to port
        :param cidr: cidr number for the subnet as int or numeric string
        :param gateway: Gateway ip for port
        :param wild: wildcard used for ARP and PING replies
        :return: Boolean True if response OK, False if error
        """
        # convert the cidr to a dot notation subnet address
        subnet = self._cidr_to_netmask(cidr)
        return self._verify('{} {} {} {} 0.0.0.{}'.format(
            CMD_PORT_IP, ip_addr, subnet, gateway, wild))

    def set_port_time_limit(self, micro_seconds):
        """Set the port time limit
        :param micro_seconds: value passed to the port time limit command
            (microseconds, going by the parameter name)
        :return: Boolean True if response OK, False if error.
        """
        return self._verify('{} {}'.format(
            CMD_SET_PORT_TIME_LIMIT, micro_seconds))

    def traffic_off(self):
        """Stop traffic
        :return: Boolean True if response OK, False if error.
        """
        return self._verify(CMD_STOP_TRAFFIC)

    def traffic_on(self):
        """Start traffic
        :return: Boolean True if response OK, False if error.
        """
        return self._verify(CMD_START_TRAFFIC)
class XenaStream(object):
    """
    Xena stream emulator class
    """
    def __init__(self, xenaPort, streamID):
        """Constructor

        :param xenaPort: XenaPort object
        :param streamID: Stream ID as int or string
        :return: XenaStream object
        """
        self._xena_port = xenaPort
        self._stream_id = str(streamID)
        self._manager = self._xena_port.manager
        # set by set_header_protocol(); required by enable_multistream()
        self._header_protocol = None

    @property
    def xena_port(self):
        """Property for port attribute
        :return: XenaPort object
        """
        return self._xena_port

    @property
    def stream_id(self):
        """Property for streamID attribute
        :return: streamID value as string
        """
        return self._stream_id

    def _verify(self, cmd, args):
        """Send a stream command and check the reply.
        :param cmd: stream command keyword
        :param args: arguments appended after the stream index
        :return: Boolean True if response OK, False if error.
        """
        return self._manager.driver.ask_verify(
            make_stream_command(cmd, args, self))

    def enable_multistream(self, flows, layer):
        """
        Basic implementation of multi stream. Enable multi stream by setting
        modifiers on the stream
        :param flows: Numbers of flows or end range
        :param layer: layer to enable multi stream as str. Acceptable values
        are L2, L3, or L4
        :return: True if success False otherwise
        :raises RuntimeError: if no protocol header has been set yet
        :raises NotImplementedError: if layer is not a supported value
        """
        if not self._header_protocol:
            raise RuntimeError(
                "Please set a protocol header before calling this method.")

        # byte offsets for setting the modifier
        offsets = {
            # NOTE(review): L2 offsets reconstructed -- confirm.
            'L2': [0, 6],
            'L3': [32, 36] if 'VLAN' in self._header_protocol else [28, 32],
            'L4': [38, 40] if 'VLAN' in self._header_protocol else [34, 36]
        }

        if layer in offsets and flows > 0:
            sid = self._stream_id
            # one modifier count command, then a modifier definition and its
            # range for each of the two offsets
            commands = [CMD_STREAM_MODIFIER_COUNT + ' [{}] 2'.format(sid)]
            for index, offset in enumerate(offsets[layer]):
                commands.append(
                    CMD_STREAM_MODIFIER + ' [{},{}] {} 0xFFFF0000 INC 1'.format(
                        sid, index, offset))
                commands.append(
                    CMD_STREAM_MODIFIER_RANGE + ' [{},{}] 0 1 {}'.format(
                        sid, index, flows))
            # every command is sent even if an earlier one failed
            responses = [
                self._manager.driver.ask_verify(
                    make_port_command(command, self._xena_port))
                for command in commands]
            return all(responses)  # return True if they all worked
        elif flows < 1:
            _LOGGER.warning(
                'No flows specified in enable multistream. Bypassing...')
            return False
        else:
            raise NotImplementedError(
                "Non-implemented stream layer in method enable multistream "
                "layer={}".format(layer))

    def get_stream_data(self):
        """
        Get the response for stream data
        :return: String of response for stream data info
        """
        command = make_stream_command(CMD_GET_STREAM_DATA, '?', self)
        # _LOCALE may be None when the locale reports no encoding.
        return self._manager.driver.ask(command).decode(_LOCALE or 'utf-8')

    def set_header_protocol(self, protocol_header):
        """Set the header info for the packet header hex.
        If the packet header contains just Ethernet and IP info then call this
        method with ETHERNET IP as the protocol header.

        :param protocol_header: protocol header argument
        :return: Boolean True if success, False if error
        """
        if self._verify(CMD_SET_STREAM_HEADER_PROTOCOL, protocol_header):
            self._header_protocol = protocol_header
            return True
        return False

    def set_off(self):
        """Set the stream to off
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_ON_OFF, 'off')

    def set_on(self):
        """Set the stream to on
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_ON_OFF, 'on')

    def set_packet_header(self, header):
        """Set the stream packet header

        :param header: packet header as hex bytes
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_PACKET_HEADER, header)

    def set_packet_length(self, pattern_type, minimum, maximum):
        """Set the pattern length with min and max values based on the pattern
        type supplied

        :param pattern_type: String of pattern type, valid entries [ fixed,
         butterfly, random, mix, incrementing ]
        :param minimum: integer of minimum byte value
        :param maximum: integer of maximum byte value
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_PACKET_LENGTH, '{} {} {}'.format(
            pattern_type, minimum, maximum))

    def set_packet_limit(self, limit):
        """Set the packet limit

        :param limit: number of packets that will be sent, use -1 to disable
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_PACKET_LIMIT, limit)

    def set_packet_payload(self, payload_type, hex_value):
        """Set the payload to the hex value based on the payload type

        :param payload_type: string of the payload type (for example pattern)
        :param hex_value: hex string of valid hex
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_PACKET_PAYLOAD, '{} {}'.format(
            payload_type, hex_value))

    def set_rate_fraction(self, fraction):
        """Set the rate fraction

        :param fraction: fraction for the stream
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_RATE_FRACTION, fraction)

    def set_payload_id(self, identifier):
        """ Set the test payload ID
        :param identifier: ID as int or string
        :return: Boolean True if success, False if error
        """
        return self._verify(CMD_SET_STREAM_TEST_PAYLOAD_ID, identifier)
class XenaRXStats(object):
    """
    Receive stat class
    """
    # Reply keyword -> (key in the parsed data, packer method name) for the
    # statistics that are reported once per bracketed id.
    _PER_ID_STATS = {
        'PR_TPLDTRAFFIC': ('pr_tpldstraffic', '_pack_stats'),
        'PR_TPLDERRORS': ('pr_tplderrors', '_pack_tplderrors_stats'),
        'PR_TPLDLATENCY': ('pr_tpldlatency', '_pack_tpldlatency_stats'),
        'PR_TPLDJITTER': ('pr_tpldjitter', '_pack_tpldjitter_stats'),
        'PR_FILTER': ('pr_filter', '_pack_stats'),
    }

    def __init__(self, stats, epoc):
        """ Constructor
        :param stats: Stats from pr all command as list
        :param epoc: Current time in epoc
        :return: XenaRXStats object
        """
        self._stats = stats
        self._time = epoc
        self.data = self.parse_stats()
        # Read by line_percentage() as a multiplier in the bit rate formulas.
        # NOTE(review): reconstructed value -- confirm.
        self.preamble = 8

    @staticmethod
    def _pack_stats(param, start, fields=None):
        """ Pack up the list of stats in a dictionary
        :param param: The list of params to process
        :param start: What element to start at
        :param fields: The field names to pack as keys
        :return: Dictionary of data where fields match up to the params
        """
        if not fields:
            fields = ['bps', 'pps', 'bytes', 'packets']
        return {column: int(param[start + index])
                for index, column in enumerate(fields)}

    @staticmethod
    def _pack_tplds_stats(param, start):
        """ Pack up the tplds stats
        :param param: List of params to pack
        :param start: What element to start at
        :return: Dictionary of stats keyed by position (0, 1, ...)
        """
        # Take every element from start to the end of the reply; stopping at
        # len(param) - start would drop the trailing entries.
        return {index: int(value)
                for index, value in enumerate(param[start:])}

    def _pack_rxextra_stats(self, param, start):
        """ Pack up the extra stats
        :param param: List of params to pack
        :param start: What element to start at
        :return: Dictionary of stats
        """
        fields = ['fcserrors', 'pauseframes', 'arprequests', 'arpreplies',
                  'pingrequests', 'pingreplies', 'gapcount', 'gapduration']
        return self._pack_stats(param, start, fields)

    def _pack_tplderrors_stats(self, param, start):
        """ Pack up tpld errors
        :param param: List of params to pack
        :param start: What element to start at
        :return: Dictionary of stats
        """
        fields = ['dummy', 'seq', 'mis', 'pld']
        return self._pack_stats(param, start, fields)

    def _pack_tpldlatency_stats(self, param, start):
        """ Pack up the tpld latency stats
        :param param: List of params to pack
        :param start: What element to start at
        :return: Dictionary of stats
        """
        fields = ['min', 'avg', 'max', '1sec']
        return self._pack_stats(param, start, fields)

    def _pack_tpldjitter_stats(self, param, start):
        """ Pack up the tpld jitter stats
        :param param: List of params to pack
        :param start: What element to start at
        :return: Dictionary of stats
        """
        fields = ['min', 'avg', 'max', '1sec']
        return self._pack_stats(param, start, fields)

    @property
    def time(self):
        """
        :return: Time as String of epoc of when stats were collected
        """
        return self._time

    def parse_stats(self):
        """ Parse the stats from pr all command
        :return: Dictionary of all stats
        """
        statdict = {}
        for line in self._stats:
            param = line.split()
            if len(param) < 2:
                # blank or truncated line: there is no keyword to inspect
                continue
            keyword = param[1]
            if keyword == 'PR_TOTAL':
                statdict['pr_total'] = self._pack_stats(param, 2)
            elif keyword == 'PR_NOTPLD':
                statdict['pr_notpld'] = self._pack_stats(param, 2)
            elif keyword == 'PR_EXTRA':
                statdict['pr_extra'] = self._pack_rxextra_stats(param, 2)
            elif keyword == 'PT_STREAM':
                entry_id = "pt_stream_%s" % param[2].strip('[]')
                statdict[entry_id] = self._pack_stats(param, 3)
            elif keyword == 'PR_TPLDS':
                tid_list = self._pack_tplds_stats(param, 2)
                if tid_list:
                    statdict['pr_tplds'] = tid_list
            elif keyword in self._PER_ID_STATS:
                key, packer = self._PER_ID_STATS[keyword]
                entry_id = param[2].strip('[]')
                # accumulate every id reported for the same keyword
                statdict.setdefault(key, {})[entry_id] = getattr(
                    self, packer)(param, 3)
            elif keyword == 'P_RECEIVESYNC':
                in_sync = param[2] == 'IN_SYNC'
                statdict['p_receivesync'] = {
                    'IN SYNC': 'True' if in_sync else 'False'}
            else:
                _LOGGER.warning("XenaPort: unknown stats: %s", keyword)

        if 'pr_tpldjitter' in statdict:
            # Jitter stats used to be stored under this misspelled key; keep
            # it as an alias so existing readers continue to work.
            statdict['pr_pldjitter'] = statdict['pr_tpldjitter']
        return statdict
class XenaTXStats(object):
    """
    Xena transmit stat class
    """
    def __init__(self, stats, epoc):
        """ Constructor
        :param stats: Stats from pt all command as list
        :param epoc: Current time in epoc
        :return: XenaTXStats object
        """
        self._stats = stats
        self._time = epoc
        # filled by parse_stats(), so it has to exist before parsing
        self._ptstreamkeys = list()
        self.data = self.parse_stats()
        # Read by line_percentage() as a multiplier in the bit rate formulas.
        # NOTE(review): reconstructed value -- confirm.
        self.preamble = 8

    @staticmethod
    def _pack_stats(params, start, fields=None):
        """ Pack up the list of stats in a dictionary
        :param params: The list of params to process
        :param start: What element to start at
        :param fields: The field names to pack as keys
        :return: Dictionary of data where fields match up to the params
        """
        if not fields:
            fields = ['bps', 'pps', 'bytes', 'packets']
        return {column: int(params[start + index])
                for index, column in enumerate(fields)}

    def _pack_txextra_stats(self, params, start):
        """ Pack up the tx extra stats
        :param params: List of params to pack
        :param start: What element to start at
        :return: Dictionary of stats
        """
        fields = ['arprequests', 'arpreplies', 'pingrequests', 'pingreplies',
                  'injectedfcs', 'injectedseq', 'injectedmis', 'injectedint',
                  'injectedtid', 'training']
        return self._pack_stats(params, start, fields)

    @property
    def pt_stream_keys(self):
        """
        :return: Return a list of pt_stream_x stream key ids
        """
        return self._ptstreamkeys

    @property
    def time(self):
        """
        :return: Time as String of epoc of when stats were collected
        """
        return self._time

    def parse_stats(self):
        """ Parse the stats from pt all command
        :return: Dictionary of all stats
        """
        statdict = {}
        for line in self._stats:
            param = line.split()
            if len(param) < 2:
                # blank or truncated line: there is no keyword to inspect
                continue
            keyword = param[1]
            if keyword == 'PT_TOTAL':
                statdict['pt_total'] = self._pack_stats(param, 2)
            elif keyword == 'PT_NOTPLD':
                statdict['pt_notpld'] = self._pack_stats(param, 2)
            elif keyword == 'PT_EXTRA':
                statdict['pt_extra'] = self._pack_txextra_stats(param, 2)
            elif keyword == 'PT_STREAM':
                entry_id = "pt_stream_%s" % param[2].strip('[]')
                self._ptstreamkeys.append(entry_id)
                statdict[entry_id] = self._pack_stats(param, 3)
            else:
                _LOGGER.warning("XenaPort: unknown stats: %s", keyword)
        return statdict
def aggregate_stats(stat1, stat2):
    """
    Judge whether stat1 and stat2 both have same key, if both have same key,
    call the aggregate function, else use the stat1's value
    :param stat1: dictionary of stats, its keys define the result
    :param stat2: dictionary of stats to combine with stat1
    :return: new dictionary with one entry per key of stat1
    """
    newstat = dict()
    for keys in stat1.keys():
        # Only dictionaries can be merged; when stat2 lacks the key or holds
        # a non-dictionary value there, stat1's value is kept.
        if (keys in stat2 and isinstance(stat1[keys], dict)
                and isinstance(stat2[keys], dict)):
            newstat[keys] = aggregate(stat1[keys], stat2[keys])
        else:
            newstat[keys] = stat1[keys]
    return newstat
def aggregate(stat1, stat2):
    """
    Recursive function to aggregate two sets of statistics. This is used when
    bi directional traffic is done and statistics need to be calculated based
    on two sets of statistics.

    Entries are paired by position (zip of both key sequences), not by key
    name; the result uses the key names of stat1.

    :param stat1: One set of dictionary stats from RX or TX stats
    :param stat2: Second set of dictionary stats from RX or TX stats
    :return: stats for data entry in RX or TX Stats instance; when a
        non-numeric value is met that value of stat1 is returned directly
    """
    newstat = dict()
    for (keys1, keys2) in zip(stat1.keys(), stat2.keys()):
        value1, value2 = stat1[keys1], stat2[keys2]
        if isinstance(value1, dict):
            newstat[keys1] = aggregate(value1, value2)
            continue
        if not isinstance(value1, (int, float)):
            # its some value we don't need to aggregate
            return value1
        # for latency stats do the appropriate calculation
        if keys1 == 'max':
            newstat[keys1] = max(value1, value2)
        elif keys1 == 'min':
            newstat[keys1] = min(value1, value2)
        elif keys1 == 'avg':
            newstat[keys1] = (value1 + value2) / 2
        else:
            newstat[keys1] = (value1 + value2)
    return newstat
def line_percentage(port, stats, time_active, packet_size):
    """
    Calculate the line percentage rate from the duration, port object and stat
    object.
    :param port: XenaPort object
    :param stats: Xena RXStat or TXStat object
    :param time_active: time the stream was active in secs as int
    :param packet_size: packet size as int
    :return: line percentage as float, 0 if no packet count is available
    """
    # use the receive total when present, otherwise the transmit total
    for total_key in ('pr_total', 'pt_total'):
        try:
            packets = stats.data[total_key]['packets']
            break
        except KeyError:
            continue
    else:
        _LOGGER.error(
            'Could not calculate line rate because packet stat not found.')
        return 0
    ifg = port.get_inter_frame_gap()
    pps = packets_per_second(packets, time_active)
    l2br = l2_bit_rate(packet_size, stats.preamble, pps)
    l1br = l1_bit_rate(l2br, pps, ifg, stats.preamble)
    return 100.0 * l1br / port.get_effective_speed()
def l2_bit_rate(packet_size, preamble, pps):
    """
    Return the l2 bit rate
    :param packet_size: packet size on the line in bytes
    :param preamble: multiplier applied to the packet size; documented
        upstream as the preamble size of the packet header in bytes
    :param pps: packets per second
    :return: l2 bit rate, computed as packet_size * preamble * pps
    """
    return (packet_size * preamble) * pps
def l1_bit_rate(l2br, pps, ifg, preamble):
    """
    Return the l1 bit rate
    :param l2br: l2 bit rate int bits per second
    :param pps: packets per second
    :param ifg: the inter frame gap
    :param preamble: multiplier applied to the inter frame gap; documented
        upstream as the preamble size of the packet header in bytes
    :return: l1 bit rate, computed as l2br + pps * ifg * preamble
    """
    return l2br + (pps * ifg * preamble)
def make_manager_command(cmd, argument=None):
    """ String builder for Xena socket commands

    :param cmd: Command to send
    :param argument: Arguments for command to send; appended in double
        quotes, omitted entirely when empty or None
    :return: String of command
    """
    command = '{} "{}"'.format(cmd, argument) if argument else cmd
    _LOGGER.info("[Command Sent] : %s", command)
    return command
def make_port_command(cmd, xena_port):
    """ String builder for Xena port commands

    :param cmd: Command to send
    :param xena_port: XenaPort object
    :return: String of command, prefixed with the port's module/port string
    """
    command = "{} {}".format(xena_port.port_string(), cmd)
    _LOGGER.info("[Command Sent] : %s", command)
    return command
def make_stream_command(cmd, args, xena_stream):
    """ String builder for Xena stream commands

    :param cmd: Command to send
    :param args: Arguments appended after the bracketed stream id
    :param xena_stream: XenaStream object
    :return: String of command
    """
    command = "{} {} [{}] {}".format(xena_stream.xena_port.port_string(), cmd,
                                     xena_stream.stream_id, args)
    _LOGGER.info("[Command Sent] : %s", command)
    return command
def packets_per_second(packets, time_in_sec):
    """
    Return the pps as float
    :param packets: total packets
    :param time_in_sec: time in seconds
    :return: float of pps
    :raises ZeroDivisionError: if time_in_sec is 0
    """
    return packets / time_in_sec