Merge "Resolve NameError in test_utils.py"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47 LOG_RESULT = logging.getLogger('yardstick')
48 LOG_RESULT.setLevel(logging.DEBUG)
49
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
85
86     def __new__(cls, *args):
87         try:
88             matches = cls.CORE_RE.search(str(args[0]))
89             if matches:
90                 args = matches.groups()
91
92             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
93                                                        'h' if args[2] else '')
94
95         except (AttributeError, TypeError, IndexError, ValueError):
96             raise ValueError('Invalid core spec {}'.format(args))
97
98     def is_hyperthread(self):
99         return self.hyperthread == 'h'
100
101     @property
102     def index(self):
103         return int(self.is_hyperthread())
104
105     def find_in_topology(self, cpu_topology):
106         try:
107             socket_core_match = cpu_topology[self.socket_id][self.core_id]
108             sorted_match = sorted(socket_core_match.values())
109             return sorted_match[self.index][0]
110         except (KeyError, IndexError):
111             template = "Core {}{} on socket {} does not exist"
112             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
113
114
115 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
116     def __new__(cls, *args):
117         try:
118             assert args[0] is not str(args[0])
119             args = tuple(args[0])
120         except (AssertionError, IndexError, TypeError):
121             pass
122
123         return super(TotStatsTuple, cls).__new__(cls, *args)
124
125
126 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
127                                                         'delta_tx,delta_tsc,'
128                                                         'latency,rx_total,tx_total,'
129                                                         'requested_pps')):
130     @property
131     def pkt_loss(self):
132         try:
133             return 1e2 * self.drop_total / float(self.tx_total)
134         except ZeroDivisionError:
135             return 100.0
136
137     @property
138     def tx_mpps(self):
139         # calculate the effective throughput in Mpps
140         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141
142     @property
143     def rx_mpps(self):
144         # calculate the effective throughput in Mpps
145         return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6
146
147     @property
148     def can_be_lost(self):
149         return int(self.tx_total * self.tolerated / 1e2)
150
151     @property
152     def drop_total(self):
153         return self.tx_total - self.rx_total
154
155     @property
156     def success(self):
157         return self.drop_total <= self.can_be_lost
158
159     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
160         if pkt_loss is None:
161             pkt_loss = self.pkt_loss
162
163         if port_samples is None:
164             port_samples = {}
165
166         latency_keys = [
167             "LatencyMin",
168             "LatencyMax",
169             "LatencyAvg",
170         ]
171
172         samples = {
173             "Throughput": self.rx_mpps,
174             "RxThroughput": self.rx_mpps,
175             "DropPackets": pkt_loss,
176             "CurrentDropPackets": pkt_loss,
177             "RequestedTxThroughput": self.requested_pps / 1e6,
178             "TxThroughput": self.tx_mpps,
179             "PktSize": pkt_size,
180         }
181         if port_samples:
182             samples.update(port_samples)
183
184         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
185         return samples
186
187     def log_data(self, logger=None):
188         if logger is None:
189             logger = LOG_RESULT
190
191         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
192         logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
193         logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
194                     self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)
195
196
197 class PacketDump(object):
198     @staticmethod
199     def assert_func(func, value1, value2, template=None):
200         assert func(value1, value2), template.format(value1, value2)
201
202     def __init__(self, port_id, data_len, payload):
203         template = "Packet dump has specified length {}, but payload is {} bytes long"
204         self.assert_func(operator.eq, data_len, len(payload), template)
205         self._port_id = port_id
206         self._data_len = data_len
207         self._payload = payload
208
209     @property
210     def port_id(self):
211         """Get the port id of the packet dump"""
212         return self._port_id
213
214     @property
215     def data_len(self):
216         """Get the length of the data received"""
217         return self._data_len
218
219     def __str__(self):
220         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
221
222     def payload(self, start=None, end=None):
223         """Get part of the payload as a list of ordinals.
224
225         Returns a list of byte values, matching the contents of the packet dump.
226         Optional start and end parameters can be specified to retrieve only a
227         part of the packet contents.
228
229         The number of elements in the list is equal to end - start + 1, so end
230         is the offset of the last character.
231
232         Args:
233             start (pos. int): the starting offset in the payload. If it is not
234                 specified or None, offset 0 is assumed.
235             end (pos. int): the ending offset of the payload. If it is not
236                 specified or None, the contents until the end of the packet are
237                 returned.
238
239         Returns:
240             [int, int, ...]. Each int represents the ordinal value of a byte in
241             the packet payload.
242         """
243         if start is None:
244             start = 0
245
246         if end is None:
247             end = self.data_len - 1
248
249         # Bounds checking on offsets
250         template = "Start offset must be non-negative"
251         self.assert_func(operator.ge, start, 0, template)
252
253         template = "End offset must be less than {1}"
254         self.assert_func(operator.lt, end, self.data_len, template)
255
256         # Adjust for splice operation: end offset must be 1 more than the offset
257         # of the last desired character.
258         end += 1
259
260         return self._payload[start:end]
261
262
263 class ProxSocketHelper(object):
264
265     def __init__(self, sock=None):
266         """ creates new prox instance """
267         super(ProxSocketHelper, self).__init__()
268
269         if sock is None:
270             sock = socket.socket()
271
272         self._sock = sock
273         self._pkt_dumps = []
274         self.master_stats = None
275
276     def connect(self, ip, port):
277         """Connect to the prox instance on the remote system"""
278         self._sock.connect((ip, port))
279
280     def get_socket(self):
281         """ get the socket connected to the remote instance """
282         return self._sock
283
284     def _parse_socket_data(self, decoded_data, pkt_dump_only):
285         def get_newline_index():
286             return decoded_data.find('\n', index)
287
288         ret_str = ''
289         index = 0
290         for newline_index in iter(get_newline_index, -1):
291             ret_str = decoded_data[index:newline_index]
292
293             try:
294                 mode, port_id, data_len = ret_str.split(',', 2)
295             except ValueError:
296                 mode, port_id, data_len = None, None, None
297
298             if mode != 'pktdump':
299                 # Regular 1-line message. Stop reading from the socket.
300                 LOG.debug("Regular response read")
301                 return ret_str, True
302
303             LOG.debug("Packet dump header read: [%s]", ret_str)
304
305             # The line is a packet dump header. Parse it, read the
306             # packet payload, store the dump for later retrieval.
307             # Skip over the packet dump and continue processing: a
308             # 1-line response may follow the packet dump.
309
310             data_len = int(data_len)
311             data_start = newline_index + 1  # + 1 to skip over \n
312             data_end = data_start + data_len
313             sub_data = decoded_data[data_start:data_end]
314             pkt_payload = array.array('B', (ord(v) for v in sub_data))
315             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
316             self._pkt_dumps.append(pkt_dump)
317
318             if pkt_dump_only:
319                 # Return boolean instead of string to signal
320                 # successful reception of the packet dump.
321                 LOG.debug("Packet dump stored, returning")
322                 return True, False
323
324             index = data_end + 1
325
326         return ret_str, False
327
328     def get_data(self, pkt_dump_only=False, timeout=0.01):
329         """ read data from the socket """
330
331         # This method behaves slightly differently depending on whether it is
332         # called to read the response to a command (pkt_dump_only = 0) or if
333         # it is called specifically to read a packet dump (pkt_dump_only = 1).
334         #
335         # Packet dumps look like:
336         #   pktdump,<port_id>,<data_len>\n
337         #   <packet contents as byte array>\n
338         # This means the total packet dump message consists of 2 lines instead
339         # of 1 line.
340         #
341         # - Response for a command (pkt_dump_only = 0):
342         #   1) Read response from the socket until \n (end of message)
343         #   2a) If the response is a packet dump header (starts with "pktdump,"):
344         #     - Read the packet payload and store the packet dump for later
345         #       retrieval.
346         #     - Reset the state and restart from 1). Eventually state 2b) will
347         #       be reached and the function will return.
348         #   2b) If the response is not a packet dump:
349         #     - Return the received message as a string
350         #
351         # - Explicit request to read a packet dump (pkt_dump_only = 1):
352         #   - Read the dump header and payload
353         #   - Store the packet dump for later retrieval
354         #   - Return True to signify a packet dump was successfully read
355
356         def is_ready():
357             # recv() is blocking, so avoid calling it when no data is waiting.
358             ready = select.select([self._sock], [], [], timeout)
359             return bool(ready[0])
360
361         status = False
362         ret_str = ""
363         for status in iter(is_ready, False):
364             decoded_data = self._sock.recv(256).decode('utf-8')
365             ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
366             if (done):
367                 break
368
369         LOG.debug("Received data from socket: [%s]", ret_str)
370         return ret_str if status else ''
371
372     def put_command(self, to_send):
373         """ send data to the remote instance """
374         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
375         try:
376             # NOTE: sendall will block, we need a timeout
377             self._sock.sendall(to_send.encode('utf-8'))
378         except:  # pylint: disable=bare-except
379             pass
380
381     def get_packet_dump(self):
382         """ get the next packet dump """
383         if self._pkt_dumps:
384             return self._pkt_dumps.pop(0)
385         return None
386
387     def stop_all_reset(self):
388         """ stop the remote instance and reset stats """
389         LOG.debug("Stop all and reset stats")
390         self.stop_all()
391         self.reset_stats()
392
393     def stop_all(self):
394         """ stop all cores on the remote instance """
395         LOG.debug("Stop all")
396         self.put_command("stop all\n")
397         time.sleep(3)
398
399     def stop(self, cores, task=''):
400         """ stop specific cores on the remote instance """
401
402         tmpcores = []
403         for core in cores:
404             if core not in tmpcores:
405                 tmpcores.append(core)
406
407         LOG.debug("Stopping cores %s", tmpcores)
408         self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))
409         time.sleep(3)
410
411     def start_all(self):
412         """ start all cores on the remote instance """
413         LOG.debug("Start all")
414         self.put_command("start all\n")
415
416     def start(self, cores):
417         """ start specific cores on the remote instance """
418
419         tmpcores = []
420         for core in cores:
421             if core not in tmpcores:
422                 tmpcores.append(core)
423
424         LOG.debug("Starting cores %s", tmpcores)
425         self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))
426         time.sleep(3)
427
428     def reset_stats(self):
429         """ reset the statistics on the remote instance """
430         LOG.debug("Reset stats")
431         self.put_command("reset stats\n")
432         time.sleep(1)
433
434     def _run_template_over_cores(self, template, cores, *args):
435         for core in cores:
436             self.put_command(template.format(core, *args))
437
438     def set_pkt_size(self, cores, pkt_size):
439         """ set the packet size to generate on the remote instance """
440         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
441         pkt_size -= 4
442         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
443         time.sleep(1)
444
445     def set_value(self, cores, offset, value, length):
446         """ set value on the remote instance """
447         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
448         LOG.debug(msg, cores, value, length, offset)
449         template = "set value {} 0 {} {} {}\n"
450         self._run_template_over_cores(template, cores, offset, value, length)
451
452     def reset_values(self, cores):
453         """ reset values on the remote instance """
454         LOG.debug("Set value for core(s) %s", cores)
455         self._run_template_over_cores("reset values {} 0\n", cores)
456
457     def set_speed(self, cores, speed, tasks=None):
458         """ set speed on the remote instance """
459         if tasks is None:
460             tasks = [0] * len(cores)
461         elif len(tasks) != len(cores):
462             LOG.error("set_speed: cores and tasks must have the same len")
463         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
464         for (core, task) in list(zip(cores, tasks)):
465             self.put_command("speed {} {} {}\n".format(core, task, speed))
466
467     def slope_speed(self, cores_speed, duration, n_steps=0):
468         """will start to increase speed from 0 to N where N is taken from
469         a['speed'] for each a in cores_speed"""
470         # by default, each step will take 0.5 sec
471         if n_steps == 0:
472             n_steps = duration * 2
473
474         private_core_data = []
475         step_duration = float(duration) / n_steps
476         for core_data in cores_speed:
477             target = float(core_data['speed'])
478             private_core_data.append({
479                 'cores': core_data['cores'],
480                 'zero': 0,
481                 'delta': target / n_steps,
482                 'current': 0,
483                 'speed': target,
484             })
485
486         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
487         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
488             time.sleep(step_duration)
489             for core_data in private_core_data:
490                 core_data['current'] = core_data[key1] + core_data[key2]
491                 self.set_speed(core_data['cores'], core_data['current'])
492
493     def set_pps(self, cores, pps, pkt_size,
494                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
495         """ set packets per second for specific cores on the remote instance """
496         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
497         LOG.debug(msg, cores, pps, pkt_size)
498
499         # speed in percent of line-rate
500         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
501         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
502
503     def lat_stats(self, cores, task=0):
504         """Get the latency statistics from the remote system"""
505         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
506         lat_min = {}
507         lat_max = {}
508         lat_avg = {}
509         for core in cores:
510             self.put_command("lat stats {} {} \n".format(core, task))
511             ret = self.get_data()
512
513             try:
514                 lat_min[core], lat_max[core], lat_avg[core] = \
515                     tuple(int(n) for n in ret.split(",")[:3])
516
517             except (AttributeError, ValueError, TypeError):
518                 pass
519
520         return lat_min, lat_max, lat_avg
521
522     def get_all_tot_stats(self):
523         self.put_command("tot stats\n")
524         all_stats_str = self.get_data().split(",")
525         if len(all_stats_str) != 4:
526             all_stats = [0] * 4
527             return all_stats
528         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
529         self.master_stats = all_stats
530         return all_stats
531
532     def hz(self):
533         return self.get_all_tot_stats()[3]
534
535     def core_stats(self, cores, task=0):
536         """Get the receive statistics from the remote system"""
537         rx = tx = drop = tsc = 0
538         for core in cores:
539             self.put_command("core stats {} {}\n".format(core, task))
540             ret = self.get_data().split(",")
541             rx += int(ret[0])
542             tx += int(ret[1])
543             drop += int(ret[2])
544             tsc = int(ret[3])
545         return rx, tx, drop, tsc
546
547     def multi_port_stats(self, ports):
548         """get counter values from all ports port"""
549
550         ports_str = ""
551         for port in ports:
552             ports_str = ports_str + str(port) + ","
553         ports_str = ports_str[:-1]
554
555         ports_all_data = []
556         tot_result = [0] * len(ports)
557
558         retry_counter = 0
559         port_index = 0
560         while (len(ports) is not len(ports_all_data)) and (retry_counter < 10):
561             self.put_command("multi port stats {}\n".format(ports_str))
562             ports_all_data = self.get_data().split(";")
563
564             if len(ports) is len(ports_all_data):
565                 for port_data_str in ports_all_data:
566
567                     try:
568                         tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")]
569                     except (IndexError, TypeError):
570                         LOG.error("Port Index error %d  %s - retrying ", port_index, port_data_str)
571
572                     if (len(tot_result[port_index]) is not 6) or \
573                                     tot_result[port_index][0] is not ports[port_index]:
574                         ports_all_data = []
575                         tot_result = [0] * len(ports)
576                         port_index = 0
577                         time.sleep(0.1)
578                         LOG.error("Corrupted PACKET %s - retrying", port_data_str)
579                         break
580                     else:
581                         port_index = port_index + 1
582             else:
583                 LOG.error("Empty / too much data - retry -%s-", ports_all_data)
584                 ports_all_data = []
585                 tot_result = [0] * len(ports)
586                 port_index = 0
587                 time.sleep(0.1)
588
589             retry_counter = retry_counter + 1
590         return tot_result
591
592     def port_stats(self, ports):
593         """get counter values from a specific port"""
594         tot_result = [0] * 12
595         for port in ports:
596             self.put_command("port_stats {}\n".format(port))
597             ret = [try_int(s, 0) for s in self.get_data().split(",")]
598             tot_result = [sum(x) for x in zip(tot_result, ret)]
599         return tot_result
600
601     @contextmanager
602     def measure_tot_stats(self):
603         start = self.get_all_tot_stats()
604         container = {'start_tot': start}
605         try:
606             yield container
607         finally:
608             container['end_tot'] = end = self.get_all_tot_stats()
609
610         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
611
612     def tot_stats(self):
613         """Get the total statistics from the remote system"""
614         stats = self.get_all_tot_stats()
615         return stats[:3]
616
617     def tot_ierrors(self):
618         """Get the total ierrors from the remote system"""
619         self.put_command("tot ierrors tot\n")
620         recv = self.get_data().split(',')
621         tot_ierrors = int(recv[0])
622         tsc = int(recv[0])
623         return tot_ierrors, tsc
624
625     def set_count(self, count, cores):
626         """Set the number of packets to send on the specified core"""
627         self._run_template_over_cores("count {} 0 {}\n", cores, count)
628
629     def dump_rx(self, core_id, task_id=0, count=1):
630         """Activate dump on rx on the specified core"""
631         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
632         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
633         time.sleep(1.5)  # Give PROX time to set up packet dumping
634
635     def quit(self):
636         self.stop_all()
637         self._quit()
638         self.force_quit()
639
640     def _quit(self):
641         """ stop all cores on the remote instance """
642         LOG.debug("Quit prox")
643         self.put_command("quit\n")
644         time.sleep(3)
645
646     def force_quit(self):
647         """ stop all cores on the remote instance """
648         LOG.debug("Force Quit prox")
649         self.put_command("quit_force\n")
650         time.sleep(3)
651
652
653 _LOCAL_OBJECT = object()
654
655
656 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
657     # the actual app is lowercase
658     APP_NAME = 'prox'
659     # not used for Prox but added for consistency
660     VNF_TYPE = "PROX"
661
662     LUA_PARAMETER_NAME = ""
663     LUA_PARAMETER_PEER = {
664         "gen": "sut",
665         "sut": "gen",
666     }
667
668     CONFIG_QUEUE_TIMEOUT = 120
669
670     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
671         self.remote_path = None
672         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
673         self.remote_prox_file_name = None
674         self._prox_config_data = None
675         self.additional_files = {}
676         self.config_queue = Queue()
677         # allow_exit_without_flush
678         self.config_queue.cancel_join_thread()
679         self._global_section = None
680
681     @property
682     def prox_config_data(self):
683         if self._prox_config_data is None:
684             # this will block, but it needs too
685             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
686         return self._prox_config_data
687
688     @property
689     def global_section(self):
690         if self._global_section is None and self.prox_config_data:
691             self._global_section = self.find_section("global")
692         return self._global_section
693
694     def find_section(self, name, default=_LOCAL_OBJECT):
695         result = next((value for key, value in self.prox_config_data if key == name), default)
696         if result is _LOCAL_OBJECT:
697             raise KeyError('{} not found in Prox config'.format(name))
698         return result
699
700     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
701         section = self.find_section(section_name, [])
702         result = next((value for key, value in section if key == section_key), default)
703         if result is _LOCAL_OBJECT:
704             template = '{} not found in {} section of Prox config'
705             raise KeyError(template.format(section_key, section_name))
706         return result
707
708     def copy_to_target(self, config_file_path, prox_file):
709         remote_path = os.path.join("/tmp", prox_file)
710         self.ssh_helper.put(config_file_path, remote_path)
711         return remote_path
712
713     @staticmethod
714     def _get_tx_port(section, sections):
715         iface_port = [-1]
716         for item in sections[section]:
717             if item[0] == "tx port":
718                 iface_port = re.findall(r'\d+', item[1])
719                 # do we want the last one?
720                 #   if yes, then can we reverse?
721         return int(iface_port[0])
722
723     @staticmethod
724     def _replace_quoted_with_value(quoted, value, count=1):
725         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
726         return new_string
727
728     def _insert_additional_file(self, value):
729         file_str = value.split('"')
730         base_name = os.path.basename(file_str[1])
731         file_str[1] = self.additional_files[base_name]
732         return '"'.join(file_str)
733
734     def generate_prox_config_file(self, config_path):
735         sections = []
736         prox_config = ConfigParser(config_path, sections)
737         prox_config.parse()
738
739         # Ensure MAC is set "hardware"
740         all_ports = self.vnfd_helper.port_pairs.all_ports
741         # use dpdk port number
742         for port_name in all_ports:
743             port_num = self.vnfd_helper.port_num(port_name)
744             port_section_name = "port {}".format(port_num)
745             for section_name, section in sections:
746                 if port_section_name != section_name:
747                     continue
748
749                 for section_data in section:
750                     if section_data[0] == "mac":
751                         section_data[1] = "hardware"
752
753         # search for dst mac
754         for _, section in sections:
755             for section_data in section:
756                 item_key, item_val = section_data
757                 if item_val.startswith("@@dst_mac"):
758                     tx_port_iter = re.finditer(r'\d+', item_val)
759                     tx_port_no = int(next(tx_port_iter).group(0))
760                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
761                     mac = intf["virtual-interface"]["dst_mac"]
762                     section_data[1] = mac.replace(":", " ", 6)
763
764                 if item_key == "dst mac" and item_val.startswith("@@"):
765                     tx_port_iter = re.finditer(r'\d+', item_val)
766                     tx_port_no = int(next(tx_port_iter).group(0))
767                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
768                     mac = intf["virtual-interface"]["dst_mac"]
769                     section_data[1] = mac
770
771                 if item_val.startswith("@@src_mac"):
772                     tx_port_iter = re.finditer(r'\d+', item_val)
773                     tx_port_no = int(next(tx_port_iter).group(0))
774                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
775                     mac = intf["virtual-interface"]["local_mac"]
776                     section_data[1] = mac.replace(":", " ", 6)
777
778                 if item_key == "src mac" and item_val.startswith("@@"):
779                     tx_port_iter = re.finditer(r'\d+', item_val)
780                     tx_port_no = int(next(tx_port_iter).group(0))
781                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
782                     mac = intf["virtual-interface"]["local_mac"]
783                     section_data[1] = mac
784
785         # if addition file specified in prox config
786         if not self.additional_files:
787             return sections
788
789         for section_name, section in sections:
790             for section_data in section:
791                 try:
792                     if section_data[0].startswith("dofile"):
793                         section_data[0] = self._insert_additional_file(section_data[0])
794
795                     if section_data[1].startswith("dofile"):
796                         section_data[1] = self._insert_additional_file(section_data[1])
797                 except:  # pylint: disable=bare-except
798                     pass
799
800         return sections
801
802     @staticmethod
803     def write_prox_lua(lua_config):
804         """
805         Write an .ini-format config file for PROX (parameters.lua)
806         PROX does not allow a space before/after the =, so we need
807         a custom method
808         """
809         out = []
810         for key in lua_config:
811             value = '"' + lua_config[key] + '"'
812             if key == "__name__":
813                 continue
814             if value is not None and value != '@':
815                 key = "=".join((key, str(value).replace('\n', '\n\t')))
816                 out.append(key)
817             else:
818                 key = str(key).replace('\n', '\n\t')
819                 out.append(key)
820         return os.linesep.join(out)
821
822     @staticmethod
823     def write_prox_config(prox_config):
824         """
825         Write an .ini-format config file for PROX
826         PROX does not allow a space before/after the =, so we need
827         a custom method
828         """
829         out = []
830         for (section_name, section) in prox_config:
831             out.append("[{}]".format(section_name))
832             for item in section:
833                 key, value = item
834                 if key == "__name__":
835                     continue
836                 if value is not None and value != '@':
837                     key = "=".join((key, str(value).replace('\n', '\n\t')))
838                     out.append(key)
839                 else:
840                     key = str(key).replace('\n', '\n\t')
841                     out.append(key)
842         return os.linesep.join(out)
843
844     def put_string_to_file(self, s, remote_path):
845         file_obj = cStringIO(s)
846         self.ssh_helper.put_file_obj(file_obj, remote_path)
847         return remote_path
848
849     def generate_prox_lua_file(self):
850         p = OrderedDict()
851         all_ports = self.vnfd_helper.port_pairs.all_ports
852         for port_name in all_ports:
853             port_num = self.vnfd_helper.port_num(port_name)
854             intf = self.vnfd_helper.find_interface(name=port_name)
855             vintf = intf['virtual-interface']
856             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
857             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
858
859         return p
860
861     def upload_prox_lua(self, config_file, lua_data):
862         # prox can't handle spaces around ' = ' so use custom method
863         out = StringIO(self.write_prox_lua(lua_data))
864         out.seek(0)
865         remote_path = os.path.join("/tmp", config_file)
866         self.ssh_helper.put_file_obj(out, remote_path)
867
868         return remote_path
869
870     def upload_prox_config(self, config_file, prox_config_data):
871         # prox can't handle spaces around ' = ' so use custom method
872         out = StringIO(self.write_prox_config(prox_config_data))
873         out.seek(0)
874         remote_path = os.path.join("/tmp", config_file)
875         self.ssh_helper.put_file_obj(out, remote_path)
876
877         return remote_path
878
879     def build_config_file(self):
880         task_path = self.scenario_helper.task_path
881         options = self.scenario_helper.options
882         config_path = options['prox_config']
883         config_file = os.path.basename(config_path)
884         config_path = utils.find_relative_file(config_path, task_path)
885         self.additional_files = {}
886
887         try:
888             if options['prox_generate_parameter']:
889                 self.lua = []
890                 self.lua = self.generate_prox_lua_file()
891                 if len(self.lua) > 0:
892                     self.upload_prox_lua("parameters.lua", self.lua)
893         except:  # pylint: disable=bare-except
894             pass
895
896         prox_files = options.get('prox_files', [])
897         if isinstance(prox_files, six.string_types):
898             prox_files = [prox_files]
899         for key_prox_file in prox_files:
900             base_prox_file = os.path.basename(key_prox_file)
901             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
902             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
903             self.additional_files[base_prox_file] = remote_prox_file
904
905         self._prox_config_data = self.generate_prox_config_file(config_path)
906         # copy config to queue so we can read it from traffic_runner process
907         self.config_queue.put(self._prox_config_data)
908         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
909
910     def build_config(self):
911         self.build_config_file()
912
913         options = self.scenario_helper.options
914         prox_args = options['prox_args']
915         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
916
917         self.pipeline_kwargs = {
918             'tool_path': tool_path,
919             'tool_dir': os.path.dirname(tool_path),
920             'cfg_file': self.remote_path,
921             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
922                              for k, v in prox_args.items())
923         }
924
925         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
926                         "{args} -f {cfg_file} '")
927         return cmd_template.format(**self.pipeline_kwargs)
928
929
930 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
931 class ProxResourceHelper(ClientResourceHelper):
932
933     RESOURCE_WORD = 'prox'
934
935     PROX_MODE = ""
936
937     WAIT_TIME = 3
938
939     @staticmethod
940     def find_pci(pci, bound_pci):
941         # we have to substring match PCI bus address from the end
942         return any(b.endswith(pci) for b in bound_pci)
943
944     def __init__(self, setup_helper):
945         super(ProxResourceHelper, self).__init__(setup_helper)
946         self.mgmt_interface = self.vnfd_helper.mgmt_interface
947         self._user = self.mgmt_interface["user"]
948         self._ip = self.mgmt_interface["ip"]
949
950         self.done = False
951         self._vpci_to_if_name_map = None
952         self.additional_file = {}
953         self.remote_prox_file_name = None
954         self.lower = None
955         self.upper = None
956         self.step_delta = 1
957         self.step_time = 0.5
958         self._test_type = None
959
960     @property
961     def sut(self):
962         if not self.client:
963             self.client = self._connect()
964         return self.client
965
966     @property
967     def test_type(self):
968         if self._test_type is None:
969             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
970         return self._test_type
971
972     def run_traffic(self, traffic_profile):
973         self._queue.cancel_join_thread()
974         self.lower = 0.0
975         self.upper = 100.0
976
977         traffic_profile.init(self._queue)
978         # this frees up the run_traffic loop
979         self.client_started.value = 1
980
981         while not self._terminated.value:
982             # move it all to traffic_profile
983             self._run_traffic_once(traffic_profile)
984
985     def _run_traffic_once(self, traffic_profile):
986         traffic_profile.execute_traffic(self)
987         if traffic_profile.done:
988             self._queue.put({'done': True})
989             LOG.debug("tg_prox done")
990             self._terminated.value = 1
991
992     # For VNF use ResourceHelper method to collect KPIs directly.
993     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
994     def collect_collectd_kpi(self):
995         return self._collect_resource_kpi()
996
997     def collect_kpi(self):
998         result = super(ProxResourceHelper, self).collect_kpi()
999         # add in collectd kpis manually
1000         if result:
1001             result['collect_stats'] = self._collect_resource_kpi()
1002         return result
1003
1004     def terminate(self):
1005         # should not be called, use VNF terminate
1006         raise NotImplementedError()
1007
1008     def up_post(self):
1009         return self.sut  # force connection
1010
1011     def execute(self, cmd, *args, **kwargs):
1012         func = getattr(self.sut, cmd, None)
1013         if func:
1014             return func(*args, **kwargs)
1015         return None
1016
1017     def _connect(self, client=None):
1018         """Run and connect to prox on the remote system """
1019         # De-allocating a large amount of hugepages takes some time. If a new
1020         # PROX instance is started immediately after killing the previous one,
1021         # it might not be able to allocate hugepages, because they are still
1022         # being freed. Hence the -w switch.
1023         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1024         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1025         # -f ./handle_none-4.cfg"
1026         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1027         #  "; " \
1028         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
1029         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1030         # sudo " \
1031         #    + "./build/Prox " + prox_args
1032         # log.debug("Starting PROX with command [%s]", prox_cmd)
1033         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1034         # self._ip, prox_cmd))
1035         if client is None:
1036             client = ProxSocketHelper()
1037
1038         # try connecting to Prox for 60s
1039         for _ in range(RETRY_SECONDS):
1040             time.sleep(RETRY_INTERVAL)
1041             try:
1042                 client.connect(self._ip, PROX_PORT)
1043             except (socket.gaierror, socket.error):
1044                 continue
1045             else:
1046                 return client
1047
1048         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1049         raise Exception(msg.format(self._ip, PROX_PORT))
1050
1051
1052 class ProxDataHelper(object):
1053
1054     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
1055         super(ProxDataHelper, self).__init__()
1056         self.vnfd_helper = vnfd_helper
1057         self.sut = sut
1058         self.pkt_size = pkt_size
1059         self.value = value
1060         self.line_speed = line_speed
1061         self.tolerated_loss = tolerated_loss
1062         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
1063         self.tsc_hz = None
1064         self.measured_stats = None
1065         self.latency = None
1066         self._totals_and_pps = None
1067         self.result_tuple = None
1068
1069     @property
1070     def totals_and_pps(self):
1071         if self._totals_and_pps is None:
1072             rx_total = tx_total = 0
1073             all_ports = self.sut.multi_port_stats(range(self.port_count))
1074             for port in all_ports:
1075                 rx_total = rx_total + port[1]
1076                 tx_total = tx_total + port[2]
1077             requested_pps = self.value / 100.0 * self.line_rate_to_pps()
1078             self._totals_and_pps = rx_total, tx_total, requested_pps
1079         return self._totals_and_pps
1080
1081     @property
1082     def rx_total(self):
1083         return self.totals_and_pps[0]
1084
1085     @property
1086     def tx_total(self):
1087         return self.totals_and_pps[1]
1088
1089     @property
1090     def requested_pps(self):
1091         return self.totals_and_pps[2]
1092
1093     @property
1094     def samples(self):
1095         samples = {}
1096         ports = []
1097         port_names = []
1098         for port_name, port_num in self.vnfd_helper.ports_iter():
1099             ports.append(port_num)
1100             port_names.append(port_name)
1101
1102         results = self.sut.multi_port_stats(ports)
1103         for result in results:
1104             port_num = result[0]
1105             samples[port_names[port_num]] = {
1106                     "in_packets": result[1],
1107                     "out_packets": result[2]}
1108         return samples
1109
1110     def __enter__(self):
1111         self.check_interface_count()
1112         return self
1113
1114     def __exit__(self, exc_type, exc_val, exc_tb):
1115         self.make_tuple()
1116
1117     def make_tuple(self):
1118         if self.result_tuple:
1119             return
1120
1121         self.result_tuple = ProxTestDataTuple(
1122             self.tolerated_loss,
1123             self.tsc_hz,
1124             self.measured_stats['delta'].rx,
1125             self.measured_stats['delta'].tx,
1126             self.measured_stats['delta'].tsc,
1127             self.latency,
1128             self.rx_total,
1129             self.tx_total,
1130             self.requested_pps,
1131         )
1132         self.result_tuple.log_data()
1133
1134     @contextmanager
1135     def measure_tot_stats(self):
1136         with self.sut.measure_tot_stats() as self.measured_stats:
1137             yield
1138
1139     def check_interface_count(self):
1140         # do this assert in init?  unless we expect interface count to
1141         # change from one run to another run...
1142         assert self.port_count in {1, 2, 4}, \
1143             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1144
1145     def capture_tsc_hz(self):
1146         self.tsc_hz = float(self.sut.hz())
1147
1148     def line_rate_to_pps(self):
1149       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1150
1151 class ProxProfileHelper(object):
1152
1153     __prox_profile_type__ = "Generic"
1154
1155     PROX_CORE_GEN_MODE = "gen"
1156     PROX_CORE_LAT_MODE = "lat"
1157
1158     @classmethod
1159     def get_cls(cls, helper_type):
1160         """Return class of specified type."""
1161         if not helper_type:
1162             return ProxProfileHelper
1163
1164         for profile_helper_class in utils.itersubclasses(cls):
1165             if helper_type == profile_helper_class.__prox_profile_type__:
1166                 return profile_helper_class
1167
1168         return ProxProfileHelper
1169
1170     @classmethod
1171     def make_profile_helper(cls, resource_helper):
1172         return cls.get_cls(resource_helper.test_type)(resource_helper)
1173
1174     def __init__(self, resource_helper):
1175         super(ProxProfileHelper, self).__init__()
1176         self.resource_helper = resource_helper
1177         self._cpu_topology = None
1178         self._test_cores = None
1179         self._latency_cores = None
1180
1181     @property
1182     def cpu_topology(self):
1183         if not self._cpu_topology:
1184             stdout = io.BytesIO()
1185             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1186             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1187         return self._cpu_topology
1188
1189     @property
1190     def test_cores(self):
1191         if not self._test_cores:
1192             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1193         return self._test_cores
1194
1195     @property
1196     def latency_cores(self):
1197         if not self._latency_cores:
1198             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1199         return self._latency_cores
1200
1201     @contextmanager
1202     def traffic_context(self, pkt_size, value):
1203         self.sut.stop_all()
1204         self.sut.reset_stats()
1205         try:
1206             self.sut.set_pkt_size(self.test_cores, pkt_size)
1207             self.sut.set_speed(self.test_cores, value)
1208             self.sut.start_all()
1209             time.sleep(1)
1210             yield
1211         finally:
1212             self.sut.stop_all()
1213
1214     def get_cores(self, mode):
1215         cores = []
1216
1217         for section_name, section in self.setup_helper.prox_config_data:
1218             if not section_name.startswith("core"):
1219                 continue
1220
1221             for key, value in section:
1222                 if key == "mode" and value == mode:
1223                     core_tuple = CoreSocketTuple(section_name)
1224                     core = core_tuple.core_id
1225                     cores.append(core)
1226
1227         return cores
1228
1229     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1230                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1231         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1232                                      value, tolerated_loss, line_speed)
1233
1234         with data_helper, self.traffic_context(pkt_size, value):
1235             with data_helper.measure_tot_stats():
1236                 time.sleep(duration)
1237                 # Getting statistics to calculate PPS at right speed....
1238                 data_helper.capture_tsc_hz()
1239                 data_helper.latency = self.get_latency()
1240
1241         return data_helper.result_tuple, data_helper.samples
1242
1243     def get_latency(self):
1244         """
1245         :return: return lat_min, lat_max, lat_avg
1246         :rtype: list
1247         """
1248
1249         if not self._latency_cores:
1250             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1251
1252         if self._latency_cores:
1253             return self.sut.lat_stats(self._latency_cores)
1254         return []
1255
1256     def terminate(self):
1257         pass
1258
1259     def __getattr__(self, item):
1260         return getattr(self.resource_helper, item)
1261
1262
1263 class ProxMplsProfileHelper(ProxProfileHelper):
1264
1265     __prox_profile_type__ = "MPLS tag/untag"
1266
1267     def __init__(self, resource_helper):
1268         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1269         self._cores_tuple = None
1270
1271     @property
1272     def mpls_cores(self):
1273         if not self._cores_tuple:
1274             self._cores_tuple = self.get_cores_mpls()
1275         return self._cores_tuple
1276
1277     @property
1278     def tagged_cores(self):
1279         return self.mpls_cores[0]
1280
1281     @property
1282     def plain_cores(self):
1283         return self.mpls_cores[1]
1284
1285     def get_cores_mpls(self):
1286         cores_tagged = []
1287         cores_plain = []
1288         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1289             if not section_name.startswith("core"):
1290                 continue
1291
1292             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1293                 continue
1294
1295             for item_key, item_value in section:
1296                 if item_key != 'name':
1297                     continue
1298
1299                 if item_value.startswith("tag"):
1300                     core_tuple = CoreSocketTuple(section_name)
1301                     core_tag = core_tuple.core_id
1302                     cores_tagged.append(core_tag)
1303
1304                 elif item_value.startswith("udp"):
1305                     core_tuple = CoreSocketTuple(section_name)
1306                     core_udp = core_tuple.core_id
1307                     cores_plain.append(core_udp)
1308
1309         return cores_tagged, cores_plain
1310
1311     @contextmanager
1312     def traffic_context(self, pkt_size, value):
1313         self.sut.stop_all()
1314         self.sut.reset_stats()
1315         try:
1316             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1317             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1318             self.sut.set_speed(self.tagged_cores, value)
1319             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1320             self.sut.set_speed(self.plain_cores, value * ratio)
1321             self.sut.start_all()
1322             time.sleep(1)
1323             yield
1324         finally:
1325             self.sut.stop_all()
1326
1327
1328 class ProxBngProfileHelper(ProxProfileHelper):
1329
1330     __prox_profile_type__ = "BNG gen"
1331
1332     def __init__(self, resource_helper):
1333         super(ProxBngProfileHelper, self).__init__(resource_helper)
1334         self._cores_tuple = None
1335
1336     @property
1337     def bng_cores(self):
1338         if not self._cores_tuple:
1339             self._cores_tuple = self.get_cores_gen_bng_qos()
1340         return self._cores_tuple
1341
1342     @property
1343     def cpe_cores(self):
1344         return self.bng_cores[0]
1345
1346     @property
1347     def inet_cores(self):
1348         return self.bng_cores[1]
1349
1350     @property
1351     def arp_cores(self):
1352         return self.bng_cores[2]
1353
1354     @property
1355     def arp_task_cores(self):
1356         return self.bng_cores[3]
1357
1358     @property
1359     def all_rx_cores(self):
1360         return self.latency_cores
1361
1362     def get_cores_gen_bng_qos(self):
1363         cpe_cores = []
1364         inet_cores = []
1365         arp_cores = []
1366         arp_tasks_core = [0]
1367         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1368             if not section_name.startswith("core"):
1369                 continue
1370
1371             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1372                 continue
1373
1374             for item_key, item_value in section:
1375                 if item_key != 'name':
1376                     continue
1377
1378                 if item_value.startswith("cpe"):
1379                     core_tuple = CoreSocketTuple(section_name)
1380                     cpe_core = core_tuple.core_id
1381                     cpe_cores.append(cpe_core)
1382
1383                 elif item_value.startswith("inet"):
1384                     core_tuple = CoreSocketTuple(section_name)
1385                     inet_core = core_tuple.core_id
1386                     inet_cores.append(inet_core)
1387
1388                 elif item_value.startswith("arp"):
1389                     core_tuple = CoreSocketTuple(section_name)
1390                     arp_core = core_tuple.core_id
1391                     arp_cores.append(arp_core)
1392
1393                 # We check the tasks/core separately
1394                 if item_value.startswith("arp_task"):
1395                     core_tuple = CoreSocketTuple(section_name)
1396                     arp_task_core = core_tuple.core_id
1397                     arp_tasks_core.append(arp_task_core)
1398
1399         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1400
1401     @contextmanager
1402     def traffic_context(self, pkt_size, value):
1403         # Tester is sending packets at the required speed already after
1404         # setup_test(). Just get the current statistics, sleep the required
1405         # amount of time and calculate packet loss.
1406         inet_pkt_size = pkt_size
1407         cpe_pkt_size = pkt_size - 24
1408         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1409
1410         curr_up_speed = curr_down_speed = 0
1411         max_up_speed = max_down_speed = value
1412         if ratio < 1:
1413             max_down_speed = value * ratio
1414         else:
1415             max_up_speed = value / ratio
1416
1417         # Initialize cores
1418         self.sut.stop_all()
1419         time.sleep(0.5)
1420
1421         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1422         # wrong.
1423         self.sut.start(self.all_rx_cores)
1424         time.sleep(0.5)
1425         self.sut.stop(self.all_rx_cores)
1426         time.sleep(0.5)
1427         self.sut.reset_stats()
1428
1429         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1430         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1431
1432         self.sut.reset_values(self.cpe_cores)
1433         self.sut.reset_values(self.inet_cores)
1434
1435         # Set correct IP and UDP lengths in packet headers
1436         # CPE
1437         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1438         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1439         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1440         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1441
1442         # INET
1443         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1444         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1445         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1446         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1447         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1448         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1449
1450         # Sending ARP to initialize tables - need a few seconds of generation
1451         # to make sure all CPEs are initialized
1452         LOG.info("Initializing SUT: sending ARP packets")
1453         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1454         self.sut.set_speed(self.inet_cores, curr_up_speed)
1455         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1456         self.sut.start(self.arp_cores)
1457         time.sleep(4)
1458
1459         # Ramp up the transmission speed. First go to the common speed, then
1460         # increase steps for the faster one.
1461         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1462
1463         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1464
1465         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1466             # The min(..., ...) takes care of 1) floating point rounding errors
1467             # that could make curr_*_speed to be slightly greater than
1468             # max_*_speed and 2) max_*_speed not being an exact multiple of
1469             # self._step_delta.
1470             if curr_up_speed < max_up_speed:
1471                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1472             if curr_down_speed < max_down_speed:
1473                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1474
1475             self.sut.set_speed(self.inet_cores, curr_up_speed)
1476             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1477             time.sleep(self.step_time)
1478
1479         LOG.info("Target speeds reached. Starting real test.")
1480
1481         yield
1482
1483         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1484         LOG.info("Test ended. Flushing NIC buffers")
1485         self.sut.start(self.all_rx_cores)
1486         time.sleep(3)
1487         self.sut.stop(self.all_rx_cores)
1488
1489     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1490                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1491         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1492                                      value, tolerated_loss, line_speed)
1493
1494         with data_helper, self.traffic_context(pkt_size, value):
1495             with data_helper.measure_tot_stats():
1496                 time.sleep(duration)
1497                 # Getting statistics to calculate PPS at right speed....
1498                 data_helper.capture_tsc_hz()
1499                 data_helper.latency = self.get_latency()
1500
1501         return data_helper.result_tuple, data_helper.samples
1502
1503
1504 class ProxVpeProfileHelper(ProxProfileHelper):
1505
1506     __prox_profile_type__ = "vPE gen"
1507
1508     def __init__(self, resource_helper):
1509         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1510         self._cores_tuple = None
1511         self._ports_tuple = None
1512
1513     @property
1514     def vpe_cores(self):
1515         if not self._cores_tuple:
1516             self._cores_tuple = self.get_cores_gen_vpe()
1517         return self._cores_tuple
1518
1519     @property
1520     def cpe_cores(self):
1521         return self.vpe_cores[0]
1522
1523     @property
1524     def inet_cores(self):
1525         return self.vpe_cores[1]
1526
1527     @property
1528     def all_rx_cores(self):
1529         return self.latency_cores
1530
1531     @property
1532     def vpe_ports(self):
1533         if not self._ports_tuple:
1534             self._ports_tuple = self.get_ports_gen_vpe()
1535         return self._ports_tuple
1536
1537     @property
1538     def cpe_ports(self):
1539         return self.vpe_ports[0]
1540
1541     @property
1542     def inet_ports(self):
1543         return self.vpe_ports[1]
1544
1545     def get_cores_gen_vpe(self):
1546         cpe_cores = []
1547         inet_cores = []
1548         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1549             if not section_name.startswith("core"):
1550                 continue
1551
1552             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1553                 continue
1554
1555             for item_key, item_value in section:
1556                 if item_key != 'name':
1557                     continue
1558
1559                 if item_value.startswith("cpe"):
1560                     core_tuple = CoreSocketTuple(section_name)
1561                     core_tag = core_tuple.core_id
1562                     cpe_cores.append(core_tag)
1563
1564                 elif item_value.startswith("inet"):
1565                     core_tuple = CoreSocketTuple(section_name)
1566                     inet_core = core_tuple.core_id
1567                     inet_cores.append(inet_core)
1568
1569         return cpe_cores, inet_cores
1570
1571     def get_ports_gen_vpe(self):
1572         cpe_ports = []
1573         inet_ports = []
1574
1575         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1576             if not section_name.startswith("port"):
1577                 continue
1578             tx_port_iter = re.finditer(r'\d+', section_name)
1579             tx_port_no = int(next(tx_port_iter).group(0))
1580
1581             for item_key, item_value in section:
1582                 if item_key != 'name':
1583                     continue
1584
1585                 if item_value.startswith("cpe"):
1586                     cpe_ports.append(tx_port_no)
1587
1588                 elif item_value.startswith("inet"):
1589                     inet_ports.append(tx_port_no)
1590
1591         return cpe_ports, inet_ports
1592
1593     @contextmanager
1594     def traffic_context(self, pkt_size, value):
1595         # Calculate the target upload and download speed. The upload and
1596         # download packets have different packet sizes, so in order to get
1597         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1598         # of the packet sizes.
1599         cpe_pkt_size = pkt_size
1600         inet_pkt_size = pkt_size - 4
1601         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1602
1603         curr_up_speed = curr_down_speed = 0
1604         max_up_speed = max_down_speed = value
1605         if ratio < 1:
1606             max_down_speed = value * ratio
1607         else:
1608             max_up_speed = value / ratio
1609
1610         # Adjust speed when multiple cores per port are used to generate traffic
1611         if len(self.cpe_ports) != len(self.cpe_cores):
1612             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1613         if len(self.inet_ports) != len(self.inet_cores):
1614             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1615
1616         # Initialize cores
1617         self.sut.stop_all()
1618         time.sleep(2)
1619
1620         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1621         # wrong.
1622         self.sut.start(self.all_rx_cores)
1623         time.sleep(2)
1624         self.sut.stop(self.all_rx_cores)
1625         time.sleep(2)
1626         self.sut.reset_stats()
1627
1628         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1629         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1630
1631         self.sut.reset_values(self.cpe_cores)
1632         self.sut.reset_values(self.inet_cores)
1633
1634         # Set correct IP and UDP lengths in packet headers
1635         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1636         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1637         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1638         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1639
1640         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1641         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1642         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1643         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1644
1645         self.sut.set_speed(self.inet_cores, curr_up_speed)
1646         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1647
1648         # Ramp up the transmission speed. First go to the common speed, then
1649         # increase steps for the faster one.
1650         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1651
1652         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1653
1654         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1655             # The min(..., ...) takes care of 1) floating point rounding errors
1656             # that could make curr_*_speed to be slightly greater than
1657             # max_*_speed and 2) max_*_speed not being an exact multiple of
1658             # self._step_delta.
1659             if curr_up_speed < max_up_speed:
1660                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1661             if curr_down_speed < max_down_speed:
1662                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1663
1664             self.sut.set_speed(self.inet_cores, curr_up_speed)
1665             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1666             time.sleep(self.step_time)
1667
1668         LOG.info("Target speeds reached. Starting real test.")
1669
1670         yield
1671
1672         self.sut.stop(self.cpe_cores + self.inet_cores)
1673         LOG.info("Test ended. Flushing NIC buffers")
1674         self.sut.start(self.all_rx_cores)
1675         time.sleep(3)
1676         self.sut.stop(self.all_rx_cores)
1677
1678     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1679                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1680         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1681                                      value, tolerated_loss, line_speed)
1682
1683         with data_helper, self.traffic_context(pkt_size, value):
1684             with data_helper.measure_tot_stats():
1685                 time.sleep(duration)
1686                 # Getting statistics to calculate PPS at right speed....
1687                 data_helper.capture_tsc_hz()
1688                 data_helper.latency = self.get_latency()
1689
1690         return data_helper.result_tuple, data_helper.samples
1691
1692
1693 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1694
1695     __prox_profile_type__ = "lwAFTR gen"
1696
1697     def __init__(self, resource_helper):
1698         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1699         self._cores_tuple = None
1700         self._ports_tuple = None
1701         self.step_delta = 5
1702         self.step_time = 0.5
1703
1704     @property
1705     def _lwaftr_cores(self):
1706         if not self._cores_tuple:
1707             self._cores_tuple = self._get_cores_gen_lwaftr()
1708         return self._cores_tuple
1709
1710     @property
1711     def tun_cores(self):
1712         return self._lwaftr_cores[0]
1713
1714     @property
1715     def inet_cores(self):
1716         return self._lwaftr_cores[1]
1717
1718     @property
1719     def _lwaftr_ports(self):
1720         if not self._ports_tuple:
1721             self._ports_tuple = self._get_ports_gen_lw_aftr()
1722         return self._ports_tuple
1723
1724     @property
1725     def tun_ports(self):
1726         return self._lwaftr_ports[0]
1727
1728     @property
1729     def inet_ports(self):
1730         return self._lwaftr_ports[1]
1731
1732     @property
1733     def all_rx_cores(self):
1734         return self.latency_cores
1735
1736     def _get_cores_gen_lwaftr(self):
1737         tun_cores = []
1738         inet_cores = []
1739         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1740             if not section_name.startswith("core"):
1741                 continue
1742
1743             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1744                 continue
1745
1746             core_tuple = CoreSocketTuple(section_name)
1747             core_tag = core_tuple.core_id
1748             for item_value in (v for k, v in section if k == 'name'):
1749                 if item_value.startswith('tun'):
1750                     tun_cores.append(core_tag)
1751                 elif item_value.startswith('inet'):
1752                     inet_cores.append(core_tag)
1753
1754         return tun_cores, inet_cores
1755
1756     def _get_ports_gen_lw_aftr(self):
1757         tun_ports = []
1758         inet_ports = []
1759
1760         re_port = re.compile(r'port (\d+)')
1761         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1762             match = re_port.search(section_name)
1763             if not match:
1764                 continue
1765
1766             tx_port_no = int(match.group(1))
1767             for item_value in (v for k, v in section if k == 'name'):
1768                 if item_value.startswith('lwB4'):
1769                     tun_ports.append(tx_port_no)
1770                 elif item_value.startswith('inet'):
1771                     inet_ports.append(tx_port_no)
1772
1773         return tun_ports, inet_ports
1774
1775     @staticmethod
1776     def _resize(len1, len2):
1777         if len1 == len2:
1778             return 1.0
1779         return 1.0 * len1 / len2
1780
1781     @contextmanager
1782     def traffic_context(self, pkt_size, value):
1783         # Tester is sending packets at the required speed already after
1784         # setup_test(). Just get the current statistics, sleep the required
1785         # amount of time and calculate packet loss.
1786         tun_pkt_size = pkt_size
1787         inet_pkt_size = pkt_size - 40
1788         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1789
1790         curr_up_speed = curr_down_speed = 0
1791         max_up_speed = max_down_speed = value
1792
1793         max_up_speed = value / ratio
1794
1795         # Adjust speed when multiple cores per port are used to generate traffic
1796         if len(self.tun_ports) != len(self.tun_cores):
1797             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1798         if len(self.inet_ports) != len(self.inet_cores):
1799             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1800
1801         # Initialize cores
1802         self.sut.stop_all()
1803         time.sleep(0.5)
1804
1805         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1806         # wrong.
1807         self.sut.start(self.all_rx_cores)
1808         time.sleep(0.5)
1809         self.sut.stop(self.all_rx_cores)
1810         time.sleep(0.5)
1811         self.sut.reset_stats()
1812
1813         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1814         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1815
1816         self.sut.reset_values(self.tun_cores)
1817         self.sut.reset_values(self.inet_cores)
1818
1819         # Set correct IP and UDP lengths in packet headers
1820         # tun
1821         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1822         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1823         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1824         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1825         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1826         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1827
1828         # INET
1829         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1830         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1831         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1832         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1833
1834         LOG.info("Initializing SUT: sending lwAFTR packets")
1835         self.sut.set_speed(self.inet_cores, curr_up_speed)
1836         self.sut.set_speed(self.tun_cores, curr_down_speed)
1837         time.sleep(4)
1838
1839         # Ramp up the transmission speed. First go to the common speed, then
1840         # increase steps for the faster one.
1841         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1842
1843         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1844
1845         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1846             # The min(..., ...) takes care of 1) floating point rounding errors
1847             # that could make curr_*_speed to be slightly greater than
1848             # max_*_speed and 2) max_*_speed not being an exact multiple of
1849             # self._step_delta.
1850             if curr_up_speed < max_up_speed:
1851                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1852             if curr_down_speed < max_down_speed:
1853                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1854
1855             self.sut.set_speed(self.inet_cores, curr_up_speed)
1856             self.sut.set_speed(self.tun_cores, curr_down_speed)
1857             time.sleep(self.step_time)
1858
1859         LOG.info("Target speeds reached. Starting real test.")
1860
1861         yield
1862
1863         self.sut.stop(self.tun_cores + self.inet_cores)
1864         LOG.info("Test ended. Flushing NIC buffers")
1865         self.sut.start(self.all_rx_cores)
1866         time.sleep(3)
1867         self.sut.stop(self.all_rx_cores)
1868
1869     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1870                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1871         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1872                                      value, tolerated_loss, line_speed)
1873
1874         with data_helper, self.traffic_context(pkt_size, value):
1875             with data_helper.measure_tot_stats():
1876                 time.sleep(duration)
1877                 # Getting statistics to calculate PPS at right speed....
1878                 data_helper.capture_tsc_hz()
1879                 data_helper.latency = self.get_latency()
1880
1881         return data_helper.result_tuple, data_helper.samples