Merge "Add prox test case for SRIOV & 4 ports."
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24
25 from collections import OrderedDict, namedtuple
26 from contextlib import contextmanager
27 from itertools import repeat, chain
28 from multiprocessing import Queue
29
30 import six
31 from six.moves import cStringIO
32 from six.moves import zip, StringIO
33
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39 from yardstick.network_services import constants
40
41 PROX_PORT = 8474
42
43 SECTION_NAME = 0
44 SECTION_CONTENTS = 1
45
46 LOG = logging.getLogger(__name__)
47 LOG.setLevel(logging.DEBUG)
48 LOG_RESULT = logging.getLogger('yardstick')
49 LOG_RESULT.setLevel(logging.DEBUG)
50
51 BITS_PER_BYTE = 8
52 RETRY_SECONDS = 60
53 RETRY_INTERVAL = 1
54
55 CONFIGURATION_OPTIONS = (
56     # dict key           section     key               default value
57     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58     ('testDuration', 'general', 'test_duration', 5.0),
59     ('testPrecision', 'general', 'test_precision', 1.0),
60     ('tests', 'general', 'tests', None),
61     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
62
63     ('logFile', 'logging', 'file', 'dats.log'),
64     ('logDateFormat', 'logging', 'datefmt', None),
65     ('logLevel', 'logging', 'level', 'INFO'),
66     ('logOverwrite', 'logging', 'overwrite', 1),
67
68     ('testerIp', 'tester', 'ip', None),
69     ('testerUser', 'tester', 'user', 'root'),
70     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73     ('testerSocketId', 'tester', 'socket_id', 0),
74
75     ('sutIp', 'sut', 'ip', None),
76     ('sutUser', 'sut', 'user', 'root'),
77     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80     ('sutSocketId', 'sut', 'socket_id', 0),
81 )
82
83
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
85     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
86
87     def __new__(cls, *args):
88         try:
89             matches = cls.CORE_RE.search(str(args[0]))
90             if matches:
91                 args = matches.groups()
92
93             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94                                                        'h' if args[2] else '')
95
96         except (AttributeError, TypeError, IndexError, ValueError):
97             raise ValueError('Invalid core spec {}'.format(args))
98
99     def is_hyperthread(self):
100         return self.hyperthread == 'h'
101
102     @property
103     def index(self):
104         return int(self.is_hyperthread())
105
106     def find_in_topology(self, cpu_topology):
107         try:
108             socket_core_match = cpu_topology[self.socket_id][self.core_id]
109             sorted_match = sorted(socket_core_match.values())
110             return sorted_match[self.index][0]
111         except (KeyError, IndexError):
112             template = "Core {}{} on socket {} does not exist"
113             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
114
115
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117     def __new__(cls, *args):
118         try:
119             assert args[0] is not str(args[0])
120             args = tuple(args[0])
121         except (AssertionError, IndexError, TypeError):
122             pass
123
124         return super(TotStatsTuple, cls).__new__(cls, *args)
125
126
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128                                                         'delta_tx,delta_tsc,'
129                                                         'latency,rx_total,tx_total,'
130                                                         'requested_pps')):
131     @property
132     def pkt_loss(self):
133         try:
134             return 1e2 * self.drop_total / float(self.tx_total)
135         except ZeroDivisionError:
136             return 100.0
137
138     @property
139     def tx_mpps(self):
140         # calculate the effective throughput in Mpps
141         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
142
143     @property
144     def rx_mpps(self):
145         # calculate the effective throughput in Mpps
146         return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6
147
148     @property
149     def can_be_lost(self):
150         return int(self.tx_total * self.tolerated / 1e2)
151
152     @property
153     def drop_total(self):
154         return self.tx_total - self.rx_total
155
156     @property
157     def success(self):
158         return self.drop_total <= self.can_be_lost
159
160     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
161         if pkt_loss is None:
162             pkt_loss = self.pkt_loss
163
164         if port_samples is None:
165             port_samples = {}
166
167         latency_keys = [
168             "LatencyMin",
169             "LatencyMax",
170             "LatencyAvg",
171         ]
172
173         samples = {
174             "Throughput": self.rx_mpps,
175             "RxThroughput": self.rx_mpps,
176             "DropPackets": pkt_loss,
177             "CurrentDropPackets": pkt_loss,
178             "RequestedTxThroughput": self.requested_pps / 1e6,
179             "TxThroughput": self.tx_mpps,
180             "PktSize": pkt_size,
181         }
182         if port_samples:
183             samples.update(port_samples)
184
185         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
186         return samples
187
188     def log_data(self, logger=None):
189         if logger is None:
190             logger = LOG_RESULT
191
192         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
193         logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
194         logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
195                     self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)
196
197
198 class PacketDump(object):
199     @staticmethod
200     def assert_func(func, value1, value2, template=None):
201         assert func(value1, value2), template.format(value1, value2)
202
203     def __init__(self, port_id, data_len, payload):
204         template = "Packet dump has specified length {}, but payload is {} bytes long"
205         self.assert_func(operator.eq, data_len, len(payload), template)
206         self._port_id = port_id
207         self._data_len = data_len
208         self._payload = payload
209
210     @property
211     def port_id(self):
212         """Get the port id of the packet dump"""
213         return self._port_id
214
215     @property
216     def data_len(self):
217         """Get the length of the data received"""
218         return self._data_len
219
220     def __str__(self):
221         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
222
223     def payload(self, start=None, end=None):
224         """Get part of the payload as a list of ordinals.
225
226         Returns a list of byte values, matching the contents of the packet dump.
227         Optional start and end parameters can be specified to retrieve only a
228         part of the packet contents.
229
230         The number of elements in the list is equal to end - start + 1, so end
231         is the offset of the last character.
232
233         Args:
234             start (pos. int): the starting offset in the payload. If it is not
235                 specified or None, offset 0 is assumed.
236             end (pos. int): the ending offset of the payload. If it is not
237                 specified or None, the contents until the end of the packet are
238                 returned.
239
240         Returns:
241             [int, int, ...]. Each int represents the ordinal value of a byte in
242             the packet payload.
243         """
244         if start is None:
245             start = 0
246
247         if end is None:
248             end = self.data_len - 1
249
250         # Bounds checking on offsets
251         template = "Start offset must be non-negative"
252         self.assert_func(operator.ge, start, 0, template)
253
254         template = "End offset must be less than {1}"
255         self.assert_func(operator.lt, end, self.data_len, template)
256
257         # Adjust for splice operation: end offset must be 1 more than the offset
258         # of the last desired character.
259         end += 1
260
261         return self._payload[start:end]
262
263
264 class ProxSocketHelper(object):
265
266     def __init__(self, sock=None):
267         """ creates new prox instance """
268         super(ProxSocketHelper, self).__init__()
269
270         if sock is None:
271             sock = socket.socket()
272
273         self._sock = sock
274         self._pkt_dumps = []
275         self.master_stats = None
276
277     def connect(self, ip, port):
278         """Connect to the prox instance on the remote system"""
279         self._sock.connect((ip, port))
280
281     def get_socket(self):
282         """ get the socket connected to the remote instance """
283         return self._sock
284
285     def _parse_socket_data(self, decoded_data, pkt_dump_only):
286         def get_newline_index():
287             return decoded_data.find('\n', index)
288
289         ret_str = ''
290         index = 0
291         for newline_index in iter(get_newline_index, -1):
292             ret_str = decoded_data[index:newline_index]
293
294             try:
295                 mode, port_id, data_len = ret_str.split(',', 2)
296             except ValueError:
297                 mode, port_id, data_len = None, None, None
298
299             if mode != 'pktdump':
300                 # Regular 1-line message. Stop reading from the socket.
301                 LOG.debug("Regular response read")
302                 return ret_str, True
303
304             LOG.debug("Packet dump header read: [%s]", ret_str)
305
306             # The line is a packet dump header. Parse it, read the
307             # packet payload, store the dump for later retrieval.
308             # Skip over the packet dump and continue processing: a
309             # 1-line response may follow the packet dump.
310
311             data_len = int(data_len)
312             data_start = newline_index + 1  # + 1 to skip over \n
313             data_end = data_start + data_len
314             sub_data = decoded_data[data_start:data_end]
315             pkt_payload = array.array('B', (ord(v) for v in sub_data))
316             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
317             self._pkt_dumps.append(pkt_dump)
318
319             if pkt_dump_only:
320                 # Return boolean instead of string to signal
321                 # successful reception of the packet dump.
322                 LOG.debug("Packet dump stored, returning")
323                 return True, False
324
325             index = data_end + 1
326
327         return ret_str, False
328
329     def get_string(self, pkt_dump_only=False, timeout=0.01):
330
331         def is_ready_string():
332             # recv() is blocking, so avoid calling it when no data is waiting.
333             ready = select.select([self._sock], [], [], timeout)
334             return bool(ready[0])
335
336         status = False
337         ret_str = ""
338         while status is False:
339             for status in iter(is_ready_string, False):
340                 decoded_data = self._sock.recv(256).decode('utf-8')
341                 ret_str, done = self._parse_socket_data(decoded_data,
342                                                         pkt_dump_only)
343                 if (done):
344                     status = True
345                     break
346
347         LOG.debug("Received data from socket: [%s]", ret_str)
348         return status, ret_str
349
350     def get_data(self, pkt_dump_only=False, timeout=0.01):
351         """ read data from the socket """
352
353         # This method behaves slightly differently depending on whether it is
354         # called to read the response to a command (pkt_dump_only = 0) or if
355         # it is called specifically to read a packet dump (pkt_dump_only = 1).
356         #
357         # Packet dumps look like:
358         #   pktdump,<port_id>,<data_len>\n
359         #   <packet contents as byte array>\n
360         # This means the total packet dump message consists of 2 lines instead
361         # of 1 line.
362         #
363         # - Response for a command (pkt_dump_only = 0):
364         #   1) Read response from the socket until \n (end of message)
365         #   2a) If the response is a packet dump header (starts with "pktdump,"):
366         #     - Read the packet payload and store the packet dump for later
367         #       retrieval.
368         #     - Reset the state and restart from 1). Eventually state 2b) will
369         #       be reached and the function will return.
370         #   2b) If the response is not a packet dump:
371         #     - Return the received message as a string
372         #
373         # - Explicit request to read a packet dump (pkt_dump_only = 1):
374         #   - Read the dump header and payload
375         #   - Store the packet dump for later retrieval
376         #   - Return True to signify a packet dump was successfully read
377
378         def is_ready():
379             # recv() is blocking, so avoid calling it when no data is waiting.
380             ready = select.select([self._sock], [], [], timeout)
381             return bool(ready[0])
382
383         status = False
384         ret_str = ""
385         for status in iter(is_ready, False):
386             decoded_data = self._sock.recv(256).decode('utf-8')
387             ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
388             if (done):
389                 break
390
391         LOG.debug("Received data from socket: [%s]", ret_str)
392         return ret_str if status else ''
393
394     def put_command(self, to_send):
395         """ send data to the remote instance """
396         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
397         try:
398             # NOTE: sendall will block, we need a timeout
399             self._sock.sendall(to_send.encode('utf-8'))
400         except:  # pylint: disable=bare-except
401             pass
402
403     def get_packet_dump(self):
404         """ get the next packet dump """
405         if self._pkt_dumps:
406             return self._pkt_dumps.pop(0)
407         return None
408
409     def stop_all_reset(self):
410         """ stop the remote instance and reset stats """
411         LOG.debug("Stop all and reset stats")
412         self.stop_all()
413         self.reset_stats()
414
415     def stop_all(self):
416         """ stop all cores on the remote instance """
417         LOG.debug("Stop all")
418         self.put_command("stop all\n")
419
420     def stop(self, cores, task=''):
421         """ stop specific cores on the remote instance """
422
423         tmpcores = []
424         for core in cores:
425             if core not in tmpcores:
426                 tmpcores.append(core)
427
428         LOG.debug("Stopping cores %s", tmpcores)
429         self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))
430
431     def start_all(self):
432         """ start all cores on the remote instance """
433         LOG.debug("Start all")
434         self.put_command("start all\n")
435
436     def start(self, cores):
437         """ start specific cores on the remote instance """
438
439         tmpcores = []
440         for core in cores:
441             if core not in tmpcores:
442                 tmpcores.append(core)
443
444         LOG.debug("Starting cores %s", tmpcores)
445         self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))
446
447     def reset_stats(self):
448         """ reset the statistics on the remote instance """
449         LOG.debug("Reset stats")
450         self.put_command("reset stats\n")
451
452     def _run_template_over_cores(self, template, cores, *args):
453         for core in cores:
454             self.put_command(template.format(core, *args))
455
456     def set_pkt_size(self, cores, pkt_size):
457         """ set the packet size to generate on the remote instance """
458         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
459         pkt_size -= 4
460         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
461
462     def set_value(self, cores, offset, value, length):
463         """ set value on the remote instance """
464         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
465         LOG.debug(msg, cores, value, length, offset)
466         template = "set value {} 0 {} {} {}\n"
467         self._run_template_over_cores(template, cores, offset, value, length)
468
469     def reset_values(self, cores):
470         """ reset values on the remote instance """
471         LOG.debug("Set value for core(s) %s", cores)
472         self._run_template_over_cores("reset values {} 0\n", cores)
473
474     def set_speed(self, cores, speed, tasks=None):
475         """ set speed on the remote instance """
476         if tasks is None:
477             tasks = [0] * len(cores)
478         elif len(tasks) != len(cores):
479             LOG.error("set_speed: cores and tasks must have the same len")
480         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
481         for (core, task) in list(zip(cores, tasks)):
482             self.put_command("speed {} {} {}\n".format(core, task, speed))
483
484     def slope_speed(self, cores_speed, duration, n_steps=0):
485         """will start to increase speed from 0 to N where N is taken from
486         a['speed'] for each a in cores_speed"""
487         # by default, each step will take 0.5 sec
488         if n_steps == 0:
489             n_steps = duration * 2
490
491         private_core_data = []
492         step_duration = float(duration) / n_steps
493         for core_data in cores_speed:
494             target = float(core_data['speed'])
495             private_core_data.append({
496                 'cores': core_data['cores'],
497                 'zero': 0,
498                 'delta': target / n_steps,
499                 'current': 0,
500                 'speed': target,
501             })
502
503         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
504         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
505             time.sleep(step_duration)
506             for core_data in private_core_data:
507                 core_data['current'] = core_data[key1] + core_data[key2]
508                 self.set_speed(core_data['cores'], core_data['current'])
509
510     def set_pps(self, cores, pps, pkt_size,
511                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
512         """ set packets per second for specific cores on the remote instance """
513         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
514         LOG.debug(msg, cores, pps, pkt_size)
515
516         # speed in percent of line-rate
517         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
518         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
519
520     def lat_stats(self, cores, task=0):
521         """Get the latency statistics from the remote system"""
522         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
523         lat_min = {}
524         lat_max = {}
525         lat_avg = {}
526         for core in cores:
527             self.put_command("lat stats {} {} \n".format(core, task))
528             ret = self.get_data()
529
530             try:
531                 lat_min[core], lat_max[core], lat_avg[core] = \
532                     tuple(int(n) for n in ret.split(",")[:3])
533
534             except (AttributeError, ValueError, TypeError):
535                 pass
536
537         return lat_min, lat_max, lat_avg
538
539     def get_all_tot_stats(self):
540         self.put_command("tot stats\n")
541         all_stats_str = self.get_data().split(",")
542         if len(all_stats_str) != 4:
543             all_stats = [0] * 4
544             return all_stats
545         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
546         self.master_stats = all_stats
547         return all_stats
548
549     def hz(self):
550         return self.get_all_tot_stats()[3]
551
552     def core_stats(self, cores, task=0):
553         """Get the receive statistics from the remote system"""
554         rx = tx = drop = tsc = 0
555         for core in cores:
556             self.put_command("core stats {} {}\n".format(core, task))
557             ret = self.get_data().split(",")
558             rx += int(ret[0])
559             tx += int(ret[1])
560             drop += int(ret[2])
561             tsc = int(ret[3])
562         return rx, tx, drop, tsc
563
564     def multi_port_stats(self, ports):
565         """get counter values from all  ports at once"""
566
567         ports_str = ",".join(map(str, ports))
568         ports_all_data = []
569         tot_result = [0] * len(ports)
570
571         port_index = 0
572         while (len(ports) is not len(ports_all_data)):
573             self.put_command("multi port stats {}\n".format(ports_str))
574             status, ports_all_data_str = self.get_string()
575
576             if not status:
577                 return False, []
578
579             ports_all_data = ports_all_data_str.split(";")
580
581             if len(ports) is len(ports_all_data):
582                 for port_data_str in ports_all_data:
583
584                     tmpdata = []
585                     try:
586                         tmpdata = [try_int(s, 0) for s in port_data_str.split(",")]
587                     except (IndexError, TypeError):
588                         LOG.error("Unpacking data error %s", port_data_str)
589                         return False, []
590
591                     if (len(tmpdata) < 6) or tmpdata[0] not in ports:
592                         LOG.error("Corrupted PACKET %s - retrying", port_data_str)
593                         return False, []
594                     else:
595                         tot_result[port_index] = tmpdata
596                         port_index = port_index + 1
597             else:
598                 LOG.error("Empty / too much data - retry -%s-", ports_all_data)
599                 return False, []
600
601         LOG.debug("Multi port packet ..OK.. %s", tot_result)
602         return True, tot_result
603
604     def port_stats(self, ports):
605         """get counter values from a specific port"""
606         tot_result = [0] * 12
607         for port in ports:
608             self.put_command("port_stats {}\n".format(port))
609             ret = [try_int(s, 0) for s in self.get_data().split(",")]
610             tot_result = [sum(x) for x in zip(tot_result, ret)]
611         return tot_result
612
613     @contextmanager
614     def measure_tot_stats(self):
615         start = self.get_all_tot_stats()
616         container = {'start_tot': start}
617         try:
618             yield container
619         finally:
620             container['end_tot'] = end = self.get_all_tot_stats()
621
622         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
623
624     def tot_stats(self):
625         """Get the total statistics from the remote system"""
626         stats = self.get_all_tot_stats()
627         return stats[:3]
628
629     def tot_ierrors(self):
630         """Get the total ierrors from the remote system"""
631         self.put_command("tot ierrors tot\n")
632         recv = self.get_data().split(',')
633         tot_ierrors = int(recv[0])
634         tsc = int(recv[0])
635         return tot_ierrors, tsc
636
637     def set_count(self, count, cores):
638         """Set the number of packets to send on the specified core"""
639         self._run_template_over_cores("count {} 0 {}\n", cores, count)
640
641     def dump_rx(self, core_id, task_id=0, count=1):
642         """Activate dump on rx on the specified core"""
643         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
644         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
645         time.sleep(1.5)  # Give PROX time to set up packet dumping
646
647     def quit(self):
648         self.stop_all()
649         self._quit()
650         self.force_quit()
651
652     def _quit(self):
653         """ stop all cores on the remote instance """
654         LOG.debug("Quit prox")
655         self.put_command("quit\n")
656         time.sleep(3)
657
658     def force_quit(self):
659         """ stop all cores on the remote instance """
660         LOG.debug("Force Quit prox")
661         self.put_command("quit_force\n")
662         time.sleep(3)
663
664
665 _LOCAL_OBJECT = object()
666
667
668 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
669     # the actual app is lowercase
670     APP_NAME = 'prox'
671     # not used for Prox but added for consistency
672     VNF_TYPE = "PROX"
673
674     LUA_PARAMETER_NAME = ""
675     LUA_PARAMETER_PEER = {
676         "gen": "sut",
677         "sut": "gen",
678     }
679
680     CONFIG_QUEUE_TIMEOUT = 120
681
682     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
683         self.remote_path = None
684         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
685         self.remote_prox_file_name = None
686         self._prox_config_data = None
687         self.additional_files = {}
688         self.config_queue = Queue()
689         # allow_exit_without_flush
690         self.config_queue.cancel_join_thread()
691         self._global_section = None
692
693     @property
694     def prox_config_data(self):
695         if self._prox_config_data is None:
696             # this will block, but it needs too
697             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
698         return self._prox_config_data
699
700     @property
701     def global_section(self):
702         if self._global_section is None and self.prox_config_data:
703             self._global_section = self.find_section("global")
704         return self._global_section
705
706     def find_section(self, name, default=_LOCAL_OBJECT):
707         result = next((value for key, value in self.prox_config_data if key == name), default)
708         if result is _LOCAL_OBJECT:
709             raise KeyError('{} not found in Prox config'.format(name))
710         return result
711
712     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
713         section = self.find_section(section_name, [])
714         result = next((value for key, value in section if key == section_key), default)
715         if result is _LOCAL_OBJECT:
716             template = '{} not found in {} section of Prox config'
717             raise KeyError(template.format(section_key, section_name))
718         return result
719
720     def copy_to_target(self, config_file_path, prox_file):
721         remote_path = os.path.join("/tmp", prox_file)
722         self.ssh_helper.put(config_file_path, remote_path)
723         return remote_path
724
725     @staticmethod
726     def _get_tx_port(section, sections):
727         iface_port = [-1]
728         for item in sections[section]:
729             if item[0] == "tx port":
730                 iface_port = re.findall(r'\d+', item[1])
731                 # do we want the last one?
732                 #   if yes, then can we reverse?
733         return int(iface_port[0])
734
735     @staticmethod
736     def _replace_quoted_with_value(quoted, value, count=1):
737         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
738         return new_string
739
740     def _insert_additional_file(self, value):
741         file_str = value.split('"')
742         base_name = os.path.basename(file_str[1])
743         file_str[1] = self.additional_files[base_name]
744         return '"'.join(file_str)
745
746     def generate_prox_config_file(self, config_path):
747         sections = []
748         prox_config = ConfigParser(config_path, sections)
749         prox_config.parse()
750
751         # Ensure MAC is set "hardware"
752         all_ports = self.vnfd_helper.port_pairs.all_ports
753         # use dpdk port number
754         for port_name in all_ports:
755             port_num = self.vnfd_helper.port_num(port_name)
756             port_section_name = "port {}".format(port_num)
757             for section_name, section in sections:
758                 if port_section_name != section_name:
759                     continue
760
761                 for section_data in section:
762                     if section_data[0] == "mac":
763                         section_data[1] = "hardware"
764
765         # search for dst mac
766         for _, section in sections:
767             for section_data in section:
768                 item_key, item_val = section_data
769                 if item_val.startswith("@@dst_mac"):
770                     tx_port_iter = re.finditer(r'\d+', item_val)
771                     tx_port_no = int(next(tx_port_iter).group(0))
772                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
773                     mac = intf["virtual-interface"]["dst_mac"]
774                     section_data[1] = mac.replace(":", " ", 6)
775
776                 if item_key == "dst mac" and item_val.startswith("@@"):
777                     tx_port_iter = re.finditer(r'\d+', item_val)
778                     tx_port_no = int(next(tx_port_iter).group(0))
779                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
780                     mac = intf["virtual-interface"]["dst_mac"]
781                     section_data[1] = mac
782
783                 if item_val.startswith("@@src_mac"):
784                     tx_port_iter = re.finditer(r'\d+', item_val)
785                     tx_port_no = int(next(tx_port_iter).group(0))
786                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
787                     mac = intf["virtual-interface"]["local_mac"]
788                     section_data[1] = mac.replace(":", " ", 6)
789
790                 if item_key == "src mac" and item_val.startswith("@@"):
791                     tx_port_iter = re.finditer(r'\d+', item_val)
792                     tx_port_no = int(next(tx_port_iter).group(0))
793                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
794                     mac = intf["virtual-interface"]["local_mac"]
795                     section_data[1] = mac
796
797         # if addition file specified in prox config
798         if not self.additional_files:
799             return sections
800
801         for section_name, section in sections:
802             for section_data in section:
803                 try:
804                     if section_data[0].startswith("dofile"):
805                         section_data[0] = self._insert_additional_file(section_data[0])
806
807                     if section_data[1].startswith("dofile"):
808                         section_data[1] = self._insert_additional_file(section_data[1])
809                 except:  # pylint: disable=bare-except
810                     pass
811
812         return sections
813
814     @staticmethod
815     def write_prox_lua(lua_config):
816         """
817         Write an .ini-format config file for PROX (parameters.lua)
818         PROX does not allow a space before/after the =, so we need
819         a custom method
820         """
821         out = []
822         for key in lua_config:
823             value = '"' + lua_config[key] + '"'
824             if key == "__name__":
825                 continue
826             if value is not None and value != '@':
827                 key = "=".join((key, str(value).replace('\n', '\n\t')))
828                 out.append(key)
829             else:
830                 key = str(key).replace('\n', '\n\t')
831                 out.append(key)
832         return os.linesep.join(out)
833
834     @staticmethod
835     def write_prox_config(prox_config):
836         """
837         Write an .ini-format config file for PROX
838         PROX does not allow a space before/after the =, so we need
839         a custom method
840         """
841         out = []
842         for (section_name, section) in prox_config:
843             out.append("[{}]".format(section_name))
844             for item in section:
845                 key, value = item
846                 if key == "__name__":
847                     continue
848                 if value is not None and value != '@':
849                     key = "=".join((key, str(value).replace('\n', '\n\t')))
850                     out.append(key)
851                 else:
852                     key = str(key).replace('\n', '\n\t')
853                     out.append(key)
854         return os.linesep.join(out)
855
856     def put_string_to_file(self, s, remote_path):
857         file_obj = cStringIO(s)
858         self.ssh_helper.put_file_obj(file_obj, remote_path)
859         return remote_path
860
861     def generate_prox_lua_file(self):
862         p = OrderedDict()
863         all_ports = self.vnfd_helper.port_pairs.all_ports
864         for port_name in all_ports:
865             port_num = self.vnfd_helper.port_num(port_name)
866             intf = self.vnfd_helper.find_interface(name=port_name)
867             vintf = intf['virtual-interface']
868             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
869             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
870
871         return p
872
873     def upload_prox_lua(self, config_file, lua_data):
874         # prox can't handle spaces around ' = ' so use custom method
875         out = StringIO(self.write_prox_lua(lua_data))
876         out.seek(0)
877         remote_path = os.path.join("/tmp", config_file)
878         self.ssh_helper.put_file_obj(out, remote_path)
879
880         return remote_path
881
882     def upload_prox_config(self, config_file, prox_config_data):
883         # prox can't handle spaces around ' = ' so use custom method
884         out = StringIO(self.write_prox_config(prox_config_data))
885         out.seek(0)
886         remote_path = os.path.join("/tmp", config_file)
887         self.ssh_helper.put_file_obj(out, remote_path)
888
889         return remote_path
890
891     def build_config_file(self):
892         task_path = self.scenario_helper.task_path
893         options = self.scenario_helper.options
894         config_path = options['prox_config']
895         config_file = os.path.basename(config_path)
896         config_path = utils.find_relative_file(config_path, task_path)
897         self.additional_files = {}
898
899         try:
900             if options['prox_generate_parameter']:
901                 self.lua = []
902                 self.lua = self.generate_prox_lua_file()
903                 if len(self.lua) > 0:
904                     self.upload_prox_lua("parameters.lua", self.lua)
905         except:  # pylint: disable=bare-except
906             pass
907
908         prox_files = options.get('prox_files', [])
909         if isinstance(prox_files, six.string_types):
910             prox_files = [prox_files]
911         for key_prox_file in prox_files:
912             base_prox_file = os.path.basename(key_prox_file)
913             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
914             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
915             self.additional_files[base_prox_file] = remote_prox_file
916
917         self._prox_config_data = self.generate_prox_config_file(config_path)
918         # copy config to queue so we can read it from traffic_runner process
919         self.config_queue.put(self._prox_config_data)
920         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
921
922     def build_config(self):
923         self.build_config_file()
924
925         options = self.scenario_helper.options
926         prox_args = options['prox_args']
927         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
928
929         self.pipeline_kwargs = {
930             'tool_path': tool_path,
931             'tool_dir': os.path.dirname(tool_path),
932             'cfg_file': self.remote_path,
933             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
934                              for k, v in prox_args.items())
935         }
936
937         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
938                         "{args} -f {cfg_file} '")
939         return cmd_template.format(**self.pipeline_kwargs)
940
941
942 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
943 class ProxResourceHelper(ClientResourceHelper):
944
945     RESOURCE_WORD = 'prox'
946
947     PROX_MODE = ""
948
949     WAIT_TIME = 3
950
951     @staticmethod
952     def find_pci(pci, bound_pci):
953         # we have to substring match PCI bus address from the end
954         return any(b.endswith(pci) for b in bound_pci)
955
956     def __init__(self, setup_helper):
957         super(ProxResourceHelper, self).__init__(setup_helper)
958         self.mgmt_interface = self.vnfd_helper.mgmt_interface
959         self._user = self.mgmt_interface["user"]
960         self._ip = self.mgmt_interface["ip"]
961
962         self.done = False
963         self._vpci_to_if_name_map = None
964         self.additional_file = {}
965         self.remote_prox_file_name = None
966         self.lower = None
967         self.upper = None
968         self.step_delta = 1
969         self.step_time = 0.5
970         self._test_type = None
971
972     @property
973     def sut(self):
974         if not self.client:
975             self.client = self._connect()
976         return self.client
977
978     @property
979     def test_type(self):
980         if self._test_type is None:
981             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
982         return self._test_type
983
984     def run_traffic(self, traffic_profile, *args):
985         self._queue.cancel_join_thread()
986         self.lower = 0.0
987         self.upper = 100.0
988
989         traffic_profile.init(self._queue)
990         # this frees up the run_traffic loop
991         self.client_started.value = 1
992
993         while not self._terminated.value:
994             # move it all to traffic_profile
995             self._run_traffic_once(traffic_profile)
996
997     def _run_traffic_once(self, traffic_profile):
998         traffic_profile.execute_traffic(self)
999         if traffic_profile.done.is_set():
1000             self._queue.put({'done': True})
1001             LOG.debug("tg_prox done")
1002             self._terminated.value = 1
1003
1004     # For VNF use ResourceHelper method to collect KPIs directly.
1005     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
1006     def collect_collectd_kpi(self):
1007         return self._collect_resource_kpi()
1008
1009     def collect_kpi(self):
1010         result = super(ProxResourceHelper, self).collect_kpi()
1011         # add in collectd kpis manually
1012         if result:
1013             result['collect_stats'] = self._collect_resource_kpi()
1014         return result
1015
1016     def terminate(self):
1017         # should not be called, use VNF terminate
1018         raise NotImplementedError()
1019
1020     def up_post(self):
1021         return self.sut  # force connection
1022
1023     def execute(self, cmd, *args, **kwargs):
1024         func = getattr(self.sut, cmd, None)
1025         if func:
1026             return func(*args, **kwargs)
1027         return None
1028
1029     def _connect(self, client=None):
1030         """Run and connect to prox on the remote system """
1031         # De-allocating a large amount of hugepages takes some time. If a new
1032         # PROX instance is started immediately after killing the previous one,
1033         # it might not be able to allocate hugepages, because they are still
1034         # being freed. Hence the -w switch.
1035         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1036         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1037         # -f ./handle_none-4.cfg"
1038         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1039         #  "; " \
1040         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
1041         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1042         # sudo " \
1043         #    + "./build/Prox " + prox_args
1044         # log.debug("Starting PROX with command [%s]", prox_cmd)
1045         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1046         # self._ip, prox_cmd))
1047         if client is None:
1048             client = ProxSocketHelper()
1049
1050         # try connecting to Prox for 60s
1051         for _ in range(RETRY_SECONDS):
1052             time.sleep(RETRY_INTERVAL)
1053             try:
1054                 client.connect(self._ip, PROX_PORT)
1055             except (socket.gaierror, socket.error):
1056                 continue
1057             else:
1058                 return client
1059
1060         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1061         raise Exception(msg.format(self._ip, PROX_PORT))
1062
1063
1064 class ProxDataHelper(object):
1065
1066     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
1067         super(ProxDataHelper, self).__init__()
1068         self.vnfd_helper = vnfd_helper
1069         self.sut = sut
1070         self.pkt_size = pkt_size
1071         self.value = value
1072         self.line_speed = line_speed
1073         self.tolerated_loss = tolerated_loss
1074         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
1075         self.tsc_hz = None
1076         self.measured_stats = None
1077         self.latency = None
1078         self._totals_and_pps = None
1079         self.result_tuple = None
1080
1081     @property
1082     def totals_and_pps(self):
1083         if self._totals_and_pps is None:
1084             rx_total = tx_total = 0
1085             ok = False
1086             timeout = time.time() + constants.RETRY_TIMEOUT
1087             while not ok:
1088                 ok, all_ports = self.sut.multi_port_stats([
1089                     self.vnfd_helper.port_num(port_name)
1090                     for port_name in self.vnfd_helper.port_pairs.all_ports])
1091                 if time.time() > timeout:
1092                     break
1093             if ok:
1094                 for port in all_ports:
1095                     rx_total = rx_total + port[1]
1096                     tx_total = tx_total + port[2]
1097                 requested_pps = self.value / 100.0 * self.line_rate_to_pps()
1098                 self._totals_and_pps = rx_total, tx_total, requested_pps
1099         return self._totals_and_pps
1100
1101     @property
1102     def rx_total(self):
1103         try:
1104             ret_val = self.totals_and_pps[0]
1105         except (AttributeError, ValueError, TypeError, LookupError):
1106             ret_val = 0
1107         return ret_val
1108
1109     @property
1110     def tx_total(self):
1111         try:
1112             ret_val = self.totals_and_pps[1]
1113         except (AttributeError, ValueError, TypeError, LookupError):
1114             ret_val = 0
1115         return ret_val
1116
1117     @property
1118     def requested_pps(self):
1119         try:
1120             ret_val = self.totals_and_pps[2]
1121         except (AttributeError, ValueError, TypeError, LookupError):
1122             ret_val = 0
1123         return ret_val
1124
1125     @property
1126     def samples(self):
1127         samples = {}
1128         ports = []
1129         port_names = {}
1130         for port_name, port_num in self.vnfd_helper.ports_iter():
1131             ports.append(port_num)
1132             port_names[port_num] = port_name
1133
1134         ok = False
1135         timeout = time.time() + constants.RETRY_TIMEOUT
1136         while not ok:
1137             ok, results = self.sut.multi_port_stats(ports)
1138             if time.time() > timeout:
1139                 break
1140         if ok:
1141             for result in results:
1142                 port_num = result[0]
1143                 try:
1144                     samples[port_names[port_num]] = {
1145                         "in_packets": result[1],
1146                         "out_packets": result[2]}
1147                 except (IndexError, KeyError):
1148                     pass
1149         return samples
1150
1151     def __enter__(self):
1152         self.check_interface_count()
1153         return self
1154
1155     def __exit__(self, exc_type, exc_val, exc_tb):
1156         self.make_tuple()
1157
1158     def make_tuple(self):
1159         if self.result_tuple:
1160             return
1161
1162         self.result_tuple = ProxTestDataTuple(
1163             self.tolerated_loss,
1164             self.tsc_hz,
1165             self.measured_stats['delta'].rx,
1166             self.measured_stats['delta'].tx,
1167             self.measured_stats['delta'].tsc,
1168             self.latency,
1169             self.rx_total,
1170             self.tx_total,
1171             self.requested_pps,
1172         )
1173         self.result_tuple.log_data()
1174
1175     @contextmanager
1176     def measure_tot_stats(self):
1177         with self.sut.measure_tot_stats() as self.measured_stats:
1178             yield
1179
1180     def check_interface_count(self):
1181         # do this assert in init?  unless we expect interface count to
1182         # change from one run to another run...
1183         assert self.port_count in {1, 2, 4}, \
1184             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1185
1186     def capture_tsc_hz(self):
1187         self.tsc_hz = float(self.sut.hz())
1188
1189     def line_rate_to_pps(self):
1190       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1191
1192 class ProxProfileHelper(object):
1193
1194     __prox_profile_type__ = "Generic"
1195
1196     PROX_CORE_GEN_MODE = "gen"
1197     PROX_CORE_LAT_MODE = "lat"
1198
1199     @classmethod
1200     def get_cls(cls, helper_type):
1201         """Return class of specified type."""
1202         if not helper_type:
1203             return ProxProfileHelper
1204
1205         for profile_helper_class in utils.itersubclasses(cls):
1206             if helper_type == profile_helper_class.__prox_profile_type__:
1207                 return profile_helper_class
1208
1209         return ProxProfileHelper
1210
1211     @classmethod
1212     def make_profile_helper(cls, resource_helper):
1213         return cls.get_cls(resource_helper.test_type)(resource_helper)
1214
1215     def __init__(self, resource_helper):
1216         super(ProxProfileHelper, self).__init__()
1217         self.resource_helper = resource_helper
1218         self._cpu_topology = None
1219         self._test_cores = None
1220         self._latency_cores = None
1221
1222     @property
1223     def cpu_topology(self):
1224         if not self._cpu_topology:
1225             stdout = io.BytesIO()
1226             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1227             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1228         return self._cpu_topology
1229
1230     @property
1231     def test_cores(self):
1232         if not self._test_cores:
1233             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1234         return self._test_cores
1235
1236     @property
1237     def latency_cores(self):
1238         if not self._latency_cores:
1239             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1240         return self._latency_cores
1241
1242     @contextmanager
1243     def traffic_context(self, pkt_size, value):
1244         self.sut.stop_all()
1245         self.sut.reset_stats()
1246         try:
1247             self.sut.set_pkt_size(self.test_cores, pkt_size)
1248             self.sut.set_speed(self.test_cores, value)
1249             self.sut.start_all()
1250             time.sleep(1)
1251             yield
1252         finally:
1253             self.sut.stop_all()
1254
1255     def get_cores(self, mode):
1256         cores = []
1257
1258         for section_name, section in self.setup_helper.prox_config_data:
1259             if not section_name.startswith("core"):
1260                 continue
1261
1262             for key, value in section:
1263                 if key == "mode" and value == mode:
1264                     core_tuple = CoreSocketTuple(section_name)
1265                     core = core_tuple.core_id
1266                     cores.append(core)
1267
1268         return cores
1269
1270     def pct_10gbps(self, percent, line_speed):
1271         """Get rate in percent of 10 Gbps.
1272
1273         Returns the rate in percent of 10 Gbps.
1274         For instance 100.0 = 10 Gbps; 400.0 = 40 Gbps.
1275
1276         This helper method isrequired when setting interface_speed option in
1277         the testcase because NSB/PROX considers 10Gbps as 100% of line rate,
1278         this means that the line rate must be expressed as a percentage of
1279         10Gbps.
1280
1281         :param percent: (float) Percent of line rate (100.0 = line rate).
1282         :param line_speed: (int) line rate speed, in bits per second.
1283
1284         :return: (float) Represents the rate in percent of 10Gbps.
1285         """
1286         return (percent * line_speed / (
1287             constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT))
1288
1289     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1290                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1291         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1292                                      value, tolerated_loss, line_speed)
1293
1294         with data_helper, self.traffic_context(pkt_size,
1295                                                self.pct_10gbps(value, line_speed)):
1296             with data_helper.measure_tot_stats():
1297                 time.sleep(duration)
1298                 # Getting statistics to calculate PPS at right speed....
1299                 data_helper.capture_tsc_hz()
1300                 data_helper.latency = self.get_latency()
1301
1302         return data_helper.result_tuple, data_helper.samples
1303
1304     def get_latency(self):
1305         """
1306         :return: return lat_min, lat_max, lat_avg
1307         :rtype: list
1308         """
1309
1310         if not self._latency_cores:
1311             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1312
1313         if self._latency_cores:
1314             return self.sut.lat_stats(self._latency_cores)
1315         return []
1316
1317     def terminate(self):
1318         pass
1319
1320     def __getattr__(self, item):
1321         return getattr(self.resource_helper, item)
1322
1323
1324 class ProxMplsProfileHelper(ProxProfileHelper):
1325
1326     __prox_profile_type__ = "MPLS tag/untag"
1327
1328     def __init__(self, resource_helper):
1329         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1330         self._cores_tuple = None
1331
1332     @property
1333     def mpls_cores(self):
1334         if not self._cores_tuple:
1335             self._cores_tuple = self.get_cores_mpls()
1336         return self._cores_tuple
1337
1338     @property
1339     def tagged_cores(self):
1340         return self.mpls_cores[0]
1341
1342     @property
1343     def plain_cores(self):
1344         return self.mpls_cores[1]
1345
1346     def get_cores_mpls(self):
1347         cores_tagged = []
1348         cores_plain = []
1349         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1350             if not section_name.startswith("core"):
1351                 continue
1352
1353             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1354                 continue
1355
1356             for item_key, item_value in section:
1357                 if item_key != 'name':
1358                     continue
1359
1360                 if item_value.startswith("tag"):
1361                     core_tuple = CoreSocketTuple(section_name)
1362                     core_tag = core_tuple.core_id
1363                     cores_tagged.append(core_tag)
1364
1365                 elif item_value.startswith("udp"):
1366                     core_tuple = CoreSocketTuple(section_name)
1367                     core_udp = core_tuple.core_id
1368                     cores_plain.append(core_udp)
1369
1370         return cores_tagged, cores_plain
1371
1372     @contextmanager
1373     def traffic_context(self, pkt_size, value):
1374         self.sut.stop_all()
1375         self.sut.reset_stats()
1376         try:
1377             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1378             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1379             self.sut.set_speed(self.tagged_cores, value)
1380             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1381             self.sut.set_speed(self.plain_cores, value * ratio)
1382             self.sut.start_all()
1383             time.sleep(1)
1384             yield
1385         finally:
1386             self.sut.stop_all()
1387
1388
1389 class ProxBngProfileHelper(ProxProfileHelper):
1390
1391     __prox_profile_type__ = "BNG gen"
1392
1393     def __init__(self, resource_helper):
1394         super(ProxBngProfileHelper, self).__init__(resource_helper)
1395         self._cores_tuple = None
1396
1397     @property
1398     def bng_cores(self):
1399         if not self._cores_tuple:
1400             self._cores_tuple = self.get_cores_gen_bng_qos()
1401         return self._cores_tuple
1402
1403     @property
1404     def cpe_cores(self):
1405         return self.bng_cores[0]
1406
1407     @property
1408     def inet_cores(self):
1409         return self.bng_cores[1]
1410
1411     @property
1412     def arp_cores(self):
1413         return self.bng_cores[2]
1414
1415     @property
1416     def arp_task_cores(self):
1417         return self.bng_cores[3]
1418
1419     @property
1420     def all_rx_cores(self):
1421         return self.latency_cores
1422
1423     def get_cores_gen_bng_qos(self):
1424         cpe_cores = []
1425         inet_cores = []
1426         arp_cores = []
1427         arp_tasks_core = [0]
1428         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1429             if not section_name.startswith("core"):
1430                 continue
1431
1432             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1433                 continue
1434
1435             for item_key, item_value in section:
1436                 if item_key != 'name':
1437                     continue
1438
1439                 if item_value.startswith("cpe"):
1440                     core_tuple = CoreSocketTuple(section_name)
1441                     cpe_core = core_tuple.core_id
1442                     cpe_cores.append(cpe_core)
1443
1444                 elif item_value.startswith("inet"):
1445                     core_tuple = CoreSocketTuple(section_name)
1446                     inet_core = core_tuple.core_id
1447                     inet_cores.append(inet_core)
1448
1449                 elif item_value.startswith("arp"):
1450                     core_tuple = CoreSocketTuple(section_name)
1451                     arp_core = core_tuple.core_id
1452                     arp_cores.append(arp_core)
1453
1454                 # We check the tasks/core separately
1455                 if item_value.startswith("arp_task"):
1456                     core_tuple = CoreSocketTuple(section_name)
1457                     arp_task_core = core_tuple.core_id
1458                     arp_tasks_core.append(arp_task_core)
1459
1460         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1461
1462     @contextmanager
1463     def traffic_context(self, pkt_size, value):
1464         # Tester is sending packets at the required speed already after
1465         # setup_test(). Just get the current statistics, sleep the required
1466         # amount of time and calculate packet loss.
1467         inet_pkt_size = pkt_size
1468         cpe_pkt_size = pkt_size - 24
1469         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1470
1471         curr_up_speed = curr_down_speed = 0
1472         max_up_speed = max_down_speed = value
1473         if ratio < 1:
1474             max_down_speed = value * ratio
1475         else:
1476             max_up_speed = value / ratio
1477
1478         # Initialize cores
1479         self.sut.stop_all()
1480         time.sleep(0.5)
1481
1482         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1483         # wrong.
1484         self.sut.start(self.all_rx_cores)
1485         time.sleep(0.5)
1486         self.sut.stop(self.all_rx_cores)
1487         time.sleep(0.5)
1488         self.sut.reset_stats()
1489
1490         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1491         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1492
1493         self.sut.reset_values(self.cpe_cores)
1494         self.sut.reset_values(self.inet_cores)
1495
1496         # Set correct IP and UDP lengths in packet headers
1497         # CPE
1498         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1499         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1500         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1501         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1502
1503         # INET
1504         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1505         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1506         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1507         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1508         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1509         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1510
1511         # Sending ARP to initialize tables - need a few seconds of generation
1512         # to make sure all CPEs are initialized
1513         LOG.info("Initializing SUT: sending ARP packets")
1514         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1515         self.sut.set_speed(self.inet_cores, curr_up_speed)
1516         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1517         self.sut.start(self.arp_cores)
1518         time.sleep(4)
1519
1520         # Ramp up the transmission speed. First go to the common speed, then
1521         # increase steps for the faster one.
1522         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1523
1524         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1525
1526         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1527             # The min(..., ...) takes care of 1) floating point rounding errors
1528             # that could make curr_*_speed to be slightly greater than
1529             # max_*_speed and 2) max_*_speed not being an exact multiple of
1530             # self._step_delta.
1531             if curr_up_speed < max_up_speed:
1532                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1533             if curr_down_speed < max_down_speed:
1534                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1535
1536             self.sut.set_speed(self.inet_cores, curr_up_speed)
1537             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1538             time.sleep(self.step_time)
1539
1540         LOG.info("Target speeds reached. Starting real test.")
1541
1542         yield
1543
1544         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1545         LOG.info("Test ended. Flushing NIC buffers")
1546         self.sut.start(self.all_rx_cores)
1547         time.sleep(3)
1548         self.sut.stop(self.all_rx_cores)
1549
1550     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1551                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1552         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1553                                      value, tolerated_loss, line_speed)
1554
1555         with data_helper, self.traffic_context(pkt_size,
1556                                                self.pct_10gbps(value, line_speed)):
1557             with data_helper.measure_tot_stats():
1558                 time.sleep(duration)
1559                 # Getting statistics to calculate PPS at right speed....
1560                 data_helper.capture_tsc_hz()
1561                 data_helper.latency = self.get_latency()
1562
1563         return data_helper.result_tuple, data_helper.samples
1564
1565
1566 class ProxVpeProfileHelper(ProxProfileHelper):
1567
1568     __prox_profile_type__ = "vPE gen"
1569
1570     def __init__(self, resource_helper):
1571         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1572         self._cores_tuple = None
1573         self._ports_tuple = None
1574
1575     @property
1576     def vpe_cores(self):
1577         if not self._cores_tuple:
1578             self._cores_tuple = self.get_cores_gen_vpe()
1579         return self._cores_tuple
1580
1581     @property
1582     def cpe_cores(self):
1583         return self.vpe_cores[0]
1584
1585     @property
1586     def inet_cores(self):
1587         return self.vpe_cores[1]
1588
1589     @property
1590     def all_rx_cores(self):
1591         return self.latency_cores
1592
1593     @property
1594     def vpe_ports(self):
1595         if not self._ports_tuple:
1596             self._ports_tuple = self.get_ports_gen_vpe()
1597         return self._ports_tuple
1598
1599     @property
1600     def cpe_ports(self):
1601         return self.vpe_ports[0]
1602
1603     @property
1604     def inet_ports(self):
1605         return self.vpe_ports[1]
1606
1607     def get_cores_gen_vpe(self):
1608         cpe_cores = []
1609         inet_cores = []
1610         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1611             if not section_name.startswith("core"):
1612                 continue
1613
1614             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1615                 continue
1616
1617             for item_key, item_value in section:
1618                 if item_key != 'name':
1619                     continue
1620
1621                 if item_value.startswith("cpe"):
1622                     core_tuple = CoreSocketTuple(section_name)
1623                     core_tag = core_tuple.core_id
1624                     cpe_cores.append(core_tag)
1625
1626                 elif item_value.startswith("inet"):
1627                     core_tuple = CoreSocketTuple(section_name)
1628                     inet_core = core_tuple.core_id
1629                     inet_cores.append(inet_core)
1630
1631         return cpe_cores, inet_cores
1632
1633     def get_ports_gen_vpe(self):
1634         cpe_ports = []
1635         inet_ports = []
1636
1637         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1638             if not section_name.startswith("port"):
1639                 continue
1640             tx_port_iter = re.finditer(r'\d+', section_name)
1641             tx_port_no = int(next(tx_port_iter).group(0))
1642
1643             for item_key, item_value in section:
1644                 if item_key != 'name':
1645                     continue
1646
1647                 if item_value.startswith("cpe"):
1648                     cpe_ports.append(tx_port_no)
1649
1650                 elif item_value.startswith("inet"):
1651                     inet_ports.append(tx_port_no)
1652
1653         return cpe_ports, inet_ports
1654
1655     @contextmanager
1656     def traffic_context(self, pkt_size, value):
1657         # Calculate the target upload and download speed. The upload and
1658         # download packets have different packet sizes, so in order to get
1659         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1660         # of the packet sizes.
1661         cpe_pkt_size = pkt_size
1662         inet_pkt_size = pkt_size - 4
1663         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1664
1665         curr_up_speed = curr_down_speed = 0
1666         max_up_speed = max_down_speed = value
1667         if ratio < 1:
1668             max_down_speed = value * ratio
1669         else:
1670             max_up_speed = value / ratio
1671
1672         # Adjust speed when multiple cores per port are used to generate traffic
1673         if len(self.cpe_ports) != len(self.cpe_cores):
1674             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1675         if len(self.inet_ports) != len(self.inet_cores):
1676             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1677
1678         # Initialize cores
1679         self.sut.stop_all()
1680         time.sleep(2)
1681
1682         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1683         # wrong.
1684         self.sut.start(self.all_rx_cores)
1685         time.sleep(2)
1686         self.sut.stop(self.all_rx_cores)
1687         time.sleep(2)
1688         self.sut.reset_stats()
1689
1690         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1691         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1692
1693         self.sut.reset_values(self.cpe_cores)
1694         self.sut.reset_values(self.inet_cores)
1695
1696         # Set correct IP and UDP lengths in packet headers
1697         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1698         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1699         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1700         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1701
1702         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1703         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1704         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1705         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1706
1707         self.sut.set_speed(self.inet_cores, curr_up_speed)
1708         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1709
1710         # Ramp up the transmission speed. First go to the common speed, then
1711         # increase steps for the faster one.
1712         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1713
1714         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1715
1716         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1717             # The min(..., ...) takes care of 1) floating point rounding errors
1718             # that could make curr_*_speed to be slightly greater than
1719             # max_*_speed and 2) max_*_speed not being an exact multiple of
1720             # self._step_delta.
1721             if curr_up_speed < max_up_speed:
1722                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1723             if curr_down_speed < max_down_speed:
1724                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1725
1726             self.sut.set_speed(self.inet_cores, curr_up_speed)
1727             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1728             time.sleep(self.step_time)
1729
1730         LOG.info("Target speeds reached. Starting real test.")
1731
1732         yield
1733
1734         self.sut.stop(self.cpe_cores + self.inet_cores)
1735         LOG.info("Test ended. Flushing NIC buffers")
1736         self.sut.start(self.all_rx_cores)
1737         time.sleep(3)
1738         self.sut.stop(self.all_rx_cores)
1739
1740     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1741                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1742         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1743                                      value, tolerated_loss, line_speed)
1744
1745         with data_helper, self.traffic_context(pkt_size,
1746                                                self.pct_10gbps(value, line_speed)):
1747             with data_helper.measure_tot_stats():
1748                 time.sleep(duration)
1749                 # Getting statistics to calculate PPS at right speed....
1750                 data_helper.capture_tsc_hz()
1751                 data_helper.latency = self.get_latency()
1752
1753         return data_helper.result_tuple, data_helper.samples
1754
1755
1756 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1757
1758     __prox_profile_type__ = "lwAFTR gen"
1759
1760     def __init__(self, resource_helper):
1761         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1762         self._cores_tuple = None
1763         self._ports_tuple = None
1764         self.step_delta = 5
1765         self.step_time = 0.5
1766
1767     @property
1768     def _lwaftr_cores(self):
1769         if not self._cores_tuple:
1770             self._cores_tuple = self._get_cores_gen_lwaftr()
1771         return self._cores_tuple
1772
1773     @property
1774     def tun_cores(self):
1775         return self._lwaftr_cores[0]
1776
1777     @property
1778     def inet_cores(self):
1779         return self._lwaftr_cores[1]
1780
1781     @property
1782     def _lwaftr_ports(self):
1783         if not self._ports_tuple:
1784             self._ports_tuple = self._get_ports_gen_lw_aftr()
1785         return self._ports_tuple
1786
1787     @property
1788     def tun_ports(self):
1789         return self._lwaftr_ports[0]
1790
1791     @property
1792     def inet_ports(self):
1793         return self._lwaftr_ports[1]
1794
1795     @property
1796     def all_rx_cores(self):
1797         return self.latency_cores
1798
1799     def _get_cores_gen_lwaftr(self):
1800         tun_cores = []
1801         inet_cores = []
1802         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1803             if not section_name.startswith("core"):
1804                 continue
1805
1806             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1807                 continue
1808
1809             core_tuple = CoreSocketTuple(section_name)
1810             core_tag = core_tuple.core_id
1811             for item_value in (v for k, v in section if k == 'name'):
1812                 if item_value.startswith('tun'):
1813                     tun_cores.append(core_tag)
1814                 elif item_value.startswith('inet'):
1815                     inet_cores.append(core_tag)
1816
1817         return tun_cores, inet_cores
1818
1819     def _get_ports_gen_lw_aftr(self):
1820         tun_ports = []
1821         inet_ports = []
1822
1823         re_port = re.compile(r'port (\d+)')
1824         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1825             match = re_port.search(section_name)
1826             if not match:
1827                 continue
1828
1829             tx_port_no = int(match.group(1))
1830             for item_value in (v for k, v in section if k == 'name'):
1831                 if item_value.startswith('lwB4'):
1832                     tun_ports.append(tx_port_no)
1833                 elif item_value.startswith('inet'):
1834                     inet_ports.append(tx_port_no)
1835
1836         return tun_ports, inet_ports
1837
1838     @staticmethod
1839     def _resize(len1, len2):
1840         if len1 == len2:
1841             return 1.0
1842         return 1.0 * len1 / len2
1843
1844     @contextmanager
1845     def traffic_context(self, pkt_size, value):
1846         # Tester is sending packets at the required speed already after
1847         # setup_test(). Just get the current statistics, sleep the required
1848         # amount of time and calculate packet loss.
1849         tun_pkt_size = pkt_size
1850         inet_pkt_size = pkt_size - 40
1851         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1852
1853         curr_up_speed = curr_down_speed = 0
1854         max_up_speed = max_down_speed = value
1855
1856         max_up_speed = value / ratio
1857
1858         # Adjust speed when multiple cores per port are used to generate traffic
1859         if len(self.tun_ports) != len(self.tun_cores):
1860             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1861         if len(self.inet_ports) != len(self.inet_cores):
1862             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1863
1864         # Initialize cores
1865         self.sut.stop_all()
1866         time.sleep(0.5)
1867
1868         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1869         # wrong.
1870         self.sut.start(self.all_rx_cores)
1871         time.sleep(0.5)
1872         self.sut.stop(self.all_rx_cores)
1873         time.sleep(0.5)
1874         self.sut.reset_stats()
1875
1876         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1877         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1878
1879         self.sut.reset_values(self.tun_cores)
1880         self.sut.reset_values(self.inet_cores)
1881
1882         # Set correct IP and UDP lengths in packet headers
1883         # tun
1884         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1885         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1886         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1887         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1888         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1889         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1890
1891         # INET
1892         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1893         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1894         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1895         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1896
1897         LOG.info("Initializing SUT: sending lwAFTR packets")
1898         self.sut.set_speed(self.inet_cores, curr_up_speed)
1899         self.sut.set_speed(self.tun_cores, curr_down_speed)
1900         time.sleep(4)
1901
1902         # Ramp up the transmission speed. First go to the common speed, then
1903         # increase steps for the faster one.
1904         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1905
1906         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1907
1908         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1909             # The min(..., ...) takes care of 1) floating point rounding errors
1910             # that could make curr_*_speed to be slightly greater than
1911             # max_*_speed and 2) max_*_speed not being an exact multiple of
1912             # self._step_delta.
1913             if curr_up_speed < max_up_speed:
1914                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1915             if curr_down_speed < max_down_speed:
1916                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1917
1918             self.sut.set_speed(self.inet_cores, curr_up_speed)
1919             self.sut.set_speed(self.tun_cores, curr_down_speed)
1920             time.sleep(self.step_time)
1921
1922         LOG.info("Target speeds reached. Starting real test.")
1923
1924         yield
1925
1926         self.sut.stop(self.tun_cores + self.inet_cores)
1927         LOG.info("Test ended. Flushing NIC buffers")
1928         self.sut.start(self.all_rx_cores)
1929         time.sleep(3)
1930         self.sut.stop(self.all_rx_cores)
1931
1932     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1933                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1934         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1935                                      value, tolerated_loss, line_speed)
1936
1937         with data_helper, self.traffic_context(pkt_size,
1938                                                self.pct_10gbps(value, line_speed)):
1939             with data_helper.measure_tot_stats():
1940                 time.sleep(duration)
1941                 # Getting statistics to calculate PPS at right speed....
1942                 data_helper.capture_tsc_hz()
1943                 data_helper.latency = self.get_latency()
1944
1945         return data_helper.result_tuple, data_helper.samples