Merge "trex_learning: Add learning packet option to T-Rex testing"
[vswitchperf.git] / tools / pkt_gen / xena / XenaDriver.py
1 # Copyright 2016-2017 Red Hat Inc & Xena Networks.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # This is a port of code by Xena and Flavios ported to python 3 compatibility.
16 # Credit given to Xena and Flavio for providing most of the logic of this code.
17 # The code has changes for PEP 8 and python 3 conversion. Added Stat classes
18 # for better scaling of future requirements. Also added calculation functions
19 # for line rate to align within VSPerf project.
20 # Flavios xena libraries available at https://github.com/fleitner/XenaPythonLib
21
22 # Contributors:
23 #   Flavio Leitner, Red Hat Inc.
24 #   Dan Amzulescu, Xena Networks
25 #   Christian Trautman, Red Hat Inc.
26
27 """
28 Xena Socket API Driver module for communicating directly with Xena system
29 through socket commands and returning different statistics.
30 """
31 import locale
32 import logging
33 import socket
34 import struct
35 import sys
36 import threading
37 import time
38 # pylint: disable=too-many-lines
39 # Xena Socket Commands
40 CMD_CLEAR_RX_STATS = 'pr_clear'
41 CMD_CLEAR_TX_STATS = 'pt_clear'
42 CMD_COMMENT = ';'
43 CMD_CREATE_STREAM = 'ps_create'
44 CMD_DELETE_STREAM = 'ps_delete'
45 CMD_GET_PORT_SPEED = 'p_speed ?'
46 CMD_GET_PORT_SPEED_REDUCTION = 'p_speedreduction ?'
47 CMD_GET_RX_STATS_PER_TID = 'pr_tpldtraffic'
48 CMD_GET_STREAM_DATA = 'pt_stream'
49 CMD_GET_STREAMS_PER_PORT = 'ps_indices'
50 CMD_GET_TID_PER_STREAM = 'ps_tpldid'
51 CMD_GET_TX_STATS_PER_STREAM = 'pt_stream'
52 CMD_GET_RX_STATS = 'pr_all ?'
53 CMD_GET_TX_STATS = 'pt_all ?'
54 CMD_INTERFRAME_GAP = 'p_interframegap'
55 CMD_LOGIN = 'c_logon'
56 CMD_LOGOFF = 'c_logoff'
57 CMD_OWNER = 'c_owner'
58 CMD_PORT = ';Port:'
59 CMD_PORT_IP = 'p_ipaddress'
60 CMD_PORT_LEARNING = 'p_autotrain'
61 CMD_RESERVE = 'p_reservation reserve'
62 CMD_RELEASE = 'p_reservation release'
63 CMD_RELINQUISH = 'p_reservation relinquish'
64 CMD_RESET = 'p_reset'
65 CMD_SET_PORT_ARP_REPLY = 'p_arpreply'
66 CMD_SET_PORT_ARP_V6_REPLY = 'p_arpv6reply'
67 CMD_SET_PORT_PING_REPLY = 'p_pingreply'
68 CMD_SET_PORT_PING_V6_REPLY = 'p_pingv6reply'
69 CMD_SET_PORT_TIME_LIMIT = 'p_txtimelimit'
70 CMD_SET_STREAM_HEADER_PROTOCOL = 'ps_headerprotocol'
71 CMD_SET_STREAM_ON_OFF = 'ps_enable'
72 CMD_SET_STREAM_PACKET_HEADER = 'ps_packetheader'
73 CMD_SET_STREAM_PACKET_LENGTH = 'ps_packetlength'
74 CMD_SET_STREAM_PACKET_LIMIT = 'ps_packetlimit'
75 CMD_SET_STREAM_PACKET_PAYLOAD = 'ps_payload'
76 CMD_SET_STREAM_RATE_FRACTION = 'ps_ratefraction'
77 CMD_SET_STREAM_TEST_PAYLOAD_ID = 'ps_tpldid'
78 CMD_SET_TPLD_MODE = 'p_tpldmode'
79 CMD_START_TRAFFIC = 'p_traffic on'
80 CMD_STOP_TRAFFIC = 'p_traffic off'
81 CMD_STREAM_MODIFIER = 'ps_modifier'
82 CMD_STREAM_MODIFIER_COUNT = 'ps_modifiercount'
83 CMD_STREAM_MODIFIER_RANGE = 'ps_modifierrange'
84 CMD_VERSION = 'c_versionno ?'
85
86 _LOCALE = locale.getlocale()[1]
87 _LOGGER = logging.getLogger(__name__)
88
89
90 class SimpleSocket(object):
91     """
92     Socket class
93     """
94     def __init__(self, hostname, port=5025, timeout=1):
95         """Constructor
96         :param hostname: hostname or ip as string
97         :param port: port number to use for socket as int
98         :param timeout: socket timeout as int
99         :return: SimpleSocket object
100         """
101         self.hostname = hostname
102         try:
103             self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
104             self.sock.settimeout(timeout)
105             self.sock.connect((hostname, port))
106         except socket.error as msg:
107             _LOGGER.error(
108                 "Cannot connect to Xena Socket at %s", hostname)
109             _LOGGER.error("Exception : %s", msg)
110             sys.exit(1)
111
112     def __del__(self):
113         """Deconstructor
114         :return:
115         """
116         self.sock.close()
117
118     def ask(self, cmd):
119         """ Send the command over the socket
120         :param cmd: cmd as string
121         :return: byte utf encoded return value from socket
122         """
123         cmd += '\n'
124         try:
125             self.sock.send(cmd.encode('utf-8'))
126             return self.sock.recv(1024)
127         except OSError:
128             return ''
129
130     def read_reply(self):
131         """ Get the response from the socket
132         :return: Return the reply
133         """
134         reply = self.sock.recv(1024)
135         if reply.find("---^".encode('utf-8')) != -1:
136             # read again the syntax error msg
137             reply = self.sock.recv(1024)
138         return reply
139
140     def send_command(self, cmd):
141         """ Send the command specified over the socket
142         :param cmd: Command to send as string
143         :return: None
144         """
145         cmd += '\n'
146         self.sock.send(cmd.encode('utf-8'))
147
148     def set_keep_alive(self):
149         """ Set the keep alive for the socket
150         :return: None
151         """
152         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
153
154
155 class KeepAliveThread(threading.Thread):
156     """
157     Keep alive socket class
158     """
159     message = ''
160
161     def __init__(self, connection, interval=10):
162         """ Constructor
163         :param connection: Socket for keep alive
164         :param interval: interval in seconds to send keep alive
165         :return: KeepAliveThread object
166         """
167         threading.Thread.__init__(self)
168         self.connection = connection
169         self.interval = interval
170         self.finished = threading.Event()
171         self.setDaemon(True)
172         _LOGGER.debug(
173             'Xena Socket keep alive thread initiated, interval ' +
174             '{} seconds'.format(self.interval))
175
176     def stop(self):
177         """ Thread stop. See python thread docs for more info
178         :return: None
179         """
180         self.finished.set()
181         self.join()
182
183     def run(self):
184         """ Thread start. See python thread docs for more info
185         :return: None
186         """
187         while not self.finished.isSet():
188             self.finished.wait(self.interval)
189             self.connection.ask(self.message)
190
191
192 class XenaSocketDriver(SimpleSocket):
193     """
194     Xena socket class
195     """
196     reply_ok = '<OK>'
197
198     def __init__(self, hostname, port=22611):
199         """ Constructor
200         :param hostname: Hostname or ip as string
201         :param port: port to use as int
202         :return: XenaSocketDriver object
203         """
204         SimpleSocket.__init__(self, hostname=hostname, port=port)
205         SimpleSocket.set_keep_alive(self)
206         self.access_semaphor = threading.Semaphore(1)
207
208     def ask(self, cmd):
209         """ Send the command over the socket in a thread safe manner
210         :param cmd: Command to send
211         :return: reply from socket
212         """
213         self.access_semaphor.acquire()
214         reply = SimpleSocket.ask(self, cmd)
215         self.access_semaphor.release()
216         return reply
217
218     def ask_verify(self, cmd):
219         """ Send the command over the socket in a thread safe manner and
220         verify the response is good.
221         :param cmd: Command to send
222         :return: Boolean True if command response is good, False otherwise
223         """
224         resp = self.ask(cmd).decode(_LOCALE).strip('\n')
225         _LOGGER.info('[ask_verify] %s', resp)
226         if resp == self.reply_ok:
227             return True
228         return False
229
230     def disconnect(self):
231         """
232         Close the socket connection
233         :return: None
234         """
235         self.sock.close()
236
237     def send_command(self, cmd):
238         """ Send the command over the socket with no return
239         :param cmd: Command to send
240         :return: None
241         """
242         self.access_semaphor.acquire()
243         SimpleSocket.send_command(self, cmd)
244         self.access_semaphor.release()
245
246     def send_query_replies(self, cmd):
247         """ Send the command over the socket and wait for all replies and return
248         the lines as a list
249         :param cmd: Command to send
250         :return: Response from command as list
251         """
252         # send the command followed by cmd SYNC to find out
253         # when the last reply arrives.
254         self.send_command(cmd.strip('\n'))
255         self.send_command('SYNC')
256         replies = []
257         self.access_semaphor.acquire()
258         msg = SimpleSocket.read_reply(self).decode(_LOCALE)
259         msgleft = ''
260         while True:
261             if '\n' in msg:
262                 (reply, msgleft) = msg.split('\n', 1)
263                 # check for syntax problems
264                 if reply.rfind('Syntax') != -1:
265                     self.access_semaphor.release()
266                     return []
267
268                 if reply.rfind('<SYNC>') == 0:
269
270                     self.access_semaphor.release()
271                     return replies
272
273                 replies.append(reply + '\n')
274                 msg = msgleft
275             else:
276                 # more bytes to come
277                 msgnew = SimpleSocket.read_reply(self).decode(_LOCALE)
278                 msg = msgleft + msgnew
279
280
281 class XenaManager(object):
282     """
283     Manager class for port and socket functions
284     """
285     def __init__(self, socketDriver, user='', password='xena'):
286         """Constructor
287
288         Establish a connection to Xena using a ``driver`` with the ``password``
289         supplied.
290
291         Attributes:
292         :param socketDriver: XenaSocketDriver connection object
293         :param password: Password to the Xena traffic generator
294         :returns: XenaManager object
295         """
296         self.driver = socketDriver
297         self.ports = list()
298         self.keep_alive_thread = KeepAliveThread(self.driver)
299
300         if self.logon(password):
301             _LOGGER.info('Connected to Xena at %s', self.driver.hostname)
302         else:
303             _LOGGER.error('Failed to logon to Xena at %s', self.driver.hostname)
304             return
305
306         self.set_owner(user)
307
308     def disconnect(self):
309         """ Release ports and disconnect from chassis.
310         """
311         for module_port in self.ports:
312             module_port.release_port()
313         self.ports = []
314         self.logoff()
315         self.keep_alive_thread.stop()
316
317     def add_module_port(self, module, port):
318         """Factory for Xena Ports
319
320         :param module: String or int of module
321         :param port: String or int of port
322         :return: XenaPort object if success, None if port already added
323         """
324         xenaport = XenaPort(self, module, port)
325         if xenaport in self.ports:
326             return None
327         else:
328             self.ports.append(xenaport)
329             return xenaport
330
331     def get_module_port(self, module, port):
332         """Return the Xena Port object if available
333         :param module: module number as int or str
334         :param port: port number as int or str
335         :return: XenaPort object or None if not found
336         """
337         for por in self.ports:
338             if por.port == str(port) and por.module == str(module):
339                 return por
340         return None
341
342     def get_version(self):
343         """
344         Get the version from the chassis
345         :return: versions of server and driver as string
346         """
347         res = self.driver.ask(make_manager_command(
348             CMD_VERSION, '')).decode(_LOCALE)
349         res = res.rstrip('\n').split()
350         return "Server: {} Driver: {}".format(res[1], res[2])
351
352     def logoff(self):
353         """
354         Logoff from the Xena chassis
355         :return: Boolean True if response OK, False if error.
356         """
357         return self.driver.ask_verify(make_manager_command(CMD_LOGOFF))
358
359     def logon(self, password):
360         """Login to the Xena traffic generator using the ``password`` supplied.
361
362         :param password: string of password
363         :return: Boolean True if response OK, False if error.
364         """
365         self.keep_alive_thread.start()
366         return self.driver.ask_verify(make_manager_command(CMD_LOGIN, password))
367
368     def set_owner(self, username):
369         """Set the ports owner.
370         :return: Boolean True if response OK, False if error.
371         """
372         return self.driver.ask_verify(make_manager_command(CMD_OWNER, username))
373
374 # pylint: disable=too-many-public-methods
375 class XenaPort(object):
376     """
377     Xena Port emulator class
378     """
379     def __init__(self, manager, module, port):
380         """Constructor
381
382         :param manager: XenaManager object
383         :param module: Module as string or int of module to use
384         :param port: Port as string or int of port to use
385         :return: XenaPort object
386         """
387         self._manager = manager
388         self._module = str(module)
389         self._port = str(port)
390         self._streams = list()
391
392     @property
393     def manager(self):
394         """Property for manager attribute
395         :return: manager object
396         """
397         return self._manager
398
399     @property
400     def module(self):
401         """Property for module attribute
402         :return: module value as string
403         """
404         return self._module
405
406     @property
407     def port(self):
408         """Property for port attribute
409         :return: port value as string
410         """
411         return self._port
412
413     def port_string(self):
414         """String builder with attributes
415         :return: String of module port for command sequence
416         """
417         stringify = "{}/{}".format(self._module, self._port)
418         return stringify
419
420     def add_stream(self):
421         """Add a stream to the port.
422         :return: XenaStream object, None if failure
423         """
424         identifier = len(self._streams)
425         stream = XenaStream(self, identifier)
426         if self._manager.driver.ask_verify(make_stream_command(
427                 CMD_CREATE_STREAM, '', stream)):
428             self._streams.append(stream)
429             return stream
430         else:
431             _LOGGER.error("Error during stream creation")
432             return None
433
434     def clear_stats(self, rx_clear=True, tx_clear=True):
435         """Clear the port stats
436
437         :param rx_clear: Boolean if rx stats are to be cleared
438         :param tx_clear: Boolean if tx stats are to be cleared
439         :return: Boolean True if response OK, False if error.
440         """
441         command = make_port_command(CMD_CLEAR_RX_STATS, self)
442         res1 = self._manager.driver.ask_verify(command) if rx_clear else True
443         command = make_port_command(CMD_CLEAR_TX_STATS, self)
444         res2 = self._manager.driver.ask_verify(command) if tx_clear else True
445         return all([res1, res2])
446
447     def get_effective_speed(self):
448         """
449         Get the effective speed on the port
450         :return: effective speed as float
451         """
452         port_speed = self.get_port_speed()
453         reduction = self.get_port_speed_reduction()
454         effective_speed = port_speed * (1.0 - reduction / 1000000.0)
455         return effective_speed
456
457     def get_inter_frame_gap(self):
458         """
459         Get the interframe gap and return it as string
460         :return: integer of interframe gap
461         """
462         command = make_port_command(CMD_INTERFRAME_GAP + '?', self)
463         res = self._manager.driver.ask(command).decode(_LOCALE)
464         res = int(res.rstrip('\n').split(' ')[-1])
465         return res
466
467     def get_port_speed(self):
468         """
469         Get the port speed as bits from port and return it as a int.
470         :return: Int of port speed
471         """
472         command = make_port_command(CMD_GET_PORT_SPEED, self)
473         res = self._manager.driver.ask(command).decode(_LOCALE)
474         port_speed = res.split(' ')[-1].rstrip('\n')
475         return int(port_speed) * 1000000
476
477     def get_port_speed_reduction(self):
478         """
479         Get the port speed reduction value as int
480         :return: Integer of port speed reduction value
481         """
482         command = make_port_command(CMD_GET_PORT_SPEED_REDUCTION, self)
483         res = self._manager.driver.ask(command).decode(_LOCALE)
484         res = int(res.rstrip('\n').split(' ')[-1])
485         return res
486
487     def get_rx_stats(self):
488         """Get the rx stats and return the data as a dict.
489         :return: Receive stats as dictionary
490         """
491         command = make_port_command(CMD_GET_RX_STATS, self)
492         rx_data = self._manager.driver.send_query_replies(command)
493         data = XenaRXStats(rx_data, time.time())
494         return data
495
496     def get_tx_stats(self):
497         """Get the tx stats and return the data as a dict.
498         :return: Receive stats as dictionary
499         """
500         command = make_port_command(CMD_GET_TX_STATS, self)
501         tx_data = self._manager.driver.send_query_replies(command)
502         data = XenaTXStats(tx_data, time.time())
503         return data
504
505     def micro_tpld_disable(self):
506         """Disable micro TPLD and return to standard payload size
507         :return: Boolean if response OK, False if error
508         """
509         command = make_port_command(CMD_SET_TPLD_MODE + ' normal', self)
510         return self._manager.driver.ask_verify(command)
511
512     def micro_tpld_enable(self):
513         """Enable micro TPLD 6 byte payloads.
514         :Return Boolean if response OK, False if error
515         """
516         command = make_port_command(CMD_SET_TPLD_MODE + ' micro', self)
517         return self._manager.driver.ask_verify(command)
518
519     def release_port(self):
520         """Release the port
521         :return: Boolean True if response OK, False if error.
522         """
523         command = make_port_command(CMD_RELEASE, self)
524         return self._manager.driver.ask_verify(command)
525
526     def reserve_port(self):
527         """Reserve the port
528         :return: Boolean True if response OK, False if error.
529         """
530         command = make_port_command(CMD_RESERVE, self)
531         return self._manager.driver.ask_verify(command)
532
533     def reset_port(self):
534         """Reset the port
535         :return: Boolean True if response OK, False if error.
536         """
537         command = make_port_command(CMD_RESET, self)
538         return self._manager.driver.ask_verify(command)
539
540     def set_port_arp_reply(self, is_on=True, ipv6=False):
541         """
542         Set the port arpreply value
543         :param on: Enable or disable the arp reply on the port
544         :param v6: set the value on the ip v6, disabled will set at ip v4
545         :return: Boolean True if response OK, False if error
546         """
547         command = make_port_command('{} {}'.format(
548             CMD_SET_PORT_ARP_V6_REPLY if ipv6 else CMD_SET_PORT_ARP_REPLY,
549             "on" if is_on else "off"), self)
550         return self._manager.driver.ask_verify(command)
551
552     def set_port_ping_reply(self, is_on=True, ipv6=False):
553         """
554         Set the port ping reply value
555         :param on: Enable or disable the ping reply on the port
556         :param v6: set the value on the ip v6, disabled will set at ip v4
557         :return: Boolean True if response OK, False if error
558         """
559         command = make_port_command('{} {}'.format(
560             CMD_SET_PORT_PING_V6_REPLY if ipv6 else CMD_SET_PORT_PING_REPLY,
561             "on" if is_on else "off"), self)
562         return self._manager.driver.ask_verify(command)
563
564     def set_port_learning(self, interval):
565         """Start port learning with the interval in seconds specified. 0 disables port learning
566         :param: interval as int
567         :return: Boolean True if response OK, False if error.
568         """
569         command = make_port_command('{} {}'.format(CMD_PORT_LEARNING, interval), self)
570         return self._manager.driver.ask_verify(command)
571
572     def set_port_ip(self, ip_addr, cidr, gateway, wild='255'):
573         """
574         Set the port ip address of the specific port
575         :param ip_addr: IP address to set to port
576         :param cidr: cidr number for the subnet
577         :param gateway: Gateway ip for port
578         :param wild: wildcard used for ARP and PING replies
579         :return: Boolean True if response OK, False if error
580         """
581         # convert the cidr to a dot notation subnet address
582         subnet = socket.inet_ntoa(
583             struct.pack(">I", (0xffffffff << (32 - cidr)) & 0xffffffff))
584
585         command = make_port_command('{} {} {} {} 0.0.0.{}'.format(
586             CMD_PORT_IP, ip_addr, subnet, gateway, wild), self)
587         return self._manager.driver.ask_verify(command)
588
589     def set_port_time_limit(self, micro_seconds):
590         """Set the port time limit in ms
591         :param micro_seconds: ms for port time limit
592         :return: Boolean True if response OK, False if error.
593         """
594         command = make_port_command('{} {}'.format(
595             CMD_SET_PORT_TIME_LIMIT, micro_seconds), self)
596         return self._manager.driver.ask_verify(command)
597
598     def traffic_off(self):
599         """Start traffic
600         :return: Boolean True if response OK, False if error.
601         """
602         command = make_port_command(CMD_STOP_TRAFFIC, self)
603         return self._manager.driver.ask_verify(command)
604
605     def traffic_on(self):
606         """Stop traffic
607         :return: Boolean True if response OK, False if error.
608         """
609         command = make_port_command(CMD_START_TRAFFIC, self)
610         return self._manager.driver.ask_verify(command)
611
612
613 class XenaStream(object):
614     """
615     Xena stream emulator class
616     """
617     def __init__(self, xenaPort, streamID):
618         """Constructor
619
620         :param xenaPort: XenaPort object
621         :param streamID: Stream ID as int or string
622         :return: XenaStream object
623         """
624         self._xena_port = xenaPort
625         self._stream_id = str(streamID)
626         self._manager = self._xena_port.manager
627         self._header_protocol = None
628
629     @property
630     def xena_port(self):
631         """Property for port attribute
632         :return: XenaPort object
633         """
634         return self._xena_port
635
636     @property
637     def stream_id(self):
638         """Property for streamID attribute
639         :return: streamID value as string
640         """
641         return self._stream_id
642
643     def enable_multistream(self, flows, layer):
644         """
645         Basic implementation of multi stream. Enable multi stream by setting
646         modifiers on the stream
647         :param flows: Numbers of flows or end range
648         :param layer: layer to enable multi stream as str. Acceptable values
649         are L2, L3, or L4
650         :return: True if success False otherwise
651         """
652         if not self._header_protocol:
653             raise RuntimeError(
654                 "Please set a protocol header before calling this method.")
655
656         # byte offsets for setting the modifier
657         offsets = {
658             'L2': [0, 6],
659             'L3': [32, 36] if 'VLAN' in self._header_protocol else [28, 32],
660             'L4': [38, 40] if 'VLAN' in self._header_protocol else [34, 36]
661         }
662
663         responses = list()
664         if layer in offsets.keys() and flows > 0:
665             command = make_port_command(
666                 CMD_STREAM_MODIFIER_COUNT + ' [{}]'.format(self._stream_id) +
667                 ' 2', self._xena_port)
668             responses.append(self._manager.driver.ask_verify(command))
669             command = make_port_command(
670                 CMD_STREAM_MODIFIER + ' [{},0] {} 0xFFFF0000 INC 1'.format(
671                     self._stream_id, offsets[layer][0]), self._xena_port)
672             responses.append(self._manager.driver.ask_verify(command))
673             command = make_port_command(
674                 CMD_STREAM_MODIFIER_RANGE + ' [{},0] 0 1 {}'.format(
675                     self._stream_id, flows), self._xena_port)
676             responses.append(self._manager.driver.ask_verify(command))
677             command = make_port_command(
678                 CMD_STREAM_MODIFIER + ' [{},1] {} 0xFFFF0000 INC 1'.format(
679                     self._stream_id, offsets[layer][1]), self._xena_port)
680             responses.append(self._manager.driver.ask_verify(command))
681             command = make_port_command(
682                 CMD_STREAM_MODIFIER_RANGE + ' [{},1] 0 1 {}'.format(
683                     self._stream_id, flows), self._xena_port)
684             responses.append(self._manager.driver.ask_verify(command))
685             return all(responses)  # return True if they all worked
686         elif flows < 1:
687             _LOGGER.warning(
688                 'No flows specified in enable multistream. Bypassing...')
689             return False
690         else:
691             raise NotImplementedError(
692                 "Non-implemented stream layer in method enable multistream ",
693                 "layer=", layer)
694
695     def get_stream_data(self):
696         """
697         Get the response for stream data
698         :return: String of response for stream data info
699         """
700         command = make_stream_command(CMD_GET_STREAM_DATA, '?', self)
701         res = self._manager.driver.ask(command).decode(_LOCALE)
702         return res
703
704     def set_header_protocol(self, protocol_header):
705         """Set the header info for the packet header hex.
706         If the packet header contains just Ethernet and IP info then call this
707         method with ETHERNET IP as the protocol header.
708
709         :param protocol_header: protocol header argument
710         :return: Boolean True if success, False if error
711         """
712         command = make_stream_command(
713             CMD_SET_STREAM_HEADER_PROTOCOL,
714             protocol_header, self)
715         if self._manager.driver.ask_verify(command):
716             self._header_protocol = protocol_header
717             return True
718         else:
719             return False
720
721     def set_off(self):
722         """Set the stream to off
723         :return: Boolean True if success, False if error
724         """
725         return self._manager.driver.ask_verify(make_stream_command(
726             CMD_SET_STREAM_ON_OFF, 'off', self))
727
728     def set_on(self):
729         """Set the stream to on
730         :return: Boolean True if success, False if error
731         """
732         return self._manager.driver.ask_verify(make_stream_command(
733             CMD_SET_STREAM_ON_OFF, 'on', self))
734
735     def set_packet_header(self, header):
736         """Set the stream packet header
737
738         :param header: packet header as hex bytes
739         :return: Boolean True if success, False if error
740         """
741         return self._manager.driver.ask_verify(make_stream_command(
742             CMD_SET_STREAM_PACKET_HEADER, header, self))
743
744     def set_packet_length(self, pattern_type, minimum, maximum):
745         """Set the pattern length with min and max values based on the pattern
746         type supplied
747
748         :param pattern_type: String of pattern type, valid entries [ fixed,
749          butterfly, random, mix, incrementing ]
750         :param minimum: integer of minimum byte value
751         :param maximum: integer of maximum byte value
752         :return: Boolean True if success, False if error
753         """
754         return self._manager.driver.ask_verify(make_stream_command(
755             CMD_SET_STREAM_PACKET_LENGTH, '{} {} {}'.format(
756                 pattern_type, minimum, maximum), self))
757
758     def set_packet_limit(self, limit):
759         """Set the packet limit
760
761         :param limit: number of packets that will be sent, use -1 to disable
762         :return: Boolean True if success, False if error
763         """
764         return self._manager.driver.ask_verify(make_stream_command(
765             CMD_SET_STREAM_PACKET_LIMIT, limit, self))
766
767     def set_packet_payload(self, payload_type, hex_value):
768         """Set the payload to the hex value based on the payload type
769
770         :param payload_type: string of the payload type, valid entries [ pattern,
771          incrementing, prbs ]
772         :param hex_value: hex string of valid hex
773         :return: Boolean True if success, False if error
774         """
775         return self._manager.driver.ask_verify(make_stream_command(
776             CMD_SET_STREAM_PACKET_PAYLOAD, '{} {}'.format(
777                 payload_type, hex_value), self))
778
779     def set_rate_fraction(self, fraction):
780         """Set the rate fraction
781
782         :param fraction: fraction for the stream
783         :return: Boolean True if success, False if error
784         """
785         return self._manager.driver.ask_verify(make_stream_command(
786             CMD_SET_STREAM_RATE_FRACTION, fraction, self))
787
788     def set_payload_id(self, identifier):
789         """ Set the test payload ID
790         :param identifier: ID as int or string
791         :return: Boolean True if success, False if error
792         """
793         return self._manager.driver.ask_verify(make_stream_command(
794             CMD_SET_STREAM_TEST_PAYLOAD_ID, identifier, self))
795
796
797 class XenaRXStats(object):
798     """
799     Receive stat class
800     """
801     def __init__(self, stats, epoc):
802         """ Constructor
803         :param stats: Stats from pr all command as list
804         :param epoc: Current time in epoc
805         :return: XenaRXStats object
806         """
807         self._stats = stats
808         self._time = epoc
809         self.data = self.parse_stats()
810         self.preamble = 8
811
812     @staticmethod
813     def _pack_stats(param, start, fields=None):
814         """ Pack up the list of stats in a dictionary
815         :param param: The list of params to process
816         :param start: What element to start at
817         :param fields: The field names to pack as keys
818         :return: Dictionary of data where fields match up to the params
819         """
820         if not fields:
821             fields = ['bps', 'pps', 'bytes', 'packets']
822         data = {}
823         i = 0
824         for column in fields:
825             data[column] = int(param[start + i])
826             i += 1
827
828         return data
829
830     @staticmethod
831     def _pack_tplds_stats(param, start):
832         """ Pack up the tplds stats
833         :param param: List of params to pack
834         :param start: What element to start at
835         :return: Dictionary of stats
836         """
837         data = {}
838         i = 0
839         for val in range(start, len(param) - start):
840             data[i] = int(param[val])
841             i += 1
842         return data
843
844     def _pack_rxextra_stats(self, param, start):
845         """ Pack up the extra stats
846         :param param: List of params to pack
847         :param start: What element to start at
848         :return: Dictionary of stats
849         """
850         fields = ['fcserrors', 'pauseframes', 'arprequests', 'arpreplies',
851                   'pingrequests', 'pingreplies', 'gapcount', 'gapduration']
852         return self._pack_stats(param, start, fields)
853
854     def _pack_tplderrors_stats(self, param, start):
855         """ Pack up tlpd errors
856         :param param: List of params to pack
857         :param start: What element to start at
858         :return: Dictionary of stats
859         """
860         fields = ['dummy', 'seq', 'mis', 'pld']
861         return self._pack_stats(param, start, fields)
862
863     def _pack_tpldlatency_stats(self, param, start):
864         """ Pack up the tpld latency stats
865         :param param: List of params to pack
866         :param start: What element to start at
867         :return: Dictionary of stats
868         """
869         fields = ['min', 'avg', 'max', '1sec']
870         return self._pack_stats(param, start, fields)
871
872     def _pack_tpldjitter_stats(self, param, start):
873         """ Pack up the tpld jitter stats
874         :param param: List of params to pack
875         :param start: What element to start at
876         :return: Dictionary of stats
877         """
878         fields = ['min', 'avg', 'max', '1sec']
879         return self._pack_stats(param, start, fields)
880
881     @property
882     def time(self):
883         """
884         :return: Time as String of epoc of when stats were collected
885         """
886         return self._time
887
888     # pylint: disable=too-many-branches
889     def parse_stats(self):
890         """ Parse the stats from pr all command
891         :return: Dictionary of all stats
892         """
893         statdict = {}
894         for line in self._stats:
895             param = line.split()
896             if param[1] == 'PR_TOTAL':
897                 statdict['pr_total'] = self._pack_stats(param, 2)
898             elif param[1] == 'PR_NOTPLD':
899                 statdict['pr_notpld'] = self._pack_stats(param, 2,)
900             elif param[1] == 'PR_EXTRA':
901                 statdict['pr_extra'] = self._pack_rxextra_stats(param, 2)
902             elif param[1] == 'PT_STREAM':
903                 entry_id = "pt_stream_%s" % param[2].strip('[]')
904                 statdict[entry_id] = self._pack_stats(param, 3)
905             elif param[1] == 'PR_TPLDS':
906                 tid_list = self._pack_tplds_stats(param, 2)
907                 if len(tid_list):
908                     statdict['pr_tplds'] = tid_list
909             elif param[1] == 'PR_TPLDTRAFFIC':
910                 if 'pr_tpldstraffic' in statdict:
911                     data = statdict['pr_tpldstraffic']
912                 else:
913                     data = {}
914                 entry_id = param[2].strip('[]')
915                 data[entry_id] = self._pack_stats(param, 3)
916                 statdict['pr_tpldstraffic'] = data
917             elif param[1] == 'PR_TPLDERRORS':
918                 if 'pr_tplderrors' in statdict:
919                     data = statdict['pr_tplderrors']
920                 else:
921                     data = {}
922                 entry_id = param[2].strip('[]')
923                 data[entry_id] = self._pack_tplderrors_stats(param, 3)
924                 statdict['pr_tplderrors'] = data
925             elif param[1] == 'PR_TPLDLATENCY':
926                 if 'pr_tpldlatency' in statdict:
927                     data = statdict['pr_tpldlatency']
928                 else:
929                     data = {}
930                 entry_id = param[2].strip('[]')
931                 data[entry_id] = self._pack_tpldlatency_stats(param, 3)
932                 statdict['pr_tpldlatency'] = data
933             elif param[1] == 'PR_TPLDJITTER':
934                 if 'pr_tpldjitter' in statdict:
935                     data = statdict['pr_tpldjitter']
936                 else:
937                     data = {}
938                 entry_id = param[2].strip('[]')
939                 data[entry_id] = self._pack_tpldjitter_stats(param, 3)
940                 statdict['pr_pldjitter'] = data
941             elif param[1] == 'PR_FILTER':
942                 if 'pr_filter' in statdict:
943                     data = statdict['pr_filter']
944                 else:
945                     data = {}
946                 entry_id = param[2].strip('[]')
947                 data[entry_id] = self._pack_stats(param, 3)
948                 statdict['pr_filter'] = data
949             elif param[1] == 'P_RECEIVESYNC':
950                 if param[2] == 'IN_SYNC':
951                     statdict['p_receivesync'] = {'IN SYNC': 'True'}
952                 else:
953                     statdict['p_receivesync'] = {'IN SYNC': 'False'}
954             else:
955                 logging.warning("XenaPort: unknown stats: %s", param[1])
956
957         mydict = statdict
958         return mydict
959
960
961 class XenaTXStats(object):
962     """
963     Xena transmit stat class
964     """
965     def __init__(self, stats, epoc):
966         """ Constructor
967         :param stats: Stats from pt all command as list
968         :param epoc: Current time in epoc
969         :return: XenaTXStats object
970         """
971         self._stats = stats
972         self._time = epoc
973         self._ptstreamkeys = list()
974         self.data = self.parse_stats()
975         self.preamble = 8
976
977     @staticmethod
978     def _pack_stats(params, start, fields=None):
979         """ Pack up the list of stats in a dictionary
980         :param params: The list of params to process
981         :param start: What element to start at
982         :param fields: The field names to pack as keys
983         :return: Dictionary of data where fields match up to the params
984         """
985         if not fields:
986             fields = ['bps', 'pps', 'bytes', 'packets']
987         data = {}
988         i = 0
989         for column in fields:
990             data[column] = int(params[start + i])
991             i += 1
992
993         return data
994
995     def _pack_txextra_stats(self, params, start):
996         """ Pack up the tx extra stats
997         :param params: List of params to pack
998         :param start: What element to start at
999         :return: Dictionary of stats
1000         """
1001         fields = ['arprequests', 'arpreplies', 'pingrequests', 'pingreplies',
1002                   'injectedfcs', 'injectedseq', 'injectedmis', 'injectedint',
1003                   'injectedtid', 'training']
1004         return self._pack_stats(params, start, fields)
1005
1006     @property
1007     def pt_stream_keys(self):
1008         """
1009         :return: Return a list of pt_stream_x stream key ids
1010         """
1011         return self._ptstreamkeys
1012
1013     @property
1014     def time(self):
1015         """
1016         :return: Time as String of epoc of when stats were collected
1017         """
1018         return self._time
1019
1020     def parse_stats(self):
1021         """ Parse the stats from pr all command
1022         :return: Dictionary of all stats
1023         """
1024         statdict = {}
1025         for line in self._stats:
1026             param = line.split()
1027             if param[1] == 'PT_TOTAL':
1028                 statdict['pt_total'] = self._pack_stats(param, 2)
1029             elif param[1] == 'PT_NOTPLD':
1030                 statdict['pt_notpld'] = self._pack_stats(param, 2,)
1031             elif param[1] == 'PT_EXTRA':
1032                 statdict['pt_extra'] = self._pack_txextra_stats(param, 2)
1033             elif param[1] == 'PT_STREAM':
1034                 entry_id = "pt_stream_%s" % param[2].strip('[]')
1035                 self._ptstreamkeys.append(entry_id)
1036                 statdict[entry_id] = self._pack_stats(param, 3)
1037             else:
1038                 logging.warning("XenaPort: unknown stats: %s", param[1])
1039         mydict = statdict
1040         return mydict
1041
1042 def aggregate_stats(stat1, stat2):
1043     """
1044     Judge whether stat1 and stat2 both have same key, if both have same key,
1045     call the aggregate fuction, else use the stat1's value
1046     """
1047     newstat = dict()
1048     for keys in stat1.keys():
1049         if keys in stat2 and isinstance(stat1[keys], dict):
1050             newstat[keys] = aggregate(stat1[keys], stat2[keys])
1051         else:
1052             newstat[keys] = stat1[keys]
1053     return newstat
1054
1055 def aggregate(stat1, stat2):
1056     """
1057     Recursive function to aggregate two sets of statistics. This is used when
1058     bi directional traffic is done and statistics need to be calculated based
1059     on two sets of statistics.
1060     :param stat1: One set of dictionary stats from RX or TX stats
1061     :param stat2: Second set of dictionary stats from RX or TX stats
1062     :return: stats for data entry in RX or TX Stats instance
1063     """
1064     newstat = dict()
1065     for (keys1, keys2) in zip(stat1.keys(), stat2.keys()):
1066         if isinstance(stat1[keys1], dict):
1067             newstat[keys1] = aggregate(stat1[keys1], stat2[keys2])
1068         else:
1069             if not isinstance(stat1[keys1], int) and not isinstance(
1070                     [keys1], float):
1071                 # its some value we don't need to aggregate
1072                 return stat1[keys1]
1073             # for latency stats do the appropriate calculation
1074             if keys1 == 'max':
1075                 newstat[keys1] = max(stat1[keys1], stat2[keys2])
1076             elif keys1 == 'min':
1077                 newstat[keys1] = min(stat1[keys1], stat2[keys2])
1078             elif keys1 == 'avg':
1079                 newstat[keys1] = (stat1[keys1] + stat2[keys2]) / 2
1080             else:
1081                 newstat[keys1] = (stat1[keys1] + stat2[keys2])
1082     return newstat
1083
1084
1085 def line_percentage(port, stats, time_active, packet_size):
1086     """
1087     Calculate the line percentage rate from the duration, port object and stat
1088     object.
1089     :param port: XenaPort object
1090     :param stats: Xena RXStat or TXStat object
1091     :param time_active: time the stream was active in secs as int
1092     :param packet_size: packet size as int
1093     :return: line percentage as float
1094     """
1095     # this is ugly, but its prettier than calling the get method 3 times...
1096     try:
1097         packets = stats.data['pr_total']['packets']
1098     except KeyError:
1099         try:
1100             packets = stats.data['pt_total']['packets']
1101         except KeyError:
1102             _LOGGER.error(
1103                 'Could not calculate line rate because packet stat not found.')
1104             return 0
1105     ifg = port.get_inter_frame_gap()
1106     pps = packets_per_second(packets, time_active)
1107     l2br = l2_bit_rate(packet_size, stats.preamble, pps)
1108     l1br = l1_bit_rate(l2br, pps, ifg, stats.preamble)
1109     return 100.0 * l1br / port.get_effective_speed()
1110
1111
1112 def l2_bit_rate(packet_size, preamble, pps):
1113     """
1114     Return the l2 bit rate
1115     :param packet_size: packet size on the line in bytes
1116     :param preamble: preamble size of the packet header in bytes
1117     :param pps: packets per second
1118     :return: l2 bit rate as float
1119     """
1120     return (packet_size * preamble) * pps
1121
1122
1123 def l1_bit_rate(l2br, pps, ifg, preamble):
1124     """
1125     Return the l1 bit rate
1126     :param l2br: l2 bit rate int bits per second
1127     :param pps: packets per second
1128     :param ifg: the inter frame gap
1129     :param preamble: preamble size of the packet header in bytes
1130     :return: l1 bit rate as float
1131     """
1132     return l2br + (pps * ifg * preamble)
1133
1134
1135 def make_manager_command(cmd, argument=None):
1136     """ String builder for Xena socket commands
1137
1138     :param cmd: Command to send
1139     :param argument: Arguments for command to send
1140     :return: String of command
1141     """
1142     command = '{} "{}"'.format(cmd, argument) if argument else cmd
1143     _LOGGER.info("[Command Sent] : %s", command)
1144     return command
1145
1146
1147 def make_port_command(cmd, xena_port):
1148     """ String builder for Xena port commands
1149
1150     :param cmd: Command to send
1151     :param xena_port: XenaPort object
1152     :return: String of command
1153     """
1154     command = "{} {}".format(xena_port.port_string(), cmd)
1155     _LOGGER.info("[Command Sent] : %s", command)
1156     return command
1157
1158
1159 def make_stream_command(cmd, args, xena_stream):
1160     """ String builder for Xena port commands
1161
1162     :param cmd: Command to send
1163     :param xena_stream: XenaStream object
1164     :return: String of command
1165     """
1166     command = "{} {} [{}] {}".format(xena_stream.xena_port.port_string(), cmd,
1167                                      xena_stream.stream_id, args)
1168     _LOGGER.info("[Command Sent] : %s", command)
1169     return command
1170
1171
1172 def packets_per_second(packets, time_in_sec):
1173     """
1174     Return the pps as float
1175     :param packets: total packets
1176     :param time_in_sec: time in seconds
1177     :return: float of pps
1178     """
1179     return packets / time_in_sec