Merge "Tools: Improve Stability."
[vswitchperf.git] / tools / pkt_gen / xena / XenaDriver.py
1 # Copyright 2016-2017 Red Hat Inc & Xena Networks.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # This is a port of code by Xena and Flavios ported to python 3 compatibility.
16 # Credit given to Xena and Flavio for providing most of the logic of this code.
17 # The code has changes for PEP 8 and python 3 conversion. Added Stat classes
18 # for better scaling of future requirements. Also added calculation functions
19 # for line rate to align within VSPerf project.
20 # Flavios xena libraries available at https://github.com/fleitner/XenaPythonLib
21
22 # Contributors:
23 #   Flavio Leitner, Red Hat Inc.
24 #   Dan Amzulescu, Xena Networks
25 #   Christian Trautman, Red Hat Inc.
26
27 """
28 Xena Socket API Driver module for communicating directly with Xena system
29 through socket commands and returning different statistics.
30 """
31 import locale
32 import logging
33 import math
34 import socket
35 import struct
36 import sys
37 import threading
38 import time
39 # pylint: disable=too-many-lines
40 # Xena Socket Commands
41 CMD_CLEAR_RX_STATS = 'pr_clear'
42 CMD_CLEAR_TX_STATS = 'pt_clear'
43 CMD_COMMENT = ';'
44 CMD_CREATE_STREAM = 'ps_create'
45 CMD_DELETE_STREAM = 'ps_delete'
46 CMD_GET_PORT_SPEED = 'p_speed ?'
47 CMD_GET_PORT_SPEED_REDUCTION = 'p_speedreduction ?'
48 CMD_GET_RX_STATS_PER_TID = 'pr_tpldtraffic'
49 CMD_GET_STREAM_DATA = 'pt_stream'
50 CMD_GET_STREAMS_PER_PORT = 'ps_indices'
51 CMD_GET_TID_PER_STREAM = 'ps_tpldid'
52 CMD_GET_TX_STATS_PER_STREAM = 'pt_stream'
53 CMD_GET_RX_STATS = 'pr_all ?'
54 CMD_GET_TX_STATS = 'pt_all ?'
55 CMD_INTERFRAME_GAP = 'p_interframegap'
56 CMD_LOGIN = 'c_logon'
57 CMD_LOGOFF = 'c_logoff'
58 CMD_OWNER = 'c_owner'
59 CMD_PORT = ';Port:'
60 CMD_PORT_IP = 'p_ipaddress'
61 CMD_PORT_LEARNING = 'p_autotrain'
62 CMD_RESERVE = 'p_reservation reserve'
63 CMD_RELEASE = 'p_reservation release'
64 CMD_RELINQUISH = 'p_reservation relinquish'
65 CMD_RESET = 'p_reset'
66 CMD_SET_PORT_ARP_REPLY = 'p_arpreply'
67 CMD_SET_PORT_ARP_V6_REPLY = 'p_arpv6reply'
68 CMD_SET_PORT_PING_REPLY = 'p_pingreply'
69 CMD_SET_PORT_PING_V6_REPLY = 'p_pingv6reply'
70 CMD_SET_PORT_TIME_LIMIT = 'p_txtimelimit'
71 CMD_SET_STREAM_HEADER_PROTOCOL = 'ps_headerprotocol'
72 CMD_SET_STREAM_ON_OFF = 'ps_enable'
73 CMD_SET_STREAM_PACKET_HEADER = 'ps_packetheader'
74 CMD_SET_STREAM_PACKET_LENGTH = 'ps_packetlength'
75 CMD_SET_STREAM_PACKET_LIMIT = 'ps_packetlimit'
76 CMD_SET_STREAM_PACKET_PAYLOAD = 'ps_payload'
77 CMD_SET_STREAM_RATE_FRACTION = 'ps_ratefraction'
78 CMD_SET_STREAM_TEST_PAYLOAD_ID = 'ps_tpldid'
79 CMD_SET_TPLD_MODE = 'p_tpldmode'
80 CMD_START_TRAFFIC = 'p_traffic on'
81 CMD_STOP_TRAFFIC = 'p_traffic off'
82 CMD_STREAM_MODIFIER = 'ps_modifier'
83 CMD_STREAM_MODIFIER_COUNT = 'ps_modifiercount'
84 CMD_STREAM_MODIFIER_RANGE = 'ps_modifierrange'
85 CMD_VERSION = 'c_versionno ?'
86
87 _LOCALE = locale.getlocale()[1]
88 _LOGGER = logging.getLogger(__name__)
89
90 class ModSet(object):
91     """
92     Mod set attribute tracker
93     """
94     def __init__(self, **kwargs):
95         """ Constructor
96         All mods default to False
97         :param kwargs: Any class attribute can be set here.
98         """
99         self.mod_src_mac = False
100         self.mod_dst_mac = False
101         self.mod_src_ip = False
102         self.mod_dst_ip = False
103         self.mod_src_port = False
104         self.mod_dst_port = False
105
106         for (key, value) in kwargs.items():
107             if hasattr(self, key):
108                 setattr(self, key, value)
109
110
111 class SimpleSocket(object):
112     """
113     Socket class
114     """
115     def __init__(self, hostname, port=5025, timeout=1):
116         """Constructor
117         :param hostname: hostname or ip as string
118         :param port: port number to use for socket as int
119         :param timeout: socket timeout as int
120         :return: SimpleSocket object
121         """
122         self.hostname = hostname
123         try:
124             self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
125             self.sock.settimeout(timeout)
126             self.sock.connect((hostname, port))
127         except socket.error as msg:
128             _LOGGER.error(
129                 "Cannot connect to Xena Socket at %s", hostname)
130             _LOGGER.error("Exception : %s", msg)
131             sys.exit(1)
132
133     def __del__(self):
134         """Deconstructor
135         :return:
136         """
137         self.sock.close()
138
139     def ask(self, cmd):
140         """ Send the command over the socket
141         :param cmd: cmd as string
142         :return: byte utf encoded return value from socket
143         """
144         cmd += '\n'
145         try:
146             self.sock.send(cmd.encode('utf-8'))
147             return self.sock.recv(1024)
148         except OSError:
149             return ''
150
151     def read_reply(self):
152         """ Get the response from the socket
153         :return: Return the reply
154         """
155         reply = self.sock.recv(1024)
156         if reply.find("---^".encode('utf-8')) != -1:
157             # read again the syntax error msg
158             reply = self.sock.recv(1024)
159         return reply
160
161     def send_command(self, cmd):
162         """ Send the command specified over the socket
163         :param cmd: Command to send as string
164         :return: None
165         """
166         cmd += '\n'
167         self.sock.send(cmd.encode('utf-8'))
168
169     def set_keep_alive(self):
170         """ Set the keep alive for the socket
171         :return: None
172         """
173         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
174
175
176 class KeepAliveThread(threading.Thread):
177     """
178     Keep alive socket class
179     """
180     message = ''
181
182     def __init__(self, connection, interval=10):
183         """ Constructor
184         :param connection: Socket for keep alive
185         :param interval: interval in seconds to send keep alive
186         :return: KeepAliveThread object
187         """
188         threading.Thread.__init__(self)
189         self.connection = connection
190         self.interval = interval
191         self.finished = threading.Event()
192         self.setDaemon(True)
193         _LOGGER.debug(
194             'Xena Socket keep alive thread initiated, interval %s seconds', self.interval)
195
196     def stop(self):
197         """ Thread stop. See python thread docs for more info
198         :return: None
199         """
200         self.finished.set()
201         self.join()
202
203     def run(self):
204         """ Thread start. See python thread docs for more info
205         :return: None
206         """
207         while not self.finished.isSet():
208             self.finished.wait(self.interval)
209             self.connection.ask(self.message)
210
211
212 class XenaSocketDriver(SimpleSocket):
213     """
214     Xena socket class
215     """
216     reply_ok = '<OK>'
217
218     def __init__(self, hostname, port=22611):
219         """ Constructor
220         :param hostname: Hostname or ip as string
221         :param port: port to use as int
222         :return: XenaSocketDriver object
223         """
224         SimpleSocket.__init__(self, hostname=hostname, port=port)
225         SimpleSocket.set_keep_alive(self)
226         self.access_semaphor = threading.Semaphore(1)
227
228     def ask(self, cmd):
229         """ Send the command over the socket in a thread safe manner
230         :param cmd: Command to send
231         :return: reply from socket
232         """
233         self.access_semaphor.acquire()
234         reply = SimpleSocket.ask(self, cmd)
235         self.access_semaphor.release()
236         return reply
237
238     def ask_verify(self, cmd):
239         """ Send the command over the socket in a thread safe manner and
240         verify the response is good.
241         :param cmd: Command to send
242         :return: Boolean True if command response is good, False otherwise
243         """
244         resp = self.ask(cmd).decode(_LOCALE).strip('\n')
245         _LOGGER.info('[ask_verify] %s', resp)
246         if resp == self.reply_ok:
247             return True
248         return False
249
250     def disconnect(self):
251         """
252         Close the socket connection
253         :return: None
254         """
255         self.sock.close()
256
257     def send_command(self, cmd):
258         """ Send the command over the socket with no return
259         :param cmd: Command to send
260         :return: None
261         """
262         self.access_semaphor.acquire()
263         SimpleSocket.send_command(self, cmd)
264         self.access_semaphor.release()
265
266     def send_query_replies(self, cmd):
267         """ Send the command over the socket and wait for all replies and return
268         the lines as a list
269         :param cmd: Command to send
270         :return: Response from command as list
271         """
272         # send the command followed by cmd SYNC to find out
273         # when the last reply arrives.
274         self.send_command(cmd.strip('\n'))
275         self.send_command('SYNC')
276         replies = []
277         self.access_semaphor.acquire()
278         msg = SimpleSocket.read_reply(self).decode(_LOCALE)
279         msgleft = ''
280         while True:
281             if '\n' in msg:
282                 (reply, msgleft) = msg.split('\n', 1)
283                 # check for syntax problems
284                 if reply.rfind('Syntax') != -1:
285                     self.access_semaphor.release()
286                     return []
287
288                 if reply.rfind('<SYNC>') == 0:
289
290                     self.access_semaphor.release()
291                     return replies
292
293                 replies.append(reply + '\n')
294                 msg = msgleft
295             else:
296                 # more bytes to come
297                 msgnew = SimpleSocket.read_reply(self).decode(_LOCALE)
298                 msg = msgleft + msgnew
299
300
301 class XenaManager(object):
302     """
303     Manager class for port and socket functions
304     """
305     def __init__(self, socketDriver, user='', password='xena'):
306         """Constructor
307
308         Establish a connection to Xena using a ``driver`` with the ``password``
309         supplied.
310
311         Attributes:
312         :param socketDriver: XenaSocketDriver connection object
313         :param password: Password to the Xena traffic generator
314         :returns: XenaManager object
315         """
316         self.driver = socketDriver
317         self.ports = list()
318         self.keep_alive_thread = KeepAliveThread(self.driver)
319
320         if self.logon(password):
321             _LOGGER.info('Connected to Xena at %s', self.driver.hostname)
322         else:
323             _LOGGER.error('Failed to logon to Xena at %s', self.driver.hostname)
324             return
325
326         self.set_owner(user)
327
328     def disconnect(self):
329         """ Release ports and disconnect from chassis.
330         """
331         for module_port in self.ports:
332             module_port.release_port()
333         self.ports = []
334         self.logoff()
335         self.keep_alive_thread.stop()
336
337     def add_module_port(self, module, port):
338         """Factory for Xena Ports
339
340         :param module: String or int of module
341         :param port: String or int of port
342         :return: XenaPort object if success, None if port already added
343         """
344         xenaport = XenaPort(self, module, port)
345         if xenaport in self.ports:
346             return None
347         else:
348             self.ports.append(xenaport)
349             return xenaport
350
351     def get_module_port(self, module, port):
352         """Return the Xena Port object if available
353         :param module: module number as int or str
354         :param port: port number as int or str
355         :return: XenaPort object or None if not found
356         """
357         for por in self.ports:
358             if por.port == str(port) and por.module == str(module):
359                 return por
360         return None
361
362     def get_version(self):
363         """
364         Get the version from the chassis
365         :return: versions of server and driver as string
366         """
367         res = self.driver.ask(make_manager_command(
368             CMD_VERSION, '')).decode(_LOCALE)
369         res = res.rstrip('\n').split()
370         return "Server: {} Driver: {}".format(res[1], res[2])
371
372     def logoff(self):
373         """
374         Logoff from the Xena chassis
375         :return: Boolean True if response OK, False if error.
376         """
377         return self.driver.ask_verify(make_manager_command(CMD_LOGOFF))
378
379     def logon(self, password):
380         """Login to the Xena traffic generator using the ``password`` supplied.
381
382         :param password: string of password
383         :return: Boolean True if response OK, False if error.
384         """
385         self.keep_alive_thread.start()
386         return self.driver.ask_verify(make_manager_command(CMD_LOGIN, password))
387
388     def set_owner(self, username):
389         """Set the ports owner.
390         :return: Boolean True if response OK, False if error.
391         """
392         return self.driver.ask_verify(make_manager_command(CMD_OWNER, username))
393
394 # pylint: disable=too-many-public-methods
395 class XenaPort(object):
396     """
397     Xena Port emulator class
398     """
399     def __init__(self, manager, module, port):
400         """Constructor
401
402         :param manager: XenaManager object
403         :param module: Module as string or int of module to use
404         :param port: Port as string or int of port to use
405         :return: XenaPort object
406         """
407         self._manager = manager
408         self._module = str(module)
409         self._port = str(port)
410         self._streams = list()
411
412     @property
413     def manager(self):
414         """Property for manager attribute
415         :return: manager object
416         """
417         return self._manager
418
419     @property
420     def module(self):
421         """Property for module attribute
422         :return: module value as string
423         """
424         return self._module
425
426     @property
427     def port(self):
428         """Property for port attribute
429         :return: port value as string
430         """
431         return self._port
432
433     def port_string(self):
434         """String builder with attributes
435         :return: String of module port for command sequence
436         """
437         stringify = "{}/{}".format(self._module, self._port)
438         return stringify
439
440     def add_stream(self):
441         """Add a stream to the port.
442         :return: XenaStream object, None if failure
443         """
444         identifier = len(self._streams)
445         stream = XenaStream(self, identifier)
446         if self._manager.driver.ask_verify(make_stream_command(
447                 CMD_CREATE_STREAM, '', stream)):
448             self._streams.append(stream)
449             return stream
450         else:
451             _LOGGER.error("Error during stream creation")
452             return None
453
454     def clear_stats(self, rx_clear=True, tx_clear=True):
455         """Clear the port stats
456
457         :param rx_clear: Boolean if rx stats are to be cleared
458         :param tx_clear: Boolean if tx stats are to be cleared
459         :return: Boolean True if response OK, False if error.
460         """
461         command = make_port_command(CMD_CLEAR_RX_STATS, self)
462         res1 = self._manager.driver.ask_verify(command) if rx_clear else True
463         command = make_port_command(CMD_CLEAR_TX_STATS, self)
464         res2 = self._manager.driver.ask_verify(command) if tx_clear else True
465         return all([res1, res2])
466
467     def get_effective_speed(self):
468         """
469         Get the effective speed on the port
470         :return: effective speed as float
471         """
472         port_speed = self.get_port_speed()
473         reduction = self.get_port_speed_reduction()
474         effective_speed = port_speed * (1.0 - reduction / 1000000.0)
475         return effective_speed
476
477     def get_inter_frame_gap(self):
478         """
479         Get the interframe gap and return it as string
480         :return: integer of interframe gap
481         """
482         command = make_port_command(CMD_INTERFRAME_GAP + '?', self)
483         res = self._manager.driver.ask(command).decode(_LOCALE)
484         res = int(res.rstrip('\n').split(' ')[-1])
485         return res
486
487     def get_port_speed(self):
488         """
489         Get the port speed as bits from port and return it as a int.
490         :return: Int of port speed
491         """
492         command = make_port_command(CMD_GET_PORT_SPEED, self)
493         res = self._manager.driver.ask(command).decode(_LOCALE)
494         port_speed = res.split(' ')[-1].rstrip('\n')
495         return int(port_speed) * 1000000
496
497     def get_port_speed_reduction(self):
498         """
499         Get the port speed reduction value as int
500         :return: Integer of port speed reduction value
501         """
502         command = make_port_command(CMD_GET_PORT_SPEED_REDUCTION, self)
503         res = self._manager.driver.ask(command).decode(_LOCALE)
504         res = int(res.rstrip('\n').split(' ')[-1])
505         return res
506
507     def get_rx_stats(self):
508         """Get the rx stats and return the data as a dict.
509         :return: Receive stats as dictionary
510         """
511         command = make_port_command(CMD_GET_RX_STATS, self)
512         rx_data = self._manager.driver.send_query_replies(command)
513         data = XenaRXStats(rx_data, time.time())
514         return data
515
516     def get_tx_stats(self):
517         """Get the tx stats and return the data as a dict.
518         :return: Receive stats as dictionary
519         """
520         command = make_port_command(CMD_GET_TX_STATS, self)
521         tx_data = self._manager.driver.send_query_replies(command)
522         data = XenaTXStats(tx_data, time.time())
523         return data
524
525     def micro_tpld_disable(self):
526         """Disable micro TPLD and return to standard payload size
527         :return: Boolean if response OK, False if error
528         """
529         command = make_port_command(CMD_SET_TPLD_MODE + ' normal', self)
530         return self._manager.driver.ask_verify(command)
531
532     def micro_tpld_enable(self):
533         """Enable micro TPLD 6 byte payloads.
534         :Return Boolean if response OK, False if error
535         """
536         command = make_port_command(CMD_SET_TPLD_MODE + ' micro', self)
537         return self._manager.driver.ask_verify(command)
538
539     def release_port(self):
540         """Release the port
541         :return: Boolean True if response OK, False if error.
542         """
543         command = make_port_command(CMD_RELEASE, self)
544         return self._manager.driver.ask_verify(command)
545
546     def reserve_port(self):
547         """Reserve the port
548         :return: Boolean True if response OK, False if error.
549         """
550         command = make_port_command(CMD_RESERVE, self)
551         return self._manager.driver.ask_verify(command)
552
553     def reset_port(self):
554         """Reset the port
555         :return: Boolean True if response OK, False if error.
556         """
557         command = make_port_command(CMD_RESET, self)
558         return self._manager.driver.ask_verify(command)
559
560     def set_port_arp_reply(self, is_on=True, ipv6=False):
561         """
562         Set the port arpreply value
563         :param on: Enable or disable the arp reply on the port
564         :param v6: set the value on the ip v6, disabled will set at ip v4
565         :return: Boolean True if response OK, False if error
566         """
567         command = make_port_command('{} {}'.format(
568             CMD_SET_PORT_ARP_V6_REPLY if ipv6 else CMD_SET_PORT_ARP_REPLY,
569             "on" if is_on else "off"), self)
570         return self._manager.driver.ask_verify(command)
571
572     def set_port_ping_reply(self, is_on=True, ipv6=False):
573         """
574         Set the port ping reply value
575         :param on: Enable or disable the ping reply on the port
576         :param v6: set the value on the ip v6, disabled will set at ip v4
577         :return: Boolean True if response OK, False if error
578         """
579         command = make_port_command('{} {}'.format(
580             CMD_SET_PORT_PING_V6_REPLY if ipv6 else CMD_SET_PORT_PING_REPLY,
581             "on" if is_on else "off"), self)
582         return self._manager.driver.ask_verify(command)
583
584     def set_port_learning(self, interval):
585         """Start port learning with the interval in seconds specified. 0 disables port learning
586         :param: interval as int
587         :return: Boolean True if response OK, False if error.
588         """
589         command = make_port_command('{} {}'.format(CMD_PORT_LEARNING, interval), self)
590         return self._manager.driver.ask_verify(command)
591
592     def set_port_ip(self, ip_addr, cidr, gateway, wild='255'):
593         """
594         Set the port ip address of the specific port
595         :param ip_addr: IP address to set to port
596         :param cidr: cidr number for the subnet
597         :param gateway: Gateway ip for port
598         :param wild: wildcard used for ARP and PING replies
599         :return: Boolean True if response OK, False if error
600         """
601         # convert the cidr to a dot notation subnet address
602         subnet = socket.inet_ntoa(
603             struct.pack(">I", (0xffffffff << (32 - cidr)) & 0xffffffff))
604
605         command = make_port_command('{} {} {} {} 0.0.0.{}'.format(
606             CMD_PORT_IP, ip_addr, subnet, gateway, wild), self)
607         return self._manager.driver.ask_verify(command)
608
609     def set_port_time_limit(self, micro_seconds):
610         """Set the port time limit in ms
611         :param micro_seconds: ms for port time limit
612         :return: Boolean True if response OK, False if error.
613         """
614         command = make_port_command('{} {}'.format(
615             CMD_SET_PORT_TIME_LIMIT, micro_seconds), self)
616         return self._manager.driver.ask_verify(command)
617
618     def traffic_off(self):
619         """Start traffic
620         :return: Boolean True if response OK, False if error.
621         """
622         command = make_port_command(CMD_STOP_TRAFFIC, self)
623         return self._manager.driver.ask_verify(command)
624
625     def traffic_on(self):
626         """Stop traffic
627         :return: Boolean True if response OK, False if error.
628         """
629         command = make_port_command(CMD_START_TRAFFIC, self)
630         return self._manager.driver.ask_verify(command)
631
632
633 class XenaStream(object):
634     """
635     Xena stream emulator class
636     """
637     def __init__(self, xenaPort, streamID):
638         """Constructor
639
640         :param xenaPort: XenaPort object
641         :param streamID: Stream ID as int or string
642         :return: XenaStream object
643         """
644         self._xena_port = xenaPort
645         self._stream_id = str(streamID)
646         self._manager = self._xena_port.manager
647         self._header_protocol = None
648
649     @property
650     def xena_port(self):
651         """Property for port attribute
652         :return: XenaPort object
653         """
654         return self._xena_port
655
656     @property
657     def stream_id(self):
658         """Property for streamID attribute
659         :return: streamID value as string
660         """
661         return self._stream_id
662
663     def enable_multistream(self, flows, mod_class):
664         """
665         Implementation of multi stream. Enable multi stream by setting
666         modifiers on the stream. If no mods are selected, src_ip mod will be used.
667         :param flows: Numbers of flows, Values greater than 65535 will square rooted
668                       to the closest value. Xena mods are limited to 4 bytes.
669         :param mod_class: ModSet object
670         :return: True if success False otherwise
671         """
672         if not self._header_protocol:
673             raise RuntimeError(
674                 "Please set a protocol header before calling this method.")
675         # maximum value for a Xena modifier is 65535 (unsigned int). If flows
676         # is greater than 65535 we have to do two mods getting as close as we
677         # can with square rooting the flow count.
678         if flows > 4294836225:
679             _LOGGER.debug('Flow mods exceeds highest value, changing to 4294836225')
680             flows = 4294836225
681         if flows <= 65535:
682             mod1 = flows
683             mod2 = 0
684         else:
685             mod1, mod2 = int(math.sqrt(flows)), int(math.sqrt(flows))
686             _LOGGER.debug('Flow count modified to %s', mod1*mod2)
687         offset_list = list()
688         if not any([mod_class.mod_src_mac, mod_class.mod_dst_mac, mod_class.mod_src_ip,
689                     mod_class.mod_dst_ip, mod_class.mod_src_port, mod_class.mod_dst_port]):
690             # no mods were selected, default to src ip only
691             mod_class.mod_src_ip = True
692         if mod_class.mod_src_mac:
693             offset_list.append(3)
694         if mod_class.mod_dst_mac:
695             offset_list.append(9)
696         if mod_class.mod_src_ip:
697             offset_list.append(32 if 'VLAN' in self._header_protocol else 28)
698         if mod_class.mod_dst_ip:
699             offset_list.append(36 if 'VLAN' in self._header_protocol else 32)
700         if mod_class.mod_src_port:
701             offset_list.append(38 if 'VLAN' in self._header_protocol else 34)
702         if mod_class.mod_dst_port:
703             offset_list.append(40 if 'VLAN' in self._header_protocol else 36)
704         # calculate how many mods we have to do
705         countertotal = len(offset_list)
706         if mod2:
707             # to handle flows greater than 65535 we will need more mods for
708             # layer 2 and 3
709             for mod in [mod_class.mod_src_mac, mod_class.mod_dst_mac,
710                         mod_class.mod_src_ip, mod_class.mod_dst_ip]:
711                 if mod:
712                     countertotal += 1
713         command = make_port_command(
714             CMD_STREAM_MODIFIER_COUNT + ' [{}]'.format(self._stream_id) +
715             ' {}'.format(countertotal), self._xena_port)
716         responses = list()
717         responses.append(self._manager.driver.ask_verify(command))
718         modcounter = 0
719         for offset in offset_list:
720             if (mod_class.mod_dst_port or mod_class.mod_src_port) and \
721                     (offset >= 38 if 'VLAN' in self._header_protocol else 34):
722                 # only do a 1 mod for udp ports at max 65535
723                 newmod1 = 65535 if flows >= 65535 else flows
724                 command = make_port_command(
725                     CMD_STREAM_MODIFIER + ' [{},{}] {} 0xFFFF0000 INC 1'.format(
726                         self._stream_id, modcounter, offset), self._xena_port)
727                 responses.append(self._manager.driver.ask_verify(command))
728                 command = make_port_command(
729                     CMD_STREAM_MODIFIER_RANGE + ' [{},{}] 0 1 {}'.format(
730                         self._stream_id, modcounter, newmod1 - 1), self._xena_port)
731                 responses.append(self._manager.driver.ask_verify(command))
732             else:
733                 command = make_port_command(
734                     CMD_STREAM_MODIFIER + ' [{},{}] {} 0xFFFF0000 INC 1'.format(
735                         self._stream_id, modcounter, offset), self._xena_port)
736                 responses.append(self._manager.driver.ask_verify(command))
737                 command = make_port_command(
738                     CMD_STREAM_MODIFIER_RANGE + ' [{},{}] 0 1 {}'.format(
739                         self._stream_id, modcounter, mod1 - 1), self._xena_port)
740                 responses.append(self._manager.driver.ask_verify(command))
741                 # if we have a second modifier set the modifier to mod2 and to
742                 # incremement once every full rotation of mod 1
743                 if mod2:
744                     modcounter += 1
745                     command = make_port_command(
746                         CMD_STREAM_MODIFIER + ' [{},{}] {} 0xFFFF0000 INC {}'.format(
747                             self._stream_id, modcounter, offset-2, mod1), self._xena_port)
748                     responses.append(self._manager.driver.ask_verify(command))
749                     command = make_port_command(
750                         CMD_STREAM_MODIFIER_RANGE + ' [{},{}] 0 1 {}'.format(
751                             self._stream_id, modcounter, mod2), self._xena_port)
752                     responses.append(self._manager.driver.ask_verify(command))
753             modcounter += 1
754         return all(responses)  # return True if they all worked
755
756     def get_stream_data(self):
757         """
758         Get the response for stream data
759         :return: String of response for stream data info
760         """
761         command = make_stream_command(CMD_GET_STREAM_DATA, '?', self)
762         res = self._manager.driver.ask(command).decode(_LOCALE)
763         return res
764
765     def set_header_protocol(self, protocol_header):
766         """Set the header info for the packet header hex.
767         If the packet header contains just Ethernet and IP info then call this
768         method with ETHERNET IP as the protocol header.
769
770         :param protocol_header: protocol header argument
771         :return: Boolean True if success, False if error
772         """
773         command = make_stream_command(
774             CMD_SET_STREAM_HEADER_PROTOCOL,
775             protocol_header, self)
776         if self._manager.driver.ask_verify(command):
777             self._header_protocol = protocol_header
778             return True
779         else:
780             return False
781
782     def set_off(self):
783         """Set the stream to off
784         :return: Boolean True if success, False if error
785         """
786         return self._manager.driver.ask_verify(make_stream_command(
787             CMD_SET_STREAM_ON_OFF, 'off', self))
788
789     def set_on(self):
790         """Set the stream to on
791         :return: Boolean True if success, False if error
792         """
793         return self._manager.driver.ask_verify(make_stream_command(
794             CMD_SET_STREAM_ON_OFF, 'on', self))
795
796     def set_packet_header(self, header):
797         """Set the stream packet header
798
799         :param header: packet header as hex bytes
800         :return: Boolean True if success, False if error
801         """
802         return self._manager.driver.ask_verify(make_stream_command(
803             CMD_SET_STREAM_PACKET_HEADER, header, self))
804
805     def set_packet_length(self, pattern_type, minimum, maximum):
806         """Set the pattern length with min and max values based on the pattern
807         type supplied
808
809         :param pattern_type: String of pattern type, valid entries [ fixed,
810          butterfly, random, mix, incrementing ]
811         :param minimum: integer of minimum byte value
812         :param maximum: integer of maximum byte value
813         :return: Boolean True if success, False if error
814         """
815         return self._manager.driver.ask_verify(make_stream_command(
816             CMD_SET_STREAM_PACKET_LENGTH, '{} {} {}'.format(
817                 pattern_type, minimum, maximum), self))
818
819     def set_packet_limit(self, limit):
820         """Set the packet limit
821
822         :param limit: number of packets that will be sent, use -1 to disable
823         :return: Boolean True if success, False if error
824         """
825         return self._manager.driver.ask_verify(make_stream_command(
826             CMD_SET_STREAM_PACKET_LIMIT, limit, self))
827
828     def set_packet_payload(self, payload_type, hex_value):
829         """Set the payload to the hex value based on the payload type
830
831         :param payload_type: string of the payload type, valid entries [ pattern,
832          incrementing, prbs ]
833         :param hex_value: hex string of valid hex
834         :return: Boolean True if success, False if error
835         """
836         return self._manager.driver.ask_verify(make_stream_command(
837             CMD_SET_STREAM_PACKET_PAYLOAD, '{} {}'.format(
838                 payload_type, hex_value), self))
839
840     def set_rate_fraction(self, fraction):
841         """Set the rate fraction
842
843         :param fraction: fraction for the stream
844         :return: Boolean True if success, False if error
845         """
846         return self._manager.driver.ask_verify(make_stream_command(
847             CMD_SET_STREAM_RATE_FRACTION, fraction, self))
848
849     def set_payload_id(self, identifier):
850         """ Set the test payload ID
851         :param identifier: ID as int or string
852         :return: Boolean True if success, False if error
853         """
854         return self._manager.driver.ask_verify(make_stream_command(
855             CMD_SET_STREAM_TEST_PAYLOAD_ID, identifier, self))
856
857
858 class XenaRXStats(object):
859     """
860     Receive stat class
861     """
862     def __init__(self, stats, epoc):
863         """ Constructor
864         :param stats: Stats from pr all command as list
865         :param epoc: Current time in epoc
866         :return: XenaRXStats object
867         """
868         self._stats = stats
869         self._time = epoc
870         self.data = self.parse_stats()
871         self.preamble = 8
872
873     @staticmethod
874     def _pack_stats(param, start, fields=None):
875         """ Pack up the list of stats in a dictionary
876         :param param: The list of params to process
877         :param start: What element to start at
878         :param fields: The field names to pack as keys
879         :return: Dictionary of data where fields match up to the params
880         """
881         if not fields:
882             fields = ['bps', 'pps', 'bytes', 'packets']
883         data = {}
884         i = 0
885         for column in fields:
886             data[column] = int(param[start + i])
887             i += 1
888
889         return data
890
891     @staticmethod
892     def _pack_tplds_stats(param, start):
893         """ Pack up the tplds stats
894         :param param: List of params to pack
895         :param start: What element to start at
896         :return: Dictionary of stats
897         """
898         data = {}
899         i = 0
900         for val in range(start, len(param) - start):
901             data[i] = int(param[val])
902             i += 1
903         return data
904
905     def _pack_rxextra_stats(self, param, start):
906         """ Pack up the extra stats
907         :param param: List of params to pack
908         :param start: What element to start at
909         :return: Dictionary of stats
910         """
911         fields = ['fcserrors', 'pauseframes', 'arprequests', 'arpreplies',
912                   'pingrequests', 'pingreplies', 'gapcount', 'gapduration']
913         return self._pack_stats(param, start, fields)
914
915     def _pack_tplderrors_stats(self, param, start):
916         """ Pack up tlpd errors
917         :param param: List of params to pack
918         :param start: What element to start at
919         :return: Dictionary of stats
920         """
921         fields = ['dummy', 'seq', 'mis', 'pld']
922         return self._pack_stats(param, start, fields)
923
924     def _pack_tpldlatency_stats(self, param, start):
925         """ Pack up the tpld latency stats
926         :param param: List of params to pack
927         :param start: What element to start at
928         :return: Dictionary of stats
929         """
930         fields = ['min', 'avg', 'max', '1sec']
931         return self._pack_stats(param, start, fields)
932
933     def _pack_tpldjitter_stats(self, param, start):
934         """ Pack up the tpld jitter stats
935         :param param: List of params to pack
936         :param start: What element to start at
937         :return: Dictionary of stats
938         """
939         fields = ['min', 'avg', 'max', '1sec']
940         return self._pack_stats(param, start, fields)
941
942     @property
943     def time(self):
944         """
945         :return: Time as String of epoc of when stats were collected
946         """
947         return self._time
948
949     # pylint: disable=too-many-branches
950     def parse_stats(self):
951         """ Parse the stats from pr all command
952         :return: Dictionary of all stats
953         """
954         statdict = {}
955         for line in self._stats:
956             param = line.split()
957             if param[1] == 'PR_TOTAL':
958                 statdict['pr_total'] = self._pack_stats(param, 2)
959             elif param[1] == 'PR_NOTPLD':
960                 statdict['pr_notpld'] = self._pack_stats(param, 2,)
961             elif param[1] == 'PR_EXTRA':
962                 statdict['pr_extra'] = self._pack_rxextra_stats(param, 2)
963             elif param[1] == 'PT_STREAM':
964                 entry_id = "pt_stream_%s" % param[2].strip('[]')
965                 statdict[entry_id] = self._pack_stats(param, 3)
966             elif param[1] == 'PR_TPLDS':
967                 tid_list = self._pack_tplds_stats(param, 2)
968                 if tid_list:
969                     statdict['pr_tplds'] = tid_list
970             elif param[1] == 'PR_TPLDTRAFFIC':
971                 if 'pr_tpldstraffic' in statdict:
972                     data = statdict['pr_tpldstraffic']
973                 else:
974                     data = {}
975                 entry_id = param[2].strip('[]')
976                 data[entry_id] = self._pack_stats(param, 3)
977                 statdict['pr_tpldstraffic'] = data
978             elif param[1] == 'PR_TPLDERRORS':
979                 if 'pr_tplderrors' in statdict:
980                     data = statdict['pr_tplderrors']
981                 else:
982                     data = {}
983                 entry_id = param[2].strip('[]')
984                 data[entry_id] = self._pack_tplderrors_stats(param, 3)
985                 statdict['pr_tplderrors'] = data
986             elif param[1] == 'PR_TPLDLATENCY':
987                 if 'pr_tpldlatency' in statdict:
988                     data = statdict['pr_tpldlatency']
989                 else:
990                     data = {}
991                 entry_id = param[2].strip('[]')
992                 data[entry_id] = self._pack_tpldlatency_stats(param, 3)
993                 statdict['pr_tpldlatency'] = data
994             elif param[1] == 'PR_TPLDJITTER':
995                 if 'pr_tpldjitter' in statdict:
996                     data = statdict['pr_tpldjitter']
997                 else:
998                     data = {}
999                 entry_id = param[2].strip('[]')
1000                 data[entry_id] = self._pack_tpldjitter_stats(param, 3)
1001                 statdict['pr_pldjitter'] = data
1002             elif param[1] == 'PR_FILTER':
1003                 if 'pr_filter' in statdict:
1004                     data = statdict['pr_filter']
1005                 else:
1006                     data = {}
1007                 entry_id = param[2].strip('[]')
1008                 data[entry_id] = self._pack_stats(param, 3)
1009                 statdict['pr_filter'] = data
1010             elif param[1] == 'P_RECEIVESYNC':
1011                 if param[2] == 'IN_SYNC':
1012                     statdict['p_receivesync'] = {'IN SYNC': 'True'}
1013                 else:
1014                     statdict['p_receivesync'] = {'IN SYNC': 'False'}
1015             else:
1016                 logging.warning("XenaPort: unknown stats: %s", param[1])
1017
1018         mydict = statdict
1019         return mydict
1020
1021
1022 class XenaTXStats(object):
1023     """
1024     Xena transmit stat class
1025     """
1026     def __init__(self, stats, epoc):
1027         """ Constructor
1028         :param stats: Stats from pt all command as list
1029         :param epoc: Current time in epoc
1030         :return: XenaTXStats object
1031         """
1032         self._stats = stats
1033         self._time = epoc
1034         self._ptstreamkeys = list()
1035         self.data = self.parse_stats()
1036         self.preamble = 8
1037
1038     @staticmethod
1039     def _pack_stats(params, start, fields=None):
1040         """ Pack up the list of stats in a dictionary
1041         :param params: The list of params to process
1042         :param start: What element to start at
1043         :param fields: The field names to pack as keys
1044         :return: Dictionary of data where fields match up to the params
1045         """
1046         if not fields:
1047             fields = ['bps', 'pps', 'bytes', 'packets']
1048         data = {}
1049         i = 0
1050         for column in fields:
1051             data[column] = int(params[start + i])
1052             i += 1
1053
1054         return data
1055
1056     def _pack_txextra_stats(self, params, start):
1057         """ Pack up the tx extra stats
1058         :param params: List of params to pack
1059         :param start: What element to start at
1060         :return: Dictionary of stats
1061         """
1062         fields = ['arprequests', 'arpreplies', 'pingrequests', 'pingreplies',
1063                   'injectedfcs', 'injectedseq', 'injectedmis', 'injectedint',
1064                   'injectedtid', 'training']
1065         return self._pack_stats(params, start, fields)
1066
1067     @property
1068     def pt_stream_keys(self):
1069         """
1070         :return: Return a list of pt_stream_x stream key ids
1071         """
1072         return self._ptstreamkeys
1073
1074     @property
1075     def time(self):
1076         """
1077         :return: Time as String of epoc of when stats were collected
1078         """
1079         return self._time
1080
1081     def parse_stats(self):
1082         """ Parse the stats from pr all command
1083         :return: Dictionary of all stats
1084         """
1085         statdict = {}
1086         for line in self._stats:
1087             param = line.split()
1088             if param[1] == 'PT_TOTAL':
1089                 statdict['pt_total'] = self._pack_stats(param, 2)
1090             elif param[1] == 'PT_NOTPLD':
1091                 statdict['pt_notpld'] = self._pack_stats(param, 2,)
1092             elif param[1] == 'PT_EXTRA':
1093                 statdict['pt_extra'] = self._pack_txextra_stats(param, 2)
1094             elif param[1] == 'PT_STREAM':
1095                 entry_id = "pt_stream_%s" % param[2].strip('[]')
1096                 self._ptstreamkeys.append(entry_id)
1097                 statdict[entry_id] = self._pack_stats(param, 3)
1098             else:
1099                 logging.warning("XenaPort: unknown stats: %s", param[1])
1100         mydict = statdict
1101         return mydict
1102
1103 def aggregate_stats(stat1, stat2):
1104     """
1105     Judge whether stat1 and stat2 both have same key, if both have same key,
1106     call the aggregate fuction, else use the stat1's value
1107     """
1108     newstat = dict()
1109     for keys in stat1.keys():
1110         if keys in stat2 and isinstance(stat1[keys], dict):
1111             newstat[keys] = aggregate(stat1[keys], stat2[keys])
1112         else:
1113             newstat[keys] = stat1[keys]
1114     return newstat
1115
1116 def aggregate(stat1, stat2):
1117     """
1118     Recursive function to aggregate two sets of statistics. This is used when
1119     bi directional traffic is done and statistics need to be calculated based
1120     on two sets of statistics.
1121     :param stat1: One set of dictionary stats from RX or TX stats
1122     :param stat2: Second set of dictionary stats from RX or TX stats
1123     :return: stats for data entry in RX or TX Stats instance
1124     """
1125     newstat = dict()
1126     for (keys1, keys2) in zip(stat1.keys(), stat2.keys()):
1127         if isinstance(stat1[keys1], dict):
1128             newstat[keys1] = aggregate(stat1[keys1], stat2[keys2])
1129         else:
1130             if not isinstance(stat1[keys1], int) and not isinstance(
1131                     [keys1], float):
1132                 # its some value we don't need to aggregate
1133                 return stat1[keys1]
1134             # for latency stats do the appropriate calculation
1135             if keys1 == 'max':
1136                 newstat[keys1] = max(stat1[keys1], stat2[keys2])
1137             elif keys1 == 'min':
1138                 newstat[keys1] = min(stat1[keys1], stat2[keys2])
1139             elif keys1 == 'avg':
1140                 newstat[keys1] = (stat1[keys1] + stat2[keys2]) / 2
1141             else:
1142                 newstat[keys1] = (stat1[keys1] + stat2[keys2])
1143     return newstat
1144
1145
1146 def line_percentage(port, stats, time_active, packet_size):
1147     """
1148     Calculate the line percentage rate from the duration, port object and stat
1149     object.
1150     :param port: XenaPort object
1151     :param stats: Xena RXStat or TXStat object
1152     :param time_active: time the stream was active in secs as int
1153     :param packet_size: packet size as int
1154     :return: line percentage as float
1155     """
1156     # this is ugly, but its prettier than calling the get method 3 times...
1157     try:
1158         packets = stats.data['pr_total']['packets']
1159     except KeyError:
1160         try:
1161             packets = stats.data['pt_total']['packets']
1162         except KeyError:
1163             _LOGGER.error(
1164                 'Could not calculate line rate because packet stat not found.')
1165             return 0
1166     ifg = port.get_inter_frame_gap()
1167     pps = packets_per_second(packets, time_active)
1168     l2br = l2_bit_rate(packet_size, stats.preamble, pps)
1169     l1br = l1_bit_rate(l2br, pps, ifg, stats.preamble)
1170     return 100.0 * l1br / port.get_effective_speed()
1171
1172
1173 def l2_bit_rate(packet_size, preamble, pps):
1174     """
1175     Return the l2 bit rate
1176     :param packet_size: packet size on the line in bytes
1177     :param preamble: preamble size of the packet header in bytes
1178     :param pps: packets per second
1179     :return: l2 bit rate as float
1180     """
1181     return (packet_size * preamble) * pps
1182
1183
1184 def l1_bit_rate(l2br, pps, ifg, preamble):
1185     """
1186     Return the l1 bit rate
1187     :param l2br: l2 bit rate int bits per second
1188     :param pps: packets per second
1189     :param ifg: the inter frame gap
1190     :param preamble: preamble size of the packet header in bytes
1191     :return: l1 bit rate as float
1192     """
1193     return l2br + (pps * ifg * preamble)
1194
1195
1196 def make_manager_command(cmd, argument=None):
1197     """ String builder for Xena socket commands
1198
1199     :param cmd: Command to send
1200     :param argument: Arguments for command to send
1201     :return: String of command
1202     """
1203     command = '{} "{}"'.format(cmd, argument) if argument else cmd
1204     _LOGGER.info("[Command Sent] : %s", command)
1205     return command
1206
1207
1208 def make_port_command(cmd, xena_port):
1209     """ String builder for Xena port commands
1210
1211     :param cmd: Command to send
1212     :param xena_port: XenaPort object
1213     :return: String of command
1214     """
1215     command = "{} {}".format(xena_port.port_string(), cmd)
1216     _LOGGER.info("[Command Sent] : %s", command)
1217     return command
1218
1219
1220 def make_stream_command(cmd, args, xena_stream):
1221     """ String builder for Xena port commands
1222
1223     :param cmd: Command to send
1224     :param xena_stream: XenaStream object
1225     :return: String of command
1226     """
1227     command = "{} {} [{}] {}".format(xena_stream.xena_port.port_string(), cmd,
1228                                      xena_stream.stream_id, args)
1229     _LOGGER.info("[Command Sent] : %s", command)
1230     return command
1231
1232
1233 def packets_per_second(packets, time_in_sec):
1234     """
1235     Return the pps as float
1236     :param packets: total packets
1237     :param time_in_sec: time in seconds
1238     :return: float of pps
1239     """
1240     return packets / time_in_sec