Merge "Use context name instead of key_uuid"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38
39 PROX_PORT = 8474
40
41 SECTION_NAME = 0
42 SECTION_CONTENTS = 1
43
44 LOG = logging.getLogger(__name__)
45 LOG.setLevel(logging.DEBUG)
46
47 TEN_GIGABIT = 1e10
48 BITS_PER_BYTE = 8
49 RETRY_SECONDS = 60
50 RETRY_INTERVAL = 1
51
52 CONFIGURATION_OPTIONS = (
53     # dict key           section     key               default value
54     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
55     ('testDuration', 'general', 'test_duration', 5.0),
56     ('testPrecision', 'general', 'test_precision', 1.0),
57     ('tests', 'general', 'tests', None),
58     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
59
60     ('logFile', 'logging', 'file', 'dats.log'),
61     ('logDateFormat', 'logging', 'datefmt', None),
62     ('logLevel', 'logging', 'level', 'INFO'),
63     ('logOverwrite', 'logging', 'overwrite', 1),
64
65     ('testerIp', 'tester', 'ip', None),
66     ('testerUser', 'tester', 'user', 'root'),
67     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
68     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
69     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
70     ('testerSocketId', 'tester', 'socket_id', 0),
71
72     ('sutIp', 'sut', 'ip', None),
73     ('sutUser', 'sut', 'user', 'root'),
74     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
75     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
76     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
77     ('sutSocketId', 'sut', 'socket_id', 0),
78 )
79
80
81 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
82     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
83
84     def __new__(cls, *args):
85         try:
86             matches = cls.CORE_RE.search(str(args[0]))
87             if matches:
88                 args = matches.groups()
89
90             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
91                                                        'h' if args[2] else '')
92
93         except (AttributeError, TypeError, IndexError, ValueError):
94             raise ValueError('Invalid core spec {}'.format(args))
95
96     def is_hyperthread(self):
97         return self.hyperthread == 'h'
98
99     @property
100     def index(self):
101         return int(self.is_hyperthread())
102
103     def find_in_topology(self, cpu_topology):
104         try:
105             socket_core_match = cpu_topology[self.socket_id][self.core_id]
106             sorted_match = sorted(socket_core_match.values())
107             return sorted_match[self.index][0]
108         except (KeyError, IndexError):
109             template = "Core {}{} on socket {} does not exist"
110             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
111
112
113 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
114     def __new__(cls, *args):
115         try:
116             assert args[0] is not str(args[0])
117             args = tuple(args[0])
118         except (AssertionError, IndexError, TypeError):
119             pass
120
121         return super(TotStatsTuple, cls).__new__(cls, *args)
122
123
124 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
125                                                         'delta_tx,delta_tsc,'
126                                                         'latency,rx_total,tx_total,pps')):
127     @property
128     def pkt_loss(self):
129         try:
130             return 1e2 * self.drop_total / float(self.tx_total)
131         except ZeroDivisionError:
132             return 100.0
133
134     @property
135     def mpps(self):
136         # calculate the effective throughput in Mpps
137         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
138
139     @property
140     def can_be_lost(self):
141         return int(self.tx_total * self.tolerated / 1e2)
142
143     @property
144     def drop_total(self):
145         return self.tx_total - self.rx_total
146
147     @property
148     def success(self):
149         return self.drop_total <= self.can_be_lost
150
151     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
152         if pkt_loss is None:
153             pkt_loss = self.pkt_loss
154
155         if port_samples is None:
156             port_samples = {}
157
158         latency_keys = [
159             "LatencyMin",
160             "LatencyMax",
161             "LatencyAvg",
162         ]
163
164         samples = {
165             "Throughput": self.mpps,
166             "DropPackets": pkt_loss,
167             "CurrentDropPackets": pkt_loss,
168             "TxThroughput": self.pps / 1e6,
169             "RxThroughput": self.mpps,
170             "PktSize": pkt_size,
171         }
172         if port_samples:
173             samples.update(port_samples)
174
175         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
176         return samples
177
178     def log_data(self, logger=None):
179         if logger is None:
180             logger = LOG
181
182         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
183         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
184         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
185
186
187 class PacketDump(object):
188     @staticmethod
189     def assert_func(func, value1, value2, template=None):
190         assert func(value1, value2), template.format(value1, value2)
191
192     def __init__(self, port_id, data_len, payload):
193         template = "Packet dump has specified length {}, but payload is {} bytes long"
194         self.assert_func(operator.eq, data_len, len(payload), template)
195         self._port_id = port_id
196         self._data_len = data_len
197         self._payload = payload
198
199     @property
200     def port_id(self):
201         """Get the port id of the packet dump"""
202         return self._port_id
203
204     @property
205     def data_len(self):
206         """Get the length of the data received"""
207         return self._data_len
208
209     def __str__(self):
210         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
211
212     def payload(self, start=None, end=None):
213         """Get part of the payload as a list of ordinals.
214
215         Returns a list of byte values, matching the contents of the packet dump.
216         Optional start and end parameters can be specified to retrieve only a
217         part of the packet contents.
218
219         The number of elements in the list is equal to end - start + 1, so end
220         is the offset of the last character.
221
222         Args:
223             start (pos. int): the starting offset in the payload. If it is not
224                 specified or None, offset 0 is assumed.
225             end (pos. int): the ending offset of the payload. If it is not
226                 specified or None, the contents until the end of the packet are
227                 returned.
228
229         Returns:
230             [int, int, ...]. Each int represents the ordinal value of a byte in
231             the packet payload.
232         """
233         if start is None:
234             start = 0
235
236         if end is None:
237             end = self.data_len - 1
238
239         # Bounds checking on offsets
240         template = "Start offset must be non-negative"
241         self.assert_func(operator.ge, start, 0, template)
242
243         template = "End offset must be less than {1}"
244         self.assert_func(operator.lt, end, self.data_len, template)
245
246         # Adjust for splice operation: end offset must be 1 more than the offset
247         # of the last desired character.
248         end += 1
249
250         return self._payload[start:end]
251
252
253 class ProxSocketHelper(object):
254
255     def __init__(self, sock=None):
256         """ creates new prox instance """
257         super(ProxSocketHelper, self).__init__()
258
259         if sock is None:
260             sock = socket.socket()
261
262         self._sock = sock
263         self._pkt_dumps = []
264         self.master_stats = None
265
266     def connect(self, ip, port):
267         """Connect to the prox instance on the remote system"""
268         self._sock.connect((ip, port))
269
270     def get_socket(self):
271         """ get the socket connected to the remote instance """
272         return self._sock
273
274     def _parse_socket_data(self, decoded_data, pkt_dump_only):
275         def get_newline_index():
276             return decoded_data.find('\n', index)
277
278         ret_str = ''
279         index = 0
280         for newline_index in iter(get_newline_index, -1):
281             ret_str = decoded_data[index:newline_index]
282
283             try:
284                 mode, port_id, data_len = ret_str.split(',', 2)
285             except ValueError:
286                 mode, port_id, data_len = None, None, None
287
288             if mode != 'pktdump':
289                 # Regular 1-line message. Stop reading from the socket.
290                 LOG.debug("Regular response read")
291                 return ret_str
292
293             LOG.debug("Packet dump header read: [%s]", ret_str)
294
295             # The line is a packet dump header. Parse it, read the
296             # packet payload, store the dump for later retrieval.
297             # Skip over the packet dump and continue processing: a
298             # 1-line response may follow the packet dump.
299
300             data_len = int(data_len)
301             data_start = newline_index + 1  # + 1 to skip over \n
302             data_end = data_start + data_len
303             sub_data = decoded_data[data_start:data_end]
304             pkt_payload = array.array('B', (ord(v) for v in sub_data))
305             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
306             self._pkt_dumps.append(pkt_dump)
307
308             if pkt_dump_only:
309                 # Return boolean instead of string to signal
310                 # successful reception of the packet dump.
311                 LOG.debug("Packet dump stored, returning")
312                 return True
313
314             index = data_end + 1
315
316         return ret_str
317
318     def get_data(self, pkt_dump_only=False, timeout=1):
319         """ read data from the socket """
320
321         # This method behaves slightly differently depending on whether it is
322         # called to read the response to a command (pkt_dump_only = 0) or if
323         # it is called specifically to read a packet dump (pkt_dump_only = 1).
324         #
325         # Packet dumps look like:
326         #   pktdump,<port_id>,<data_len>\n
327         #   <packet contents as byte array>\n
328         # This means the total packet dump message consists of 2 lines instead
329         # of 1 line.
330         #
331         # - Response for a command (pkt_dump_only = 0):
332         #   1) Read response from the socket until \n (end of message)
333         #   2a) If the response is a packet dump header (starts with "pktdump,"):
334         #     - Read the packet payload and store the packet dump for later
335         #       retrieval.
336         #     - Reset the state and restart from 1). Eventually state 2b) will
337         #       be reached and the function will return.
338         #   2b) If the response is not a packet dump:
339         #     - Return the received message as a string
340         #
341         # - Explicit request to read a packet dump (pkt_dump_only = 1):
342         #   - Read the dump header and payload
343         #   - Store the packet dump for later retrieval
344         #   - Return True to signify a packet dump was successfully read
345
346         def is_ready():
347             # recv() is blocking, so avoid calling it when no data is waiting.
348             ready = select.select([self._sock], [], [], timeout)
349             return bool(ready[0])
350
351         status = False
352         ret_str = ""
353         for status in iter(is_ready, False):
354             decoded_data = self._sock.recv(256).decode('utf-8')
355             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
356
357         LOG.debug("Received data from socket: [%s]", ret_str)
358         return ret_str if status else ''
359
360     def put_command(self, to_send):
361         """ send data to the remote instance """
362         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
363         try:
364             # NOTE: sendall will block, we need a timeout
365             self._sock.sendall(to_send.encode('utf-8'))
366         except:  # pylint: disable=bare-except
367             pass
368
369     def get_packet_dump(self):
370         """ get the next packet dump """
371         if self._pkt_dumps:
372             return self._pkt_dumps.pop(0)
373         return None
374
375     def stop_all_reset(self):
376         """ stop the remote instance and reset stats """
377         LOG.debug("Stop all and reset stats")
378         self.stop_all()
379         self.reset_stats()
380
381     def stop_all(self):
382         """ stop all cores on the remote instance """
383         LOG.debug("Stop all")
384         self.put_command("stop all\n")
385         time.sleep(3)
386
387     def stop(self, cores, task=''):
388         """ stop specific cores on the remote instance """
389         LOG.debug("Stopping cores %s", cores)
390         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
391         time.sleep(3)
392
393     def start_all(self):
394         """ start all cores on the remote instance """
395         LOG.debug("Start all")
396         self.put_command("start all\n")
397
398     def start(self, cores):
399         """ start specific cores on the remote instance """
400         LOG.debug("Starting cores %s", cores)
401         self.put_command("start {}\n".format(join_non_strings(',', cores)))
402         time.sleep(3)
403
404     def reset_stats(self):
405         """ reset the statistics on the remote instance """
406         LOG.debug("Reset stats")
407         self.put_command("reset stats\n")
408         time.sleep(1)
409
410     def _run_template_over_cores(self, template, cores, *args):
411         for core in cores:
412             self.put_command(template.format(core, *args))
413
414     def set_pkt_size(self, cores, pkt_size):
415         """ set the packet size to generate on the remote instance """
416         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
417         pkt_size -= 4
418         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
419         time.sleep(1)
420
421     def set_value(self, cores, offset, value, length):
422         """ set value on the remote instance """
423         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
424         LOG.debug(msg, cores, value, length, offset)
425         template = "set value {} 0 {} {} {}\n"
426         self._run_template_over_cores(template, cores, offset, value, length)
427
428     def reset_values(self, cores):
429         """ reset values on the remote instance """
430         LOG.debug("Set value for core(s) %s", cores)
431         self._run_template_over_cores("reset values {} 0\n", cores)
432
433     def set_speed(self, cores, speed, tasks=None):
434         """ set speed on the remote instance """
435         if tasks is None:
436             tasks = [0] * len(cores)
437         elif len(tasks) != len(cores):
438             LOG.error("set_speed: cores and tasks must have the same len")
439         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
440         for (core, task) in list(zip(cores, tasks)):
441             self.put_command("speed {} {} {}\n".format(core, task, speed))
442
443     def slope_speed(self, cores_speed, duration, n_steps=0):
444         """will start to increase speed from 0 to N where N is taken from
445         a['speed'] for each a in cores_speed"""
446         # by default, each step will take 0.5 sec
447         if n_steps == 0:
448             n_steps = duration * 2
449
450         private_core_data = []
451         step_duration = float(duration) / n_steps
452         for core_data in cores_speed:
453             target = float(core_data['speed'])
454             private_core_data.append({
455                 'cores': core_data['cores'],
456                 'zero': 0,
457                 'delta': target / n_steps,
458                 'current': 0,
459                 'speed': target,
460             })
461
462         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
463         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
464             time.sleep(step_duration)
465             for core_data in private_core_data:
466                 core_data['current'] = core_data[key1] + core_data[key2]
467                 self.set_speed(core_data['cores'], core_data['current'])
468
469     def set_pps(self, cores, pps, pkt_size):
470         """ set packets per second for specific cores on the remote instance """
471         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
472         LOG.debug(msg, cores, pps, pkt_size)
473
474         # speed in percent of line-rate
475         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
476         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
477
478     def lat_stats(self, cores, task=0):
479         """Get the latency statistics from the remote system"""
480         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
481         lat_min = {}
482         lat_max = {}
483         lat_avg = {}
484         for core in cores:
485             self.put_command("lat stats {} {} \n".format(core, task))
486             ret = self.get_data()
487
488             try:
489                 lat_min[core], lat_max[core], lat_avg[core] = \
490                     tuple(int(n) for n in ret.split(",")[:3])
491
492             except (AttributeError, ValueError, TypeError):
493                 pass
494
495         return lat_min, lat_max, lat_avg
496
497     def get_all_tot_stats(self):
498         self.put_command("tot stats\n")
499         all_stats_str = self.get_data().split(",")
500         if len(all_stats_str) != 4:
501             all_stats = [0] * 4
502             return all_stats
503         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
504         self.master_stats = all_stats
505         return all_stats
506
507     def hz(self):
508         return self.get_all_tot_stats()[3]
509
510     def core_stats(self, cores, task=0):
511         """Get the receive statistics from the remote system"""
512         rx = tx = drop = tsc = 0
513         for core in cores:
514             self.put_command("core stats {} {}\n".format(core, task))
515             ret = self.get_data().split(",")
516             rx += int(ret[0])
517             tx += int(ret[1])
518             drop += int(ret[2])
519             tsc = int(ret[3])
520         return rx, tx, drop, tsc
521
522     def port_stats(self, ports):
523         """get counter values from a specific port"""
524         tot_result = [0] * 12
525         for port in ports:
526             self.put_command("port_stats {}\n".format(port))
527             ret = [try_int(s, 0) for s in self.get_data().split(",")]
528             tot_result = [sum(x) for x in zip(tot_result, ret)]
529         return tot_result
530
531     @contextmanager
532     def measure_tot_stats(self):
533         start = self.get_all_tot_stats()
534         container = {'start_tot': start}
535         try:
536             yield container
537         finally:
538             container['end_tot'] = end = self.get_all_tot_stats()
539
540         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
541
542     def tot_stats(self):
543         """Get the total statistics from the remote system"""
544         stats = self.get_all_tot_stats()
545         return stats[:3]
546
547     def tot_ierrors(self):
548         """Get the total ierrors from the remote system"""
549         self.put_command("tot ierrors tot\n")
550         recv = self.get_data().split(',')
551         tot_ierrors = int(recv[0])
552         tsc = int(recv[0])
553         return tot_ierrors, tsc
554
555     def set_count(self, count, cores):
556         """Set the number of packets to send on the specified core"""
557         self._run_template_over_cores("count {} 0 {}\n", cores, count)
558
559     def dump_rx(self, core_id, task_id=0, count=1):
560         """Activate dump on rx on the specified core"""
561         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
562         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
563         time.sleep(1.5)  # Give PROX time to set up packet dumping
564
565     def quit(self):
566         self.stop_all()
567         self._quit()
568         self.force_quit()
569
570     def _quit(self):
571         """ stop all cores on the remote instance """
572         LOG.debug("Quit prox")
573         self.put_command("quit\n")
574         time.sleep(3)
575
576     def force_quit(self):
577         """ stop all cores on the remote instance """
578         LOG.debug("Force Quit prox")
579         self.put_command("quit_force\n")
580         time.sleep(3)
581
582
583 _LOCAL_OBJECT = object()
584
585
586 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
587     # the actual app is lowercase
588     APP_NAME = 'prox'
589     # not used for Prox but added for consistency
590     VNF_TYPE = "PROX"
591
592     LUA_PARAMETER_NAME = ""
593     LUA_PARAMETER_PEER = {
594         "gen": "sut",
595         "sut": "gen",
596     }
597
598     CONFIG_QUEUE_TIMEOUT = 120
599
600     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
601         self.remote_path = None
602         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
603         self.remote_prox_file_name = None
604         self._prox_config_data = None
605         self.additional_files = {}
606         self.config_queue = Queue()
607         # allow_exit_without_flush
608         self.config_queue.cancel_join_thread()
609         self._global_section = None
610
611     @property
612     def prox_config_data(self):
613         if self._prox_config_data is None:
614             # this will block, but it needs too
615             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
616         return self._prox_config_data
617
618     @property
619     def global_section(self):
620         if self._global_section is None and self.prox_config_data:
621             self._global_section = self.find_section("global")
622         return self._global_section
623
624     def find_section(self, name, default=_LOCAL_OBJECT):
625         result = next((value for key, value in self.prox_config_data if key == name), default)
626         if result is _LOCAL_OBJECT:
627             raise KeyError('{} not found in Prox config'.format(name))
628         return result
629
630     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
631         section = self.find_section(section_name, [])
632         result = next((value for key, value in section if key == section_key), default)
633         if result is _LOCAL_OBJECT:
634             template = '{} not found in {} section of Prox config'
635             raise KeyError(template.format(section_key, section_name))
636         return result
637
638     def copy_to_target(self, config_file_path, prox_file):
639         remote_path = os.path.join("/tmp", prox_file)
640         self.ssh_helper.put(config_file_path, remote_path)
641         return remote_path
642
643     @staticmethod
644     def _get_tx_port(section, sections):
645         iface_port = [-1]
646         for item in sections[section]:
647             if item[0] == "tx port":
648                 iface_port = re.findall(r'\d+', item[1])
649                 # do we want the last one?
650                 #   if yes, then can we reverse?
651         return int(iface_port[0])
652
653     @staticmethod
654     def _replace_quoted_with_value(quoted, value, count=1):
655         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
656         return new_string
657
658     def _insert_additional_file(self, value):
659         file_str = value.split('"')
660         base_name = os.path.basename(file_str[1])
661         file_str[1] = self.additional_files[base_name]
662         return '"'.join(file_str)
663
664     def generate_prox_config_file(self, config_path):
665         sections = []
666         prox_config = ConfigParser(config_path, sections)
667         prox_config.parse()
668
669         # Ensure MAC is set "hardware"
670         all_ports = self.vnfd_helper.port_pairs.all_ports
671         # use dpdk port number
672         for port_name in all_ports:
673             port_num = self.vnfd_helper.port_num(port_name)
674             port_section_name = "port {}".format(port_num)
675             for section_name, section in sections:
676                 if port_section_name != section_name:
677                     continue
678
679                 for section_data in section:
680                     if section_data[0] == "mac":
681                         section_data[1] = "hardware"
682
683         # search for dst mac
684         for _, section in sections:
685             for section_data in section:
686                 item_key, item_val = section_data
687                 if item_val.startswith("@@dst_mac"):
688                     tx_port_iter = re.finditer(r'\d+', item_val)
689                     tx_port_no = int(next(tx_port_iter).group(0))
690                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
691                     mac = intf["virtual-interface"]["dst_mac"]
692                     section_data[1] = mac.replace(":", " ", 6)
693
694                 if item_key == "dst mac" and item_val.startswith("@@"):
695                     tx_port_iter = re.finditer(r'\d+', item_val)
696                     tx_port_no = int(next(tx_port_iter).group(0))
697                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
698                     mac = intf["virtual-interface"]["dst_mac"]
699                     section_data[1] = mac
700
701         # if addition file specified in prox config
702         if not self.additional_files:
703             return sections
704
705         for section_name, section in sections:
706             for section_data in section:
707                 try:
708                     if section_data[0].startswith("dofile"):
709                         section_data[0] = self._insert_additional_file(section_data[0])
710
711                     if section_data[1].startswith("dofile"):
712                         section_data[1] = self._insert_additional_file(section_data[1])
713                 except:  # pylint: disable=bare-except
714                     pass
715
716         return sections
717
718     @staticmethod
719     def write_prox_lua(lua_config):
720         """
721         Write an .ini-format config file for PROX (parameters.lua)
722         PROX does not allow a space before/after the =, so we need
723         a custom method
724         """
725         out = []
726         for key in lua_config:
727             value = '"' + lua_config[key] + '"'
728             if key == "__name__":
729                 continue
730             if value is not None and value != '@':
731                 key = "=".join((key, str(value).replace('\n', '\n\t')))
732                 out.append(key)
733             else:
734                 key = str(key).replace('\n', '\n\t')
735                 out.append(key)
736         return os.linesep.join(out)
737
738     @staticmethod
739     def write_prox_config(prox_config):
740         """
741         Write an .ini-format config file for PROX
742         PROX does not allow a space before/after the =, so we need
743         a custom method
744         """
745         out = []
746         for (section_name, section) in prox_config:
747             out.append("[{}]".format(section_name))
748             for item in section:
749                 key, value = item
750                 if key == "__name__":
751                     continue
752                 if value is not None and value != '@':
753                     key = "=".join((key, str(value).replace('\n', '\n\t')))
754                     out.append(key)
755                 else:
756                     key = str(key).replace('\n', '\n\t')
757                     out.append(key)
758         return os.linesep.join(out)
759
760     def put_string_to_file(self, s, remote_path):
761         file_obj = cStringIO(s)
762         self.ssh_helper.put_file_obj(file_obj, remote_path)
763         return remote_path
764
765     def generate_prox_lua_file(self):
766         p = OrderedDict()
767         all_ports = self.vnfd_helper.port_pairs.all_ports
768         for port_name in all_ports:
769             port_num = self.vnfd_helper.port_num(port_name)
770             intf = self.vnfd_helper.find_interface(name=port_name)
771             vintf = intf['virtual-interface']
772             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
773             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
774
775         return p
776
777     def upload_prox_lua(self, config_file, lua_data):
778         # prox can't handle spaces around ' = ' so use custom method
779         out = StringIO(self.write_prox_lua(lua_data))
780         out.seek(0)
781         remote_path = os.path.join("/tmp", config_file)
782         self.ssh_helper.put_file_obj(out, remote_path)
783
784         return remote_path
785
786     def upload_prox_config(self, config_file, prox_config_data):
787         # prox can't handle spaces around ' = ' so use custom method
788         out = StringIO(self.write_prox_config(prox_config_data))
789         out.seek(0)
790         remote_path = os.path.join("/tmp", config_file)
791         self.ssh_helper.put_file_obj(out, remote_path)
792
793         return remote_path
794
795     def build_config_file(self):
796         task_path = self.scenario_helper.task_path
797         options = self.scenario_helper.options
798         config_path = options['prox_config']
799         config_file = os.path.basename(config_path)
800         config_path = utils.find_relative_file(config_path, task_path)
801         self.additional_files = {}
802
803         try:
804             if options['prox_generate_parameter']:
805                 self.lua = []
806                 self.lua = self.generate_prox_lua_file()
807                 if len(self.lua) > 0:
808                     self.upload_prox_lua("parameters.lua", self.lua)
809         except:  # pylint: disable=bare-except
810             pass
811
812         prox_files = options.get('prox_files', [])
813         if isinstance(prox_files, six.string_types):
814             prox_files = [prox_files]
815         for key_prox_file in prox_files:
816             base_prox_file = os.path.basename(key_prox_file)
817             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
818             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
819             self.additional_files[base_prox_file] = remote_prox_file
820
821         self._prox_config_data = self.generate_prox_config_file(config_path)
822         # copy config to queue so we can read it from traffic_runner process
823         self.config_queue.put(self._prox_config_data)
824         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
825
826     def build_config(self):
827         self.build_config_file()
828
829         options = self.scenario_helper.options
830         prox_args = options['prox_args']
831         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
832
833         self.pipeline_kwargs = {
834             'tool_path': tool_path,
835             'tool_dir': os.path.dirname(tool_path),
836             'cfg_file': self.remote_path,
837             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
838                              for k, v in prox_args.items())
839         }
840
841         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
842                         "{args} -f {cfg_file} '")
843         return cmd_template.format(**self.pipeline_kwargs)
844
845
846 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
847 class ProxResourceHelper(ClientResourceHelper):
848
849     RESOURCE_WORD = 'prox'
850
851     PROX_MODE = ""
852
853     WAIT_TIME = 3
854
855     @staticmethod
856     def find_pci(pci, bound_pci):
857         # we have to substring match PCI bus address from the end
858         return any(b.endswith(pci) for b in bound_pci)
859
860     def __init__(self, setup_helper):
861         super(ProxResourceHelper, self).__init__(setup_helper)
862         self.mgmt_interface = self.vnfd_helper.mgmt_interface
863         self._user = self.mgmt_interface["user"]
864         self._ip = self.mgmt_interface["ip"]
865
866         self.done = False
867         self._vpci_to_if_name_map = None
868         self.additional_file = {}
869         self.remote_prox_file_name = None
870         self.lower = None
871         self.upper = None
872         self.step_delta = 1
873         self.step_time = 0.5
874         self._test_type = None
875
876     @property
877     def sut(self):
878         if not self.client:
879             self.client = self._connect()
880         return self.client
881
882     @property
883     def test_type(self):
884         if self._test_type is None:
885             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
886         return self._test_type
887
888     def run_traffic(self, traffic_profile):
889         self._queue.cancel_join_thread()
890         self.lower = 0.0
891         self.upper = 100.0
892
893         traffic_profile.init(self._queue)
894         # this frees up the run_traffic loop
895         self.client_started.value = 1
896
897         while not self._terminated.value:
898             # move it all to traffic_profile
899             self._run_traffic_once(traffic_profile)
900
901     def _run_traffic_once(self, traffic_profile):
902         traffic_profile.execute_traffic(self)
903         if traffic_profile.done:
904             self._queue.put({'done': True})
905             LOG.debug("tg_prox done")
906             self._terminated.value = 1
907
908     # For VNF use ResourceHelper method to collect KPIs directly.
909     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
910     def collect_collectd_kpi(self):
911         return self._collect_resource_kpi()
912
913     def collect_kpi(self):
914         result = super(ProxResourceHelper, self).collect_kpi()
915         # add in collectd kpis manually
916         if result:
917             result['collect_stats'] = self._collect_resource_kpi()
918         return result
919
920     def terminate(self):
921         # should not be called, use VNF terminate
922         raise NotImplementedError()
923
924     def up_post(self):
925         return self.sut  # force connection
926
927     def execute(self, cmd, *args, **kwargs):
928         func = getattr(self.sut, cmd, None)
929         if func:
930             return func(*args, **kwargs)
931         return None
932
933     def _connect(self, client=None):
934         """Run and connect to prox on the remote system """
935         # De-allocating a large amount of hugepages takes some time. If a new
936         # PROX instance is started immediately after killing the previous one,
937         # it might not be able to allocate hugepages, because they are still
938         # being freed. Hence the -w switch.
939         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
940         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
941         # -f ./handle_none-4.cfg"
942         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
943         #  "; " \
944         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
945         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
946         # sudo " \
947         #    + "./build/Prox " + prox_args
948         # log.debug("Starting PROX with command [%s]", prox_cmd)
949         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
950         # self._ip, prox_cmd))
951         if client is None:
952             client = ProxSocketHelper()
953
954         # try connecting to Prox for 60s
955         for _ in range(RETRY_SECONDS):
956             time.sleep(RETRY_INTERVAL)
957             try:
958                 client.connect(self._ip, PROX_PORT)
959             except (socket.gaierror, socket.error):
960                 continue
961             else:
962                 return client
963
964         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
965         raise Exception(msg.format(self._ip, PROX_PORT))
966
967
968 class ProxDataHelper(object):
969
970     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
971         super(ProxDataHelper, self).__init__()
972         self.vnfd_helper = vnfd_helper
973         self.sut = sut
974         self.pkt_size = pkt_size
975         self.value = value
976         self.tolerated_loss = tolerated_loss
977         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
978         self.tsc_hz = None
979         self.measured_stats = None
980         self.latency = None
981         self._totals_and_pps = None
982         self.result_tuple = None
983
984     @property
985     def totals_and_pps(self):
986         if self._totals_and_pps is None:
987             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
988             pps = self.value / 100.0 * self.line_rate_to_pps()
989             self._totals_and_pps = rx_total, tx_total, pps
990         return self._totals_and_pps
991
992     @property
993     def rx_total(self):
994         return self.totals_and_pps[0]
995
996     @property
997     def tx_total(self):
998         return self.totals_and_pps[1]
999
1000     @property
1001     def pps(self):
1002         return self.totals_and_pps[2]
1003
1004     @property
1005     def samples(self):
1006         samples = {}
1007         for port_name, port_num in self.vnfd_helper.ports_iter():
1008             try:
1009                 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1010                 samples[port_name] = {
1011                     "in_packets": port_rx_total,
1012                     "out_packets": port_tx_total,
1013                 }
1014             except (KeyError, TypeError, NameError, MemoryError, ValueError,
1015                     SystemError, BufferError):
1016                 samples[port_name] = {
1017                     "in_packets": 0,
1018                     "out_packets": 0,
1019                 }
1020         return samples
1021
1022     def __enter__(self):
1023         self.check_interface_count()
1024         return self
1025
1026     def __exit__(self, exc_type, exc_val, exc_tb):
1027         self.make_tuple()
1028
1029     def make_tuple(self):
1030         if self.result_tuple:
1031             return
1032
1033         self.result_tuple = ProxTestDataTuple(
1034             self.tolerated_loss,
1035             self.tsc_hz,
1036             self.measured_stats['delta'].rx,
1037             self.measured_stats['delta'].tx,
1038             self.measured_stats['delta'].tsc,
1039             self.latency,
1040             self.rx_total,
1041             self.tx_total,
1042             self.pps,
1043         )
1044         self.result_tuple.log_data()
1045
1046     @contextmanager
1047     def measure_tot_stats(self):
1048         with self.sut.measure_tot_stats() as self.measured_stats:
1049             yield
1050
1051     def check_interface_count(self):
1052         # do this assert in init?  unless we expect interface count to
1053         # change from one run to another run...
1054         assert self.port_count in {1, 2, 4}, \
1055             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1056
1057     def capture_tsc_hz(self):
1058         self.tsc_hz = float(self.sut.hz())
1059
1060     def line_rate_to_pps(self):
1061         # NOTE: to fix, don't hardcode 10Gb/s
1062         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1063
1064
1065 class ProxProfileHelper(object):
1066
1067     __prox_profile_type__ = "Generic"
1068
1069     PROX_CORE_GEN_MODE = "gen"
1070     PROX_CORE_LAT_MODE = "lat"
1071
1072     @classmethod
1073     def get_cls(cls, helper_type):
1074         """Return class of specified type."""
1075         if not helper_type:
1076             return ProxProfileHelper
1077
1078         for profile_helper_class in utils.itersubclasses(cls):
1079             if helper_type == profile_helper_class.__prox_profile_type__:
1080                 return profile_helper_class
1081
1082         return ProxProfileHelper
1083
1084     @classmethod
1085     def make_profile_helper(cls, resource_helper):
1086         return cls.get_cls(resource_helper.test_type)(resource_helper)
1087
1088     def __init__(self, resource_helper):
1089         super(ProxProfileHelper, self).__init__()
1090         self.resource_helper = resource_helper
1091         self._cpu_topology = None
1092         self._test_cores = None
1093         self._latency_cores = None
1094
1095     @property
1096     def cpu_topology(self):
1097         if not self._cpu_topology:
1098             stdout = io.BytesIO()
1099             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1100             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1101         return self._cpu_topology
1102
1103     @property
1104     def test_cores(self):
1105         if not self._test_cores:
1106             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1107         return self._test_cores
1108
1109     @property
1110     def latency_cores(self):
1111         if not self._latency_cores:
1112             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1113         return self._latency_cores
1114
1115     @contextmanager
1116     def traffic_context(self, pkt_size, value):
1117         self.sut.stop_all()
1118         self.sut.reset_stats()
1119         try:
1120             self.sut.set_pkt_size(self.test_cores, pkt_size)
1121             self.sut.set_speed(self.test_cores, value)
1122             self.sut.start_all()
1123             yield
1124         finally:
1125             self.sut.stop_all()
1126
1127     def get_cores(self, mode):
1128         cores = []
1129
1130         for section_name, section in self.setup_helper.prox_config_data:
1131             if not section_name.startswith("core"):
1132                 continue
1133
1134             for key, value in section:
1135                 if key == "mode" and value == mode:
1136                     core_tuple = CoreSocketTuple(section_name)
1137                     core = core_tuple.core_id
1138                     cores.append(core)
1139
1140         return cores
1141
1142     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1143         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1144
1145         with data_helper, self.traffic_context(pkt_size, value):
1146             with data_helper.measure_tot_stats():
1147                 time.sleep(duration)
1148                 # Getting statistics to calculate PPS at right speed....
1149                 data_helper.capture_tsc_hz()
1150                 data_helper.latency = self.get_latency()
1151
1152         return data_helper.result_tuple, data_helper.samples
1153
1154     def get_latency(self):
1155         """
1156         :return: return lat_min, lat_max, lat_avg
1157         :rtype: list
1158         """
1159
1160         if not self._latency_cores:
1161             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1162
1163         if self._latency_cores:
1164             return self.sut.lat_stats(self._latency_cores)
1165         return []
1166
1167     def terminate(self):
1168         pass
1169
1170     def __getattr__(self, item):
1171         return getattr(self.resource_helper, item)
1172
1173
1174 class ProxMplsProfileHelper(ProxProfileHelper):
1175
1176     __prox_profile_type__ = "MPLS tag/untag"
1177
1178     def __init__(self, resource_helper):
1179         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1180         self._cores_tuple = None
1181
1182     @property
1183     def mpls_cores(self):
1184         if not self._cores_tuple:
1185             self._cores_tuple = self.get_cores_mpls()
1186         return self._cores_tuple
1187
1188     @property
1189     def tagged_cores(self):
1190         return self.mpls_cores[0]
1191
1192     @property
1193     def plain_cores(self):
1194         return self.mpls_cores[1]
1195
1196     def get_cores_mpls(self):
1197         cores_tagged = []
1198         cores_plain = []
1199         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1200             if not section_name.startswith("core"):
1201                 continue
1202
1203             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1204                 continue
1205
1206             for item_key, item_value in section:
1207                 if item_key != 'name':
1208                     continue
1209
1210                 if item_value.startswith("tag"):
1211                     core_tuple = CoreSocketTuple(section_name)
1212                     core_tag = core_tuple.core_id
1213                     cores_tagged.append(core_tag)
1214
1215                 elif item_value.startswith("udp"):
1216                     core_tuple = CoreSocketTuple(section_name)
1217                     core_udp = core_tuple.core_id
1218                     cores_plain.append(core_udp)
1219
1220         return cores_tagged, cores_plain
1221
1222     @contextmanager
1223     def traffic_context(self, pkt_size, value):
1224         self.sut.stop_all()
1225         self.sut.reset_stats()
1226         try:
1227             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1228             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1229             self.sut.set_speed(self.tagged_cores, value)
1230             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1231             self.sut.set_speed(self.plain_cores, value * ratio)
1232             self.sut.start_all()
1233             yield
1234         finally:
1235             self.sut.stop_all()
1236
1237
1238 class ProxBngProfileHelper(ProxProfileHelper):
1239
1240     __prox_profile_type__ = "BNG gen"
1241
1242     def __init__(self, resource_helper):
1243         super(ProxBngProfileHelper, self).__init__(resource_helper)
1244         self._cores_tuple = None
1245
1246     @property
1247     def bng_cores(self):
1248         if not self._cores_tuple:
1249             self._cores_tuple = self.get_cores_gen_bng_qos()
1250         return self._cores_tuple
1251
1252     @property
1253     def cpe_cores(self):
1254         return self.bng_cores[0]
1255
1256     @property
1257     def inet_cores(self):
1258         return self.bng_cores[1]
1259
1260     @property
1261     def arp_cores(self):
1262         return self.bng_cores[2]
1263
1264     @property
1265     def arp_task_cores(self):
1266         return self.bng_cores[3]
1267
1268     @property
1269     def all_rx_cores(self):
1270         return self.latency_cores
1271
1272     def get_cores_gen_bng_qos(self):
1273         cpe_cores = []
1274         inet_cores = []
1275         arp_cores = []
1276         arp_tasks_core = [0]
1277         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1278             if not section_name.startswith("core"):
1279                 continue
1280
1281             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1282                 continue
1283
1284             for item_key, item_value in section:
1285                 if item_key != 'name':
1286                     continue
1287
1288                 if item_value.startswith("cpe"):
1289                     core_tuple = CoreSocketTuple(section_name)
1290                     cpe_core = core_tuple.core_id
1291                     cpe_cores.append(cpe_core)
1292
1293                 elif item_value.startswith("inet"):
1294                     core_tuple = CoreSocketTuple(section_name)
1295                     inet_core = core_tuple.core_id
1296                     inet_cores.append(inet_core)
1297
1298                 elif item_value.startswith("arp"):
1299                     core_tuple = CoreSocketTuple(section_name)
1300                     arp_core = core_tuple.core_id
1301                     arp_cores.append(arp_core)
1302
1303                 # We check the tasks/core separately
1304                 if item_value.startswith("arp_task"):
1305                     core_tuple = CoreSocketTuple(section_name)
1306                     arp_task_core = core_tuple.core_id
1307                     arp_tasks_core.append(arp_task_core)
1308
1309         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1310
1311     @contextmanager
1312     def traffic_context(self, pkt_size, value):
1313         # Tester is sending packets at the required speed already after
1314         # setup_test(). Just get the current statistics, sleep the required
1315         # amount of time and calculate packet loss.
1316         inet_pkt_size = pkt_size
1317         cpe_pkt_size = pkt_size - 24
1318         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1319
1320         curr_up_speed = curr_down_speed = 0
1321         max_up_speed = max_down_speed = value
1322         if ratio < 1:
1323             max_down_speed = value * ratio
1324         else:
1325             max_up_speed = value / ratio
1326
1327         # Initialize cores
1328         self.sut.stop_all()
1329         time.sleep(0.5)
1330
1331         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1332         # wrong.
1333         self.sut.start(self.all_rx_cores)
1334         time.sleep(0.5)
1335         self.sut.stop(self.all_rx_cores)
1336         time.sleep(0.5)
1337         self.sut.reset_stats()
1338
1339         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1340         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1341
1342         self.sut.reset_values(self.cpe_cores)
1343         self.sut.reset_values(self.inet_cores)
1344
1345         # Set correct IP and UDP lengths in packet headers
1346         # CPE
1347         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1348         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1349         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1350         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1351
1352         # INET
1353         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1354         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1355         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1356         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1357         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1358         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1359
1360         # Sending ARP to initialize tables - need a few seconds of generation
1361         # to make sure all CPEs are initialized
1362         LOG.info("Initializing SUT: sending ARP packets")
1363         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1364         self.sut.set_speed(self.inet_cores, curr_up_speed)
1365         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1366         self.sut.start(self.arp_cores)
1367         time.sleep(4)
1368
1369         # Ramp up the transmission speed. First go to the common speed, then
1370         # increase steps for the faster one.
1371         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1372
1373         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1374
1375         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1376             # The min(..., ...) takes care of 1) floating point rounding errors
1377             # that could make curr_*_speed to be slightly greater than
1378             # max_*_speed and 2) max_*_speed not being an exact multiple of
1379             # self._step_delta.
1380             if curr_up_speed < max_up_speed:
1381                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1382             if curr_down_speed < max_down_speed:
1383                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1384
1385             self.sut.set_speed(self.inet_cores, curr_up_speed)
1386             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1387             time.sleep(self.step_time)
1388
1389         LOG.info("Target speeds reached. Starting real test.")
1390
1391         yield
1392
1393         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1394         LOG.info("Test ended. Flushing NIC buffers")
1395         self.sut.start(self.all_rx_cores)
1396         time.sleep(3)
1397         self.sut.stop(self.all_rx_cores)
1398
1399     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1400         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1401
1402         with data_helper, self.traffic_context(pkt_size, value):
1403             with data_helper.measure_tot_stats():
1404                 time.sleep(duration)
1405                 # Getting statistics to calculate PPS at right speed....
1406                 data_helper.capture_tsc_hz()
1407                 data_helper.latency = self.get_latency()
1408
1409         return data_helper.result_tuple, data_helper.samples
1410
1411
1412 class ProxVpeProfileHelper(ProxProfileHelper):
1413
1414     __prox_profile_type__ = "vPE gen"
1415
1416     def __init__(self, resource_helper):
1417         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1418         self._cores_tuple = None
1419         self._ports_tuple = None
1420
1421     @property
1422     def vpe_cores(self):
1423         if not self._cores_tuple:
1424             self._cores_tuple = self.get_cores_gen_vpe()
1425         return self._cores_tuple
1426
1427     @property
1428     def cpe_cores(self):
1429         return self.vpe_cores[0]
1430
1431     @property
1432     def inet_cores(self):
1433         return self.vpe_cores[1]
1434
1435     @property
1436     def all_rx_cores(self):
1437         return self.latency_cores
1438
1439     @property
1440     def vpe_ports(self):
1441         if not self._ports_tuple:
1442             self._ports_tuple = self.get_ports_gen_vpe()
1443         return self._ports_tuple
1444
1445     @property
1446     def cpe_ports(self):
1447         return self.vpe_ports[0]
1448
1449     @property
1450     def inet_ports(self):
1451         return self.vpe_ports[1]
1452
1453     def get_cores_gen_vpe(self):
1454         cpe_cores = []
1455         inet_cores = []
1456         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1457             if not section_name.startswith("core"):
1458                 continue
1459
1460             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1461                 continue
1462
1463             for item_key, item_value in section:
1464                 if item_key != 'name':
1465                     continue
1466
1467                 if item_value.startswith("cpe"):
1468                     core_tuple = CoreSocketTuple(section_name)
1469                     core_tag = core_tuple.core_id
1470                     cpe_cores.append(core_tag)
1471
1472                 elif item_value.startswith("inet"):
1473                     core_tuple = CoreSocketTuple(section_name)
1474                     inet_core = core_tuple.core_id
1475                     inet_cores.append(inet_core)
1476
1477         return cpe_cores, inet_cores
1478
1479     def get_ports_gen_vpe(self):
1480         cpe_ports = []
1481         inet_ports = []
1482
1483         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1484             if not section_name.startswith("port"):
1485                 continue
1486             tx_port_iter = re.finditer(r'\d+', section_name)
1487             tx_port_no = int(next(tx_port_iter).group(0))
1488
1489             for item_key, item_value in section:
1490                 if item_key != 'name':
1491                     continue
1492
1493                 if item_value.startswith("cpe"):
1494                     cpe_ports.append(tx_port_no)
1495
1496                 elif item_value.startswith("inet"):
1497                     inet_ports.append(tx_port_no)
1498
1499         return cpe_ports, inet_ports
1500
1501     @contextmanager
1502     def traffic_context(self, pkt_size, value):
1503         # Calculate the target upload and download speed. The upload and
1504         # download packets have different packet sizes, so in order to get
1505         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1506         # of the packet sizes.
1507         cpe_pkt_size = pkt_size
1508         inet_pkt_size = pkt_size - 4
1509         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1510
1511         curr_up_speed = curr_down_speed = 0
1512         max_up_speed = max_down_speed = value
1513         if ratio < 1:
1514             max_down_speed = value * ratio
1515         else:
1516             max_up_speed = value / ratio
1517
1518         # Adjust speed when multiple cores per port are used to generate traffic
1519         if len(self.cpe_ports) != len(self.cpe_cores):
1520             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1521         if len(self.inet_ports) != len(self.inet_cores):
1522             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1523
1524         # Initialize cores
1525         self.sut.stop_all()
1526         time.sleep(2)
1527
1528         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1529         # wrong.
1530         self.sut.start(self.all_rx_cores)
1531         time.sleep(2)
1532         self.sut.stop(self.all_rx_cores)
1533         time.sleep(2)
1534         self.sut.reset_stats()
1535
1536         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1537         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1538
1539         self.sut.reset_values(self.cpe_cores)
1540         self.sut.reset_values(self.inet_cores)
1541
1542         # Set correct IP and UDP lengths in packet headers
1543         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1544         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1545         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1546         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1547
1548         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1549         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1550         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1551         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1552
1553         self.sut.set_speed(self.inet_cores, curr_up_speed)
1554         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1555
1556         # Ramp up the transmission speed. First go to the common speed, then
1557         # increase steps for the faster one.
1558         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1559
1560         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1561
1562         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1563             # The min(..., ...) takes care of 1) floating point rounding errors
1564             # that could make curr_*_speed to be slightly greater than
1565             # max_*_speed and 2) max_*_speed not being an exact multiple of
1566             # self._step_delta.
1567             if curr_up_speed < max_up_speed:
1568                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1569             if curr_down_speed < max_down_speed:
1570                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1571
1572             self.sut.set_speed(self.inet_cores, curr_up_speed)
1573             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1574             time.sleep(self.step_time)
1575
1576         LOG.info("Target speeds reached. Starting real test.")
1577
1578         yield
1579
1580         self.sut.stop(self.cpe_cores + self.inet_cores)
1581         LOG.info("Test ended. Flushing NIC buffers")
1582         self.sut.start(self.all_rx_cores)
1583         time.sleep(3)
1584         self.sut.stop(self.all_rx_cores)
1585
1586     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1587         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1588
1589         with data_helper, self.traffic_context(pkt_size, value):
1590             with data_helper.measure_tot_stats():
1591                 time.sleep(duration)
1592                 # Getting statistics to calculate PPS at right speed....
1593                 data_helper.capture_tsc_hz()
1594                 data_helper.latency = self.get_latency()
1595
1596         return data_helper.result_tuple, data_helper.samples
1597
1598
1599 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1600
1601     __prox_profile_type__ = "lwAFTR gen"
1602
1603     def __init__(self, resource_helper):
1604         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1605         self._cores_tuple = None
1606         self._ports_tuple = None
1607         self.step_delta = 5
1608         self.step_time = 0.5
1609
1610     @property
1611     def _lwaftr_cores(self):
1612         if not self._cores_tuple:
1613             self._cores_tuple = self._get_cores_gen_lwaftr()
1614         return self._cores_tuple
1615
1616     @property
1617     def tun_cores(self):
1618         return self._lwaftr_cores[0]
1619
1620     @property
1621     def inet_cores(self):
1622         return self._lwaftr_cores[1]
1623
1624     @property
1625     def _lwaftr_ports(self):
1626         if not self._ports_tuple:
1627             self._ports_tuple = self._get_ports_gen_lw_aftr()
1628         return self._ports_tuple
1629
1630     @property
1631     def tun_ports(self):
1632         return self._lwaftr_ports[0]
1633
1634     @property
1635     def inet_ports(self):
1636         return self._lwaftr_ports[1]
1637
1638     @property
1639     def all_rx_cores(self):
1640         return self.latency_cores
1641
1642     def _get_cores_gen_lwaftr(self):
1643         tun_cores = []
1644         inet_cores = []
1645         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1646             if not section_name.startswith("core"):
1647                 continue
1648
1649             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1650                 continue
1651
1652             core_tuple = CoreSocketTuple(section_name)
1653             core_tag = core_tuple.core_id
1654             for item_value in (v for k, v in section if k == 'name'):
1655                 if item_value.startswith('tun'):
1656                     tun_cores.append(core_tag)
1657                 elif item_value.startswith('inet'):
1658                     inet_cores.append(core_tag)
1659
1660         return tun_cores, inet_cores
1661
1662     def _get_ports_gen_lw_aftr(self):
1663         tun_ports = []
1664         inet_ports = []
1665
1666         re_port = re.compile(r'port (\d+)')
1667         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1668             match = re_port.search(section_name)
1669             if not match:
1670                 continue
1671
1672             tx_port_no = int(match.group(1))
1673             for item_value in (v for k, v in section if k == 'name'):
1674                 if item_value.startswith('lwB4'):
1675                     tun_ports.append(tx_port_no)
1676                 elif item_value.startswith('inet'):
1677                     inet_ports.append(tx_port_no)
1678
1679         return tun_ports, inet_ports
1680
1681     @staticmethod
1682     def _resize(len1, len2):
1683         if len1 == len2:
1684             return 1.0
1685         return 1.0 * len1 / len2
1686
1687     @contextmanager
1688     def traffic_context(self, pkt_size, value):
1689         # Tester is sending packets at the required speed already after
1690         # setup_test(). Just get the current statistics, sleep the required
1691         # amount of time and calculate packet loss.
1692         tun_pkt_size = pkt_size
1693         inet_pkt_size = pkt_size - 40
1694         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1695
1696         curr_up_speed = curr_down_speed = 0
1697         max_up_speed = max_down_speed = value
1698
1699         max_up_speed = value / ratio
1700
1701         # Adjust speed when multiple cores per port are used to generate traffic
1702         if len(self.tun_ports) != len(self.tun_cores):
1703             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1704         if len(self.inet_ports) != len(self.inet_cores):
1705             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1706
1707         # Initialize cores
1708         self.sut.stop_all()
1709         time.sleep(0.5)
1710
1711         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1712         # wrong.
1713         self.sut.start(self.all_rx_cores)
1714         time.sleep(0.5)
1715         self.sut.stop(self.all_rx_cores)
1716         time.sleep(0.5)
1717         self.sut.reset_stats()
1718
1719         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1720         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1721
1722         self.sut.reset_values(self.tun_cores)
1723         self.sut.reset_values(self.inet_cores)
1724
1725         # Set correct IP and UDP lengths in packet headers
1726         # tun
1727         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1728         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1729         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1730         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1731         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1732         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1733
1734         # INET
1735         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1736         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1737         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1738         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1739
1740         LOG.info("Initializing SUT: sending lwAFTR packets")
1741         self.sut.set_speed(self.inet_cores, curr_up_speed)
1742         self.sut.set_speed(self.tun_cores, curr_down_speed)
1743         time.sleep(4)
1744
1745         # Ramp up the transmission speed. First go to the common speed, then
1746         # increase steps for the faster one.
1747         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1748
1749         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1750
1751         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1752             # The min(..., ...) takes care of 1) floating point rounding errors
1753             # that could make curr_*_speed to be slightly greater than
1754             # max_*_speed and 2) max_*_speed not being an exact multiple of
1755             # self._step_delta.
1756             if curr_up_speed < max_up_speed:
1757                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1758             if curr_down_speed < max_down_speed:
1759                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1760
1761             self.sut.set_speed(self.inet_cores, curr_up_speed)
1762             self.sut.set_speed(self.tun_cores, curr_down_speed)
1763             time.sleep(self.step_time)
1764
1765         LOG.info("Target speeds reached. Starting real test.")
1766
1767         yield
1768
1769         self.sut.stop(self.tun_cores + self.inet_cores)
1770         LOG.info("Test ended. Flushing NIC buffers")
1771         self.sut.start(self.all_rx_cores)
1772         time.sleep(3)
1773         self.sut.stop(self.all_rx_cores)
1774
1775     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1776         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1777
1778         with data_helper, self.traffic_context(pkt_size, value):
1779             with data_helper.measure_tot_stats():
1780                 time.sleep(duration)
1781                 # Getting statistics to calculate PPS at right speed....
1782                 data_helper.capture_tsc_hz()
1783                 data_helper.latency = self.get_latency()
1784
1785         return data_helper.result_tuple, data_helper.samples