Merge "Add Flags class to base.Context"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47
48 TEN_GIGABIT = 1e10
49 BITS_PER_BYTE = 8
50 RETRY_SECONDS = 60
51 RETRY_INTERVAL = 1
52
53 CONFIGURATION_OPTIONS = (
54     # dict key           section     key               default value
55     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
56     ('testDuration', 'general', 'test_duration', 5.0),
57     ('testPrecision', 'general', 'test_precision', 1.0),
58     ('tests', 'general', 'tests', None),
59     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
60
61     ('logFile', 'logging', 'file', 'dats.log'),
62     ('logDateFormat', 'logging', 'datefmt', None),
63     ('logLevel', 'logging', 'level', 'INFO'),
64     ('logOverwrite', 'logging', 'overwrite', 1),
65
66     ('testerIp', 'tester', 'ip', None),
67     ('testerUser', 'tester', 'user', 'root'),
68     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
69     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
70     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
71     ('testerSocketId', 'tester', 'socket_id', 0),
72
73     ('sutIp', 'sut', 'ip', None),
74     ('sutUser', 'sut', 'user', 'root'),
75     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
76     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
77     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
78     ('sutSocketId', 'sut', 'socket_id', 0),
79 )
80
81
82 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
83     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
84
85     def __new__(cls, *args):
86         try:
87             matches = cls.CORE_RE.search(str(args[0]))
88             if matches:
89                 args = matches.groups()
90
91             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
92                                                        'h' if args[2] else '')
93
94         except (AttributeError, TypeError, IndexError, ValueError):
95             raise ValueError('Invalid core spec {}'.format(args))
96
97     def is_hyperthread(self):
98         return self.hyperthread == 'h'
99
100     @property
101     def index(self):
102         return int(self.is_hyperthread())
103
104     def find_in_topology(self, cpu_topology):
105         try:
106             socket_core_match = cpu_topology[self.socket_id][self.core_id]
107             sorted_match = sorted(socket_core_match.values())
108             return sorted_match[self.index][0]
109         except (KeyError, IndexError):
110             template = "Core {}{} on socket {} does not exist"
111             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
112
113
114 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
115     def __new__(cls, *args):
116         try:
117             assert args[0] is not str(args[0])
118             args = tuple(args[0])
119         except (AssertionError, IndexError, TypeError):
120             pass
121
122         return super(TotStatsTuple, cls).__new__(cls, *args)
123
124
125 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
126                                                         'delta_tx,delta_tsc,'
127                                                         'latency,rx_total,tx_total,pps')):
128     @property
129     def pkt_loss(self):
130         try:
131             return 1e2 * self.drop_total / float(self.tx_total)
132         except ZeroDivisionError:
133             return 100.0
134
135     @property
136     def mpps(self):
137         # calculate the effective throughput in Mpps
138         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
139
140     @property
141     def can_be_lost(self):
142         return int(self.tx_total * self.tolerated / 1e2)
143
144     @property
145     def drop_total(self):
146         return self.tx_total - self.rx_total
147
148     @property
149     def success(self):
150         return self.drop_total <= self.can_be_lost
151
152     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
153         if pkt_loss is None:
154             pkt_loss = self.pkt_loss
155
156         if port_samples is None:
157             port_samples = {}
158
159         latency_keys = [
160             "LatencyMin",
161             "LatencyMax",
162             "LatencyAvg",
163         ]
164
165         samples = {
166             "Throughput": self.mpps,
167             "DropPackets": pkt_loss,
168             "CurrentDropPackets": pkt_loss,
169             "TxThroughput": self.pps / 1e6,
170             "RxThroughput": self.mpps,
171             "PktSize": pkt_size,
172         }
173         if port_samples:
174             samples.update(port_samples)
175
176         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
177         return samples
178
179     def log_data(self, logger=None):
180         if logger is None:
181             logger = LOG
182
183         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
184         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
185         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
186
187
188 class PacketDump(object):
189     @staticmethod
190     def assert_func(func, value1, value2, template=None):
191         assert func(value1, value2), template.format(value1, value2)
192
193     def __init__(self, port_id, data_len, payload):
194         template = "Packet dump has specified length {}, but payload is {} bytes long"
195         self.assert_func(operator.eq, data_len, len(payload), template)
196         self._port_id = port_id
197         self._data_len = data_len
198         self._payload = payload
199
200     @property
201     def port_id(self):
202         """Get the port id of the packet dump"""
203         return self._port_id
204
205     @property
206     def data_len(self):
207         """Get the length of the data received"""
208         return self._data_len
209
210     def __str__(self):
211         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
212
213     def payload(self, start=None, end=None):
214         """Get part of the payload as a list of ordinals.
215
216         Returns a list of byte values, matching the contents of the packet dump.
217         Optional start and end parameters can be specified to retrieve only a
218         part of the packet contents.
219
220         The number of elements in the list is equal to end - start + 1, so end
221         is the offset of the last character.
222
223         Args:
224             start (pos. int): the starting offset in the payload. If it is not
225                 specified or None, offset 0 is assumed.
226             end (pos. int): the ending offset of the payload. If it is not
227                 specified or None, the contents until the end of the packet are
228                 returned.
229
230         Returns:
231             [int, int, ...]. Each int represents the ordinal value of a byte in
232             the packet payload.
233         """
234         if start is None:
235             start = 0
236
237         if end is None:
238             end = self.data_len - 1
239
240         # Bounds checking on offsets
241         template = "Start offset must be non-negative"
242         self.assert_func(operator.ge, start, 0, template)
243
244         template = "End offset must be less than {1}"
245         self.assert_func(operator.lt, end, self.data_len, template)
246
247         # Adjust for splice operation: end offset must be 1 more than the offset
248         # of the last desired character.
249         end += 1
250
251         return self._payload[start:end]
252
253
254 class ProxSocketHelper(object):
255
256     def __init__(self, sock=None):
257         """ creates new prox instance """
258         super(ProxSocketHelper, self).__init__()
259
260         if sock is None:
261             sock = socket.socket()
262
263         self._sock = sock
264         self._pkt_dumps = []
265         self.master_stats = None
266
267     def connect(self, ip, port):
268         """Connect to the prox instance on the remote system"""
269         self._sock.connect((ip, port))
270
271     def get_socket(self):
272         """ get the socket connected to the remote instance """
273         return self._sock
274
275     def _parse_socket_data(self, decoded_data, pkt_dump_only):
276         def get_newline_index():
277             return decoded_data.find('\n', index)
278
279         ret_str = ''
280         index = 0
281         for newline_index in iter(get_newline_index, -1):
282             ret_str = decoded_data[index:newline_index]
283
284             try:
285                 mode, port_id, data_len = ret_str.split(',', 2)
286             except ValueError:
287                 mode, port_id, data_len = None, None, None
288
289             if mode != 'pktdump':
290                 # Regular 1-line message. Stop reading from the socket.
291                 LOG.debug("Regular response read")
292                 return ret_str
293
294             LOG.debug("Packet dump header read: [%s]", ret_str)
295
296             # The line is a packet dump header. Parse it, read the
297             # packet payload, store the dump for later retrieval.
298             # Skip over the packet dump and continue processing: a
299             # 1-line response may follow the packet dump.
300
301             data_len = int(data_len)
302             data_start = newline_index + 1  # + 1 to skip over \n
303             data_end = data_start + data_len
304             sub_data = decoded_data[data_start:data_end]
305             pkt_payload = array.array('B', (ord(v) for v in sub_data))
306             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
307             self._pkt_dumps.append(pkt_dump)
308
309             if pkt_dump_only:
310                 # Return boolean instead of string to signal
311                 # successful reception of the packet dump.
312                 LOG.debug("Packet dump stored, returning")
313                 return True
314
315             index = data_end + 1
316
317         return ret_str
318
319     def get_data(self, pkt_dump_only=False, timeout=1):
320         """ read data from the socket """
321
322         # This method behaves slightly differently depending on whether it is
323         # called to read the response to a command (pkt_dump_only = 0) or if
324         # it is called specifically to read a packet dump (pkt_dump_only = 1).
325         #
326         # Packet dumps look like:
327         #   pktdump,<port_id>,<data_len>\n
328         #   <packet contents as byte array>\n
329         # This means the total packet dump message consists of 2 lines instead
330         # of 1 line.
331         #
332         # - Response for a command (pkt_dump_only = 0):
333         #   1) Read response from the socket until \n (end of message)
334         #   2a) If the response is a packet dump header (starts with "pktdump,"):
335         #     - Read the packet payload and store the packet dump for later
336         #       retrieval.
337         #     - Reset the state and restart from 1). Eventually state 2b) will
338         #       be reached and the function will return.
339         #   2b) If the response is not a packet dump:
340         #     - Return the received message as a string
341         #
342         # - Explicit request to read a packet dump (pkt_dump_only = 1):
343         #   - Read the dump header and payload
344         #   - Store the packet dump for later retrieval
345         #   - Return True to signify a packet dump was successfully read
346
347         def is_ready():
348             # recv() is blocking, so avoid calling it when no data is waiting.
349             ready = select.select([self._sock], [], [], timeout)
350             return bool(ready[0])
351
352         status = False
353         ret_str = ""
354         for status in iter(is_ready, False):
355             decoded_data = self._sock.recv(256).decode('utf-8')
356             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
357
358         LOG.debug("Received data from socket: [%s]", ret_str)
359         return ret_str if status else ''
360
361     def put_command(self, to_send):
362         """ send data to the remote instance """
363         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
364         try:
365             # NOTE: sendall will block, we need a timeout
366             self._sock.sendall(to_send.encode('utf-8'))
367         except:  # pylint: disable=bare-except
368             pass
369
370     def get_packet_dump(self):
371         """ get the next packet dump """
372         if self._pkt_dumps:
373             return self._pkt_dumps.pop(0)
374         return None
375
376     def stop_all_reset(self):
377         """ stop the remote instance and reset stats """
378         LOG.debug("Stop all and reset stats")
379         self.stop_all()
380         self.reset_stats()
381
382     def stop_all(self):
383         """ stop all cores on the remote instance """
384         LOG.debug("Stop all")
385         self.put_command("stop all\n")
386         time.sleep(3)
387
388     def stop(self, cores, task=''):
389         """ stop specific cores on the remote instance """
390         LOG.debug("Stopping cores %s", cores)
391         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
392         time.sleep(3)
393
394     def start_all(self):
395         """ start all cores on the remote instance """
396         LOG.debug("Start all")
397         self.put_command("start all\n")
398
399     def start(self, cores):
400         """ start specific cores on the remote instance """
401         LOG.debug("Starting cores %s", cores)
402         self.put_command("start {}\n".format(join_non_strings(',', cores)))
403         time.sleep(3)
404
405     def reset_stats(self):
406         """ reset the statistics on the remote instance """
407         LOG.debug("Reset stats")
408         self.put_command("reset stats\n")
409         time.sleep(1)
410
411     def _run_template_over_cores(self, template, cores, *args):
412         for core in cores:
413             self.put_command(template.format(core, *args))
414
415     def set_pkt_size(self, cores, pkt_size):
416         """ set the packet size to generate on the remote instance """
417         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
418         pkt_size -= 4
419         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
420         time.sleep(1)
421
422     def set_value(self, cores, offset, value, length):
423         """ set value on the remote instance """
424         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
425         LOG.debug(msg, cores, value, length, offset)
426         template = "set value {} 0 {} {} {}\n"
427         self._run_template_over_cores(template, cores, offset, value, length)
428
429     def reset_values(self, cores):
430         """ reset values on the remote instance """
431         LOG.debug("Set value for core(s) %s", cores)
432         self._run_template_over_cores("reset values {} 0\n", cores)
433
434     def set_speed(self, cores, speed, tasks=None):
435         """ set speed on the remote instance """
436         if tasks is None:
437             tasks = [0] * len(cores)
438         elif len(tasks) != len(cores):
439             LOG.error("set_speed: cores and tasks must have the same len")
440         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
441         for (core, task) in list(zip(cores, tasks)):
442             self.put_command("speed {} {} {}\n".format(core, task, speed))
443
444     def slope_speed(self, cores_speed, duration, n_steps=0):
445         """will start to increase speed from 0 to N where N is taken from
446         a['speed'] for each a in cores_speed"""
447         # by default, each step will take 0.5 sec
448         if n_steps == 0:
449             n_steps = duration * 2
450
451         private_core_data = []
452         step_duration = float(duration) / n_steps
453         for core_data in cores_speed:
454             target = float(core_data['speed'])
455             private_core_data.append({
456                 'cores': core_data['cores'],
457                 'zero': 0,
458                 'delta': target / n_steps,
459                 'current': 0,
460                 'speed': target,
461             })
462
463         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
464         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
465             time.sleep(step_duration)
466             for core_data in private_core_data:
467                 core_data['current'] = core_data[key1] + core_data[key2]
468                 self.set_speed(core_data['cores'], core_data['current'])
469
470     def set_pps(self, cores, pps, pkt_size):
471         """ set packets per second for specific cores on the remote instance """
472         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
473         LOG.debug(msg, cores, pps, pkt_size)
474
475         # speed in percent of line-rate
476         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
477         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
478
479     def lat_stats(self, cores, task=0):
480         """Get the latency statistics from the remote system"""
481         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
482         lat_min = {}
483         lat_max = {}
484         lat_avg = {}
485         for core in cores:
486             self.put_command("lat stats {} {} \n".format(core, task))
487             ret = self.get_data()
488
489             try:
490                 lat_min[core], lat_max[core], lat_avg[core] = \
491                     tuple(int(n) for n in ret.split(",")[:3])
492
493             except (AttributeError, ValueError, TypeError):
494                 pass
495
496         return lat_min, lat_max, lat_avg
497
498     def get_all_tot_stats(self):
499         self.put_command("tot stats\n")
500         all_stats_str = self.get_data().split(",")
501         if len(all_stats_str) != 4:
502             all_stats = [0] * 4
503             return all_stats
504         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
505         self.master_stats = all_stats
506         return all_stats
507
508     def hz(self):
509         return self.get_all_tot_stats()[3]
510
511     def core_stats(self, cores, task=0):
512         """Get the receive statistics from the remote system"""
513         rx = tx = drop = tsc = 0
514         for core in cores:
515             self.put_command("core stats {} {}\n".format(core, task))
516             ret = self.get_data().split(",")
517             rx += int(ret[0])
518             tx += int(ret[1])
519             drop += int(ret[2])
520             tsc = int(ret[3])
521         return rx, tx, drop, tsc
522
523     def port_stats(self, ports):
524         """get counter values from a specific port"""
525         tot_result = [0] * 12
526         for port in ports:
527             self.put_command("port_stats {}\n".format(port))
528             ret = [try_int(s, 0) for s in self.get_data().split(",")]
529             tot_result = [sum(x) for x in zip(tot_result, ret)]
530         return tot_result
531
532     @contextmanager
533     def measure_tot_stats(self):
534         start = self.get_all_tot_stats()
535         container = {'start_tot': start}
536         try:
537             yield container
538         finally:
539             container['end_tot'] = end = self.get_all_tot_stats()
540
541         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
542
543     def tot_stats(self):
544         """Get the total statistics from the remote system"""
545         stats = self.get_all_tot_stats()
546         return stats[:3]
547
548     def tot_ierrors(self):
549         """Get the total ierrors from the remote system"""
550         self.put_command("tot ierrors tot\n")
551         recv = self.get_data().split(',')
552         tot_ierrors = int(recv[0])
553         tsc = int(recv[0])
554         return tot_ierrors, tsc
555
556     def set_count(self, count, cores):
557         """Set the number of packets to send on the specified core"""
558         self._run_template_over_cores("count {} 0 {}\n", cores, count)
559
560     def dump_rx(self, core_id, task_id=0, count=1):
561         """Activate dump on rx on the specified core"""
562         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
563         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
564         time.sleep(1.5)  # Give PROX time to set up packet dumping
565
566     def quit(self):
567         self.stop_all()
568         self._quit()
569         self.force_quit()
570
571     def _quit(self):
572         """ stop all cores on the remote instance """
573         LOG.debug("Quit prox")
574         self.put_command("quit\n")
575         time.sleep(3)
576
577     def force_quit(self):
578         """ stop all cores on the remote instance """
579         LOG.debug("Force Quit prox")
580         self.put_command("quit_force\n")
581         time.sleep(3)
582
583
584 _LOCAL_OBJECT = object()
585
586
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588     # the actual app is lowercase
589     APP_NAME = 'prox'
590     # not used for Prox but added for consistency
591     VNF_TYPE = "PROX"
592
593     LUA_PARAMETER_NAME = ""
594     LUA_PARAMETER_PEER = {
595         "gen": "sut",
596         "sut": "gen",
597     }
598
599     CONFIG_QUEUE_TIMEOUT = 120
600
601     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
602         self.remote_path = None
603         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
604         self.remote_prox_file_name = None
605         self._prox_config_data = None
606         self.additional_files = {}
607         self.config_queue = Queue()
608         # allow_exit_without_flush
609         self.config_queue.cancel_join_thread()
610         self._global_section = None
611
612     @property
613     def prox_config_data(self):
614         if self._prox_config_data is None:
615             # this will block, but it needs too
616             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
617         return self._prox_config_data
618
619     @property
620     def global_section(self):
621         if self._global_section is None and self.prox_config_data:
622             self._global_section = self.find_section("global")
623         return self._global_section
624
625     def find_section(self, name, default=_LOCAL_OBJECT):
626         result = next((value for key, value in self.prox_config_data if key == name), default)
627         if result is _LOCAL_OBJECT:
628             raise KeyError('{} not found in Prox config'.format(name))
629         return result
630
631     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
632         section = self.find_section(section_name, [])
633         result = next((value for key, value in section if key == section_key), default)
634         if result is _LOCAL_OBJECT:
635             template = '{} not found in {} section of Prox config'
636             raise KeyError(template.format(section_key, section_name))
637         return result
638
639     def copy_to_target(self, config_file_path, prox_file):
640         remote_path = os.path.join("/tmp", prox_file)
641         self.ssh_helper.put(config_file_path, remote_path)
642         return remote_path
643
644     @staticmethod
645     def _get_tx_port(section, sections):
646         iface_port = [-1]
647         for item in sections[section]:
648             if item[0] == "tx port":
649                 iface_port = re.findall(r'\d+', item[1])
650                 # do we want the last one?
651                 #   if yes, then can we reverse?
652         return int(iface_port[0])
653
654     @staticmethod
655     def _replace_quoted_with_value(quoted, value, count=1):
656         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
657         return new_string
658
659     def _insert_additional_file(self, value):
660         file_str = value.split('"')
661         base_name = os.path.basename(file_str[1])
662         file_str[1] = self.additional_files[base_name]
663         return '"'.join(file_str)
664
665     def generate_prox_config_file(self, config_path):
666         sections = []
667         prox_config = ConfigParser(config_path, sections)
668         prox_config.parse()
669
670         # Ensure MAC is set "hardware"
671         all_ports = self.vnfd_helper.port_pairs.all_ports
672         # use dpdk port number
673         for port_name in all_ports:
674             port_num = self.vnfd_helper.port_num(port_name)
675             port_section_name = "port {}".format(port_num)
676             for section_name, section in sections:
677                 if port_section_name != section_name:
678                     continue
679
680                 for section_data in section:
681                     if section_data[0] == "mac":
682                         section_data[1] = "hardware"
683
684         # search for dst mac
685         for _, section in sections:
686             for section_data in section:
687                 item_key, item_val = section_data
688                 if item_val.startswith("@@dst_mac"):
689                     tx_port_iter = re.finditer(r'\d+', item_val)
690                     tx_port_no = int(next(tx_port_iter).group(0))
691                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
692                     mac = intf["virtual-interface"]["dst_mac"]
693                     section_data[1] = mac.replace(":", " ", 6)
694
695                 if item_key == "dst mac" and item_val.startswith("@@"):
696                     tx_port_iter = re.finditer(r'\d+', item_val)
697                     tx_port_no = int(next(tx_port_iter).group(0))
698                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
699                     mac = intf["virtual-interface"]["dst_mac"]
700                     section_data[1] = mac
701
702         # if addition file specified in prox config
703         if not self.additional_files:
704             return sections
705
706         for section_name, section in sections:
707             for section_data in section:
708                 try:
709                     if section_data[0].startswith("dofile"):
710                         section_data[0] = self._insert_additional_file(section_data[0])
711
712                     if section_data[1].startswith("dofile"):
713                         section_data[1] = self._insert_additional_file(section_data[1])
714                 except:  # pylint: disable=bare-except
715                     pass
716
717         return sections
718
719     @staticmethod
720     def write_prox_lua(lua_config):
721         """
722         Write an .ini-format config file for PROX (parameters.lua)
723         PROX does not allow a space before/after the =, so we need
724         a custom method
725         """
726         out = []
727         for key in lua_config:
728             value = '"' + lua_config[key] + '"'
729             if key == "__name__":
730                 continue
731             if value is not None and value != '@':
732                 key = "=".join((key, str(value).replace('\n', '\n\t')))
733                 out.append(key)
734             else:
735                 key = str(key).replace('\n', '\n\t')
736                 out.append(key)
737         return os.linesep.join(out)
738
739     @staticmethod
740     def write_prox_config(prox_config):
741         """
742         Write an .ini-format config file for PROX
743         PROX does not allow a space before/after the =, so we need
744         a custom method
745         """
746         out = []
747         for (section_name, section) in prox_config:
748             out.append("[{}]".format(section_name))
749             for item in section:
750                 key, value = item
751                 if key == "__name__":
752                     continue
753                 if value is not None and value != '@':
754                     key = "=".join((key, str(value).replace('\n', '\n\t')))
755                     out.append(key)
756                 else:
757                     key = str(key).replace('\n', '\n\t')
758                     out.append(key)
759         return os.linesep.join(out)
760
761     def put_string_to_file(self, s, remote_path):
762         file_obj = cStringIO(s)
763         self.ssh_helper.put_file_obj(file_obj, remote_path)
764         return remote_path
765
766     def generate_prox_lua_file(self):
767         p = OrderedDict()
768         all_ports = self.vnfd_helper.port_pairs.all_ports
769         for port_name in all_ports:
770             port_num = self.vnfd_helper.port_num(port_name)
771             intf = self.vnfd_helper.find_interface(name=port_name)
772             vintf = intf['virtual-interface']
773             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
774             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
775
776         return p
777
778     def upload_prox_lua(self, config_file, lua_data):
779         # prox can't handle spaces around ' = ' so use custom method
780         out = StringIO(self.write_prox_lua(lua_data))
781         out.seek(0)
782         remote_path = os.path.join("/tmp", config_file)
783         self.ssh_helper.put_file_obj(out, remote_path)
784
785         return remote_path
786
787     def upload_prox_config(self, config_file, prox_config_data):
788         # prox can't handle spaces around ' = ' so use custom method
789         out = StringIO(self.write_prox_config(prox_config_data))
790         out.seek(0)
791         remote_path = os.path.join("/tmp", config_file)
792         self.ssh_helper.put_file_obj(out, remote_path)
793
794         return remote_path
795
796     def build_config_file(self):
797         task_path = self.scenario_helper.task_path
798         options = self.scenario_helper.options
799         config_path = options['prox_config']
800         config_file = os.path.basename(config_path)
801         config_path = find_relative_file(config_path, task_path)
802         self.additional_files = {}
803
804         try:
805             if options['prox_generate_parameter']:
806                 self.lua = []
807                 self.lua = self.generate_prox_lua_file()
808                 if len(self.lua) > 0:
809                     self.upload_prox_lua("parameters.lua", self.lua)
810         except:  # pylint: disable=bare-except
811             pass
812
813         prox_files = options.get('prox_files', [])
814         if isinstance(prox_files, six.string_types):
815             prox_files = [prox_files]
816         for key_prox_file in prox_files:
817             base_prox_file = os.path.basename(key_prox_file)
818             key_prox_path = find_relative_file(key_prox_file, task_path)
819             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
820             self.additional_files[base_prox_file] = remote_prox_file
821
822         self._prox_config_data = self.generate_prox_config_file(config_path)
823         # copy config to queue so we can read it from traffic_runner process
824         self.config_queue.put(self._prox_config_data)
825         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
826
827     def build_config(self):
828         self.build_config_file()
829
830         options = self.scenario_helper.options
831         prox_args = options['prox_args']
832         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
833
834         self.pipeline_kwargs = {
835             'tool_path': tool_path,
836             'tool_dir': os.path.dirname(tool_path),
837             'cfg_file': self.remote_path,
838             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
839                              for k, v in prox_args.items())
840         }
841
842         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
843                         "{args} -f {cfg_file} '")
844         return cmd_template.format(**self.pipeline_kwargs)
845
846
847 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
848 class ProxResourceHelper(ClientResourceHelper):
849
850     RESOURCE_WORD = 'prox'
851
852     PROX_MODE = ""
853
854     WAIT_TIME = 3
855
856     @staticmethod
857     def find_pci(pci, bound_pci):
858         # we have to substring match PCI bus address from the end
859         return any(b.endswith(pci) for b in bound_pci)
860
861     def __init__(self, setup_helper):
862         super(ProxResourceHelper, self).__init__(setup_helper)
863         self.mgmt_interface = self.vnfd_helper.mgmt_interface
864         self._user = self.mgmt_interface["user"]
865         self._ip = self.mgmt_interface["ip"]
866
867         self.done = False
868         self._vpci_to_if_name_map = None
869         self.additional_file = {}
870         self.remote_prox_file_name = None
871         self.lower = None
872         self.upper = None
873         self.step_delta = 1
874         self.step_time = 0.5
875         self._test_type = None
876
877     @property
878     def sut(self):
879         if not self.client:
880             self.client = self._connect()
881         return self.client
882
883     @property
884     def test_type(self):
885         if self._test_type is None:
886             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
887         return self._test_type
888
889     def run_traffic(self, traffic_profile):
890         self._queue.cancel_join_thread()
891         self.lower = 0.0
892         self.upper = 100.0
893
894         traffic_profile.init(self._queue)
895         # this frees up the run_traffic loop
896         self.client_started.value = 1
897
898         while not self._terminated.value:
899             # move it all to traffic_profile
900             self._run_traffic_once(traffic_profile)
901
902     def _run_traffic_once(self, traffic_profile):
903         traffic_profile.execute_traffic(self)
904         if traffic_profile.done:
905             self._queue.put({'done': True})
906             LOG.debug("tg_prox done")
907             self._terminated.value = 1
908
909     # For VNF use ResourceHelper method to collect KPIs directly.
910     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
911     def collect_collectd_kpi(self):
912         return self._collect_resource_kpi()
913
914     def collect_kpi(self):
915         result = super(ProxResourceHelper, self).collect_kpi()
916         # add in collectd kpis manually
917         if result:
918             result['collect_stats'] = self._collect_resource_kpi()
919         return result
920
921     def terminate(self):
922         # should not be called, use VNF terminate
923         raise NotImplementedError()
924
925     def up_post(self):
926         return self.sut  # force connection
927
928     def execute(self, cmd, *args, **kwargs):
929         func = getattr(self.sut, cmd, None)
930         if func:
931             return func(*args, **kwargs)
932         return None
933
934     def _connect(self, client=None):
935         """Run and connect to prox on the remote system """
936         # De-allocating a large amount of hugepages takes some time. If a new
937         # PROX instance is started immediately after killing the previous one,
938         # it might not be able to allocate hugepages, because they are still
939         # being freed. Hence the -w switch.
940         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
941         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
942         # -f ./handle_none-4.cfg"
943         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
944         #  "; " \
945         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
946         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
947         # sudo " \
948         #    + "./build/Prox " + prox_args
949         # log.debug("Starting PROX with command [%s]", prox_cmd)
950         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
951         # self._ip, prox_cmd))
952         if client is None:
953             client = ProxSocketHelper()
954
955         # try connecting to Prox for 60s
956         for _ in range(RETRY_SECONDS):
957             time.sleep(RETRY_INTERVAL)
958             try:
959                 client.connect(self._ip, PROX_PORT)
960             except (socket.gaierror, socket.error):
961                 continue
962             else:
963                 return client
964
965         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
966         raise Exception(msg.format(self._ip, PROX_PORT))
967
968
969 class ProxDataHelper(object):
970
971     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
972         super(ProxDataHelper, self).__init__()
973         self.vnfd_helper = vnfd_helper
974         self.sut = sut
975         self.pkt_size = pkt_size
976         self.value = value
977         self.tolerated_loss = tolerated_loss
978         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
979         self.tsc_hz = None
980         self.measured_stats = None
981         self.latency = None
982         self._totals_and_pps = None
983         self.result_tuple = None
984
985     @property
986     def totals_and_pps(self):
987         if self._totals_and_pps is None:
988             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
989             pps = self.value / 100.0 * self.line_rate_to_pps()
990             self._totals_and_pps = rx_total, tx_total, pps
991         return self._totals_and_pps
992
993     @property
994     def rx_total(self):
995         return self.totals_and_pps[0]
996
997     @property
998     def tx_total(self):
999         return self.totals_and_pps[1]
1000
1001     @property
1002     def pps(self):
1003         return self.totals_and_pps[2]
1004
1005     @property
1006     def samples(self):
1007         samples = {}
1008         for port_name, port_num in self.vnfd_helper.ports_iter():
1009             try:
1010                 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1011                 samples[port_name] = {
1012                     "in_packets": port_rx_total,
1013                     "out_packets": port_tx_total,
1014                 }
1015             except (KeyError, TypeError, NameError, MemoryError, ValueError,
1016                     SystemError, BufferError):
1017                 samples[port_name] = {
1018                     "in_packets": 0,
1019                     "out_packets": 0,
1020                 }
1021         return samples
1022
1023     def __enter__(self):
1024         self.check_interface_count()
1025         return self
1026
1027     def __exit__(self, exc_type, exc_val, exc_tb):
1028         self.make_tuple()
1029
1030     def make_tuple(self):
1031         if self.result_tuple:
1032             return
1033
1034         self.result_tuple = ProxTestDataTuple(
1035             self.tolerated_loss,
1036             self.tsc_hz,
1037             self.measured_stats['delta'].rx,
1038             self.measured_stats['delta'].tx,
1039             self.measured_stats['delta'].tsc,
1040             self.latency,
1041             self.rx_total,
1042             self.tx_total,
1043             self.pps,
1044         )
1045         self.result_tuple.log_data()
1046
1047     @contextmanager
1048     def measure_tot_stats(self):
1049         with self.sut.measure_tot_stats() as self.measured_stats:
1050             yield
1051
1052     def check_interface_count(self):
1053         # do this assert in init?  unless we expect interface count to
1054         # change from one run to another run...
1055         assert self.port_count in {1, 2, 4}, \
1056             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1057
1058     def capture_tsc_hz(self):
1059         self.tsc_hz = float(self.sut.hz())
1060
1061     def line_rate_to_pps(self):
1062         # NOTE: to fix, don't hardcode 10Gb/s
1063         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1064
1065
1066 class ProxProfileHelper(object):
1067
1068     __prox_profile_type__ = "Generic"
1069
1070     PROX_CORE_GEN_MODE = "gen"
1071     PROX_CORE_LAT_MODE = "lat"
1072
1073     @classmethod
1074     def get_cls(cls, helper_type):
1075         """Return class of specified type."""
1076         if not helper_type:
1077             return ProxProfileHelper
1078
1079         for profile_helper_class in utils.itersubclasses(cls):
1080             if helper_type == profile_helper_class.__prox_profile_type__:
1081                 return profile_helper_class
1082
1083         return ProxProfileHelper
1084
1085     @classmethod
1086     def make_profile_helper(cls, resource_helper):
1087         return cls.get_cls(resource_helper.test_type)(resource_helper)
1088
1089     def __init__(self, resource_helper):
1090         super(ProxProfileHelper, self).__init__()
1091         self.resource_helper = resource_helper
1092         self._cpu_topology = None
1093         self._test_cores = None
1094         self._latency_cores = None
1095
1096     @property
1097     def cpu_topology(self):
1098         if not self._cpu_topology:
1099             stdout = io.BytesIO()
1100             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1101             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1102         return self._cpu_topology
1103
1104     @property
1105     def test_cores(self):
1106         if not self._test_cores:
1107             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1108         return self._test_cores
1109
1110     @property
1111     def latency_cores(self):
1112         if not self._latency_cores:
1113             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1114         return self._latency_cores
1115
1116     @contextmanager
1117     def traffic_context(self, pkt_size, value):
1118         self.sut.stop_all()
1119         self.sut.reset_stats()
1120         try:
1121             self.sut.set_pkt_size(self.test_cores, pkt_size)
1122             self.sut.set_speed(self.test_cores, value)
1123             self.sut.start_all()
1124             yield
1125         finally:
1126             self.sut.stop_all()
1127
1128     def get_cores(self, mode):
1129         cores = []
1130
1131         for section_name, section in self.setup_helper.prox_config_data:
1132             if not section_name.startswith("core"):
1133                 continue
1134
1135             for key, value in section:
1136                 if key == "mode" and value == mode:
1137                     core_tuple = CoreSocketTuple(section_name)
1138                     core = core_tuple.core_id
1139                     cores.append(core)
1140
1141         return cores
1142
1143     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1144         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1145
1146         with data_helper, self.traffic_context(pkt_size, value):
1147             with data_helper.measure_tot_stats():
1148                 time.sleep(duration)
1149                 # Getting statistics to calculate PPS at right speed....
1150                 data_helper.capture_tsc_hz()
1151                 data_helper.latency = self.get_latency()
1152
1153         return data_helper.result_tuple, data_helper.samples
1154
1155     def get_latency(self):
1156         """
1157         :return: return lat_min, lat_max, lat_avg
1158         :rtype: list
1159         """
1160
1161         if not self._latency_cores:
1162             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1163
1164         if self._latency_cores:
1165             return self.sut.lat_stats(self._latency_cores)
1166         return []
1167
1168     def terminate(self):
1169         pass
1170
1171     def __getattr__(self, item):
1172         return getattr(self.resource_helper, item)
1173
1174
1175 class ProxMplsProfileHelper(ProxProfileHelper):
1176
1177     __prox_profile_type__ = "MPLS tag/untag"
1178
1179     def __init__(self, resource_helper):
1180         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1181         self._cores_tuple = None
1182
1183     @property
1184     def mpls_cores(self):
1185         if not self._cores_tuple:
1186             self._cores_tuple = self.get_cores_mpls()
1187         return self._cores_tuple
1188
1189     @property
1190     def tagged_cores(self):
1191         return self.mpls_cores[0]
1192
1193     @property
1194     def plain_cores(self):
1195         return self.mpls_cores[1]
1196
1197     def get_cores_mpls(self):
1198         cores_tagged = []
1199         cores_plain = []
1200         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1201             if not section_name.startswith("core"):
1202                 continue
1203
1204             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1205                 continue
1206
1207             for item_key, item_value in section:
1208                 if item_key != 'name':
1209                     continue
1210
1211                 if item_value.startswith("tag"):
1212                     core_tuple = CoreSocketTuple(section_name)
1213                     core_tag = core_tuple.core_id
1214                     cores_tagged.append(core_tag)
1215
1216                 elif item_value.startswith("udp"):
1217                     core_tuple = CoreSocketTuple(section_name)
1218                     core_udp = core_tuple.core_id
1219                     cores_plain.append(core_udp)
1220
1221         return cores_tagged, cores_plain
1222
1223     @contextmanager
1224     def traffic_context(self, pkt_size, value):
1225         self.sut.stop_all()
1226         self.sut.reset_stats()
1227         try:
1228             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1229             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1230             self.sut.set_speed(self.tagged_cores, value)
1231             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1232             self.sut.set_speed(self.plain_cores, value * ratio)
1233             self.sut.start_all()
1234             yield
1235         finally:
1236             self.sut.stop_all()
1237
1238
1239 class ProxBngProfileHelper(ProxProfileHelper):
1240
1241     __prox_profile_type__ = "BNG gen"
1242
1243     def __init__(self, resource_helper):
1244         super(ProxBngProfileHelper, self).__init__(resource_helper)
1245         self._cores_tuple = None
1246
1247     @property
1248     def bng_cores(self):
1249         if not self._cores_tuple:
1250             self._cores_tuple = self.get_cores_gen_bng_qos()
1251         return self._cores_tuple
1252
1253     @property
1254     def cpe_cores(self):
1255         return self.bng_cores[0]
1256
1257     @property
1258     def inet_cores(self):
1259         return self.bng_cores[1]
1260
1261     @property
1262     def arp_cores(self):
1263         return self.bng_cores[2]
1264
1265     @property
1266     def arp_task_cores(self):
1267         return self.bng_cores[3]
1268
1269     @property
1270     def all_rx_cores(self):
1271         return self.latency_cores
1272
1273     def get_cores_gen_bng_qos(self):
1274         cpe_cores = []
1275         inet_cores = []
1276         arp_cores = []
1277         arp_tasks_core = [0]
1278         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1279             if not section_name.startswith("core"):
1280                 continue
1281
1282             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1283                 continue
1284
1285             for item_key, item_value in section:
1286                 if item_key != 'name':
1287                     continue
1288
1289                 if item_value.startswith("cpe"):
1290                     core_tuple = CoreSocketTuple(section_name)
1291                     cpe_core = core_tuple.core_id
1292                     cpe_cores.append(cpe_core)
1293
1294                 elif item_value.startswith("inet"):
1295                     core_tuple = CoreSocketTuple(section_name)
1296                     inet_core = core_tuple.core_id
1297                     inet_cores.append(inet_core)
1298
1299                 elif item_value.startswith("arp"):
1300                     core_tuple = CoreSocketTuple(section_name)
1301                     arp_core = core_tuple.core_id
1302                     arp_cores.append(arp_core)
1303
1304                 # We check the tasks/core separately
1305                 if item_value.startswith("arp_task"):
1306                     core_tuple = CoreSocketTuple(section_name)
1307                     arp_task_core = core_tuple.core_id
1308                     arp_tasks_core.append(arp_task_core)
1309
1310         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1311
1312     @contextmanager
1313     def traffic_context(self, pkt_size, value):
1314         # Tester is sending packets at the required speed already after
1315         # setup_test(). Just get the current statistics, sleep the required
1316         # amount of time and calculate packet loss.
1317         inet_pkt_size = pkt_size
1318         cpe_pkt_size = pkt_size - 24
1319         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1320
1321         curr_up_speed = curr_down_speed = 0
1322         max_up_speed = max_down_speed = value
1323         if ratio < 1:
1324             max_down_speed = value * ratio
1325         else:
1326             max_up_speed = value / ratio
1327
1328         # Initialize cores
1329         self.sut.stop_all()
1330         time.sleep(0.5)
1331
1332         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1333         # wrong.
1334         self.sut.start(self.all_rx_cores)
1335         time.sleep(0.5)
1336         self.sut.stop(self.all_rx_cores)
1337         time.sleep(0.5)
1338         self.sut.reset_stats()
1339
1340         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1341         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1342
1343         self.sut.reset_values(self.cpe_cores)
1344         self.sut.reset_values(self.inet_cores)
1345
1346         # Set correct IP and UDP lengths in packet headers
1347         # CPE
1348         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1349         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1350         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1351         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1352
1353         # INET
1354         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1355         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1356         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1357         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1358         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1359         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1360
1361         # Sending ARP to initialize tables - need a few seconds of generation
1362         # to make sure all CPEs are initialized
1363         LOG.info("Initializing SUT: sending ARP packets")
1364         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1365         self.sut.set_speed(self.inet_cores, curr_up_speed)
1366         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1367         self.sut.start(self.arp_cores)
1368         time.sleep(4)
1369
1370         # Ramp up the transmission speed. First go to the common speed, then
1371         # increase steps for the faster one.
1372         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1373
1374         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1375
1376         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1377             # The min(..., ...) takes care of 1) floating point rounding errors
1378             # that could make curr_*_speed to be slightly greater than
1379             # max_*_speed and 2) max_*_speed not being an exact multiple of
1380             # self._step_delta.
1381             if curr_up_speed < max_up_speed:
1382                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1383             if curr_down_speed < max_down_speed:
1384                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1385
1386             self.sut.set_speed(self.inet_cores, curr_up_speed)
1387             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1388             time.sleep(self.step_time)
1389
1390         LOG.info("Target speeds reached. Starting real test.")
1391
1392         yield
1393
1394         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1395         LOG.info("Test ended. Flushing NIC buffers")
1396         self.sut.start(self.all_rx_cores)
1397         time.sleep(3)
1398         self.sut.stop(self.all_rx_cores)
1399
1400     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1401         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1402
1403         with data_helper, self.traffic_context(pkt_size, value):
1404             with data_helper.measure_tot_stats():
1405                 time.sleep(duration)
1406                 # Getting statistics to calculate PPS at right speed....
1407                 data_helper.capture_tsc_hz()
1408                 data_helper.latency = self.get_latency()
1409
1410         return data_helper.result_tuple, data_helper.samples
1411
1412
1413 class ProxVpeProfileHelper(ProxProfileHelper):
1414
1415     __prox_profile_type__ = "vPE gen"
1416
1417     def __init__(self, resource_helper):
1418         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1419         self._cores_tuple = None
1420         self._ports_tuple = None
1421
1422     @property
1423     def vpe_cores(self):
1424         if not self._cores_tuple:
1425             self._cores_tuple = self.get_cores_gen_vpe()
1426         return self._cores_tuple
1427
1428     @property
1429     def cpe_cores(self):
1430         return self.vpe_cores[0]
1431
1432     @property
1433     def inet_cores(self):
1434         return self.vpe_cores[1]
1435
1436     @property
1437     def all_rx_cores(self):
1438         return self.latency_cores
1439
1440     @property
1441     def vpe_ports(self):
1442         if not self._ports_tuple:
1443             self._ports_tuple = self.get_ports_gen_vpe()
1444         return self._ports_tuple
1445
1446     @property
1447     def cpe_ports(self):
1448         return self.vpe_ports[0]
1449
1450     @property
1451     def inet_ports(self):
1452         return self.vpe_ports[1]
1453
1454     def get_cores_gen_vpe(self):
1455         cpe_cores = []
1456         inet_cores = []
1457         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1458             if not section_name.startswith("core"):
1459                 continue
1460
1461             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1462                 continue
1463
1464             for item_key, item_value in section:
1465                 if item_key != 'name':
1466                     continue
1467
1468                 if item_value.startswith("cpe"):
1469                     core_tuple = CoreSocketTuple(section_name)
1470                     core_tag = core_tuple.core_id
1471                     cpe_cores.append(core_tag)
1472
1473                 elif item_value.startswith("inet"):
1474                     core_tuple = CoreSocketTuple(section_name)
1475                     inet_core = core_tuple.core_id
1476                     inet_cores.append(inet_core)
1477
1478         return cpe_cores, inet_cores
1479
1480     def get_ports_gen_vpe(self):
1481         cpe_ports = []
1482         inet_ports = []
1483
1484         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1485             if not section_name.startswith("port"):
1486                 continue
1487             tx_port_iter = re.finditer(r'\d+', section_name)
1488             tx_port_no = int(next(tx_port_iter).group(0))
1489
1490             for item_key, item_value in section:
1491                 if item_key != 'name':
1492                     continue
1493
1494                 if item_value.startswith("cpe"):
1495                     cpe_ports.append(tx_port_no)
1496
1497                 elif item_value.startswith("inet"):
1498                     inet_ports.append(tx_port_no)
1499
1500         return cpe_ports, inet_ports
1501
1502     @contextmanager
1503     def traffic_context(self, pkt_size, value):
1504         # Calculate the target upload and download speed. The upload and
1505         # download packets have different packet sizes, so in order to get
1506         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1507         # of the packet sizes.
1508         cpe_pkt_size = pkt_size
1509         inet_pkt_size = pkt_size - 4
1510         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1511
1512         curr_up_speed = curr_down_speed = 0
1513         max_up_speed = max_down_speed = value
1514         if ratio < 1:
1515             max_down_speed = value * ratio
1516         else:
1517             max_up_speed = value / ratio
1518
1519         # Adjust speed when multiple cores per port are used to generate traffic
1520         if len(self.cpe_ports) != len(self.cpe_cores):
1521             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1522         if len(self.inet_ports) != len(self.inet_cores):
1523             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1524
1525         # Initialize cores
1526         self.sut.stop_all()
1527         time.sleep(2)
1528
1529         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1530         # wrong.
1531         self.sut.start(self.all_rx_cores)
1532         time.sleep(2)
1533         self.sut.stop(self.all_rx_cores)
1534         time.sleep(2)
1535         self.sut.reset_stats()
1536
1537         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1538         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1539
1540         self.sut.reset_values(self.cpe_cores)
1541         self.sut.reset_values(self.inet_cores)
1542
1543         # Set correct IP and UDP lengths in packet headers
1544         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1545         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1546         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1547         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1548
1549         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1550         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1551         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1552         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1553
1554         self.sut.set_speed(self.inet_cores, curr_up_speed)
1555         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1556
1557         # Ramp up the transmission speed. First go to the common speed, then
1558         # increase steps for the faster one.
1559         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1560
1561         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1562
1563         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1564             # The min(..., ...) takes care of 1) floating point rounding errors
1565             # that could make curr_*_speed to be slightly greater than
1566             # max_*_speed and 2) max_*_speed not being an exact multiple of
1567             # self._step_delta.
1568             if curr_up_speed < max_up_speed:
1569                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1570             if curr_down_speed < max_down_speed:
1571                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1572
1573             self.sut.set_speed(self.inet_cores, curr_up_speed)
1574             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1575             time.sleep(self.step_time)
1576
1577         LOG.info("Target speeds reached. Starting real test.")
1578
1579         yield
1580
1581         self.sut.stop(self.cpe_cores + self.inet_cores)
1582         LOG.info("Test ended. Flushing NIC buffers")
1583         self.sut.start(self.all_rx_cores)
1584         time.sleep(3)
1585         self.sut.stop(self.all_rx_cores)
1586
1587     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1588         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1589
1590         with data_helper, self.traffic_context(pkt_size, value):
1591             with data_helper.measure_tot_stats():
1592                 time.sleep(duration)
1593                 # Getting statistics to calculate PPS at right speed....
1594                 data_helper.capture_tsc_hz()
1595                 data_helper.latency = self.get_latency()
1596
1597         return data_helper.result_tuple, data_helper.samples
1598
1599
1600 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1601
1602     __prox_profile_type__ = "lwAFTR gen"
1603
1604     def __init__(self, resource_helper):
1605         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1606         self._cores_tuple = None
1607         self._ports_tuple = None
1608         self.step_delta = 5
1609         self.step_time = 0.5
1610
1611     @property
1612     def _lwaftr_cores(self):
1613         if not self._cores_tuple:
1614             self._cores_tuple = self._get_cores_gen_lwaftr()
1615         return self._cores_tuple
1616
1617     @property
1618     def tun_cores(self):
1619         return self._lwaftr_cores[0]
1620
1621     @property
1622     def inet_cores(self):
1623         return self._lwaftr_cores[1]
1624
1625     @property
1626     def _lwaftr_ports(self):
1627         if not self._ports_tuple:
1628             self._ports_tuple = self._get_ports_gen_lw_aftr()
1629         return self._ports_tuple
1630
1631     @property
1632     def tun_ports(self):
1633         return self._lwaftr_ports[0]
1634
1635     @property
1636     def inet_ports(self):
1637         return self._lwaftr_ports[1]
1638
1639     @property
1640     def all_rx_cores(self):
1641         return self.latency_cores
1642
1643     def _get_cores_gen_lwaftr(self):
1644         tun_cores = []
1645         inet_cores = []
1646         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1647             if not section_name.startswith("core"):
1648                 continue
1649
1650             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1651                 continue
1652
1653             core_tuple = CoreSocketTuple(section_name)
1654             core_tag = core_tuple.core_id
1655             for item_value in (v for k, v in section if k == 'name'):
1656                 if item_value.startswith('tun'):
1657                     tun_cores.append(core_tag)
1658                 elif item_value.startswith('inet'):
1659                     inet_cores.append(core_tag)
1660
1661         return tun_cores, inet_cores
1662
1663     def _get_ports_gen_lw_aftr(self):
1664         tun_ports = []
1665         inet_ports = []
1666
1667         re_port = re.compile(r'port (\d+)')
1668         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1669             match = re_port.search(section_name)
1670             if not match:
1671                 continue
1672
1673             tx_port_no = int(match.group(1))
1674             for item_value in (v for k, v in section if k == 'name'):
1675                 if item_value.startswith('lwB4'):
1676                     tun_ports.append(tx_port_no)
1677                 elif item_value.startswith('inet'):
1678                     inet_ports.append(tx_port_no)
1679
1680         return tun_ports, inet_ports
1681
1682     @staticmethod
1683     def _resize(len1, len2):
1684         if len1 == len2:
1685             return 1.0
1686         return 1.0 * len1 / len2
1687
1688     @contextmanager
1689     def traffic_context(self, pkt_size, value):
1690         # Tester is sending packets at the required speed already after
1691         # setup_test(). Just get the current statistics, sleep the required
1692         # amount of time and calculate packet loss.
1693         tun_pkt_size = pkt_size
1694         inet_pkt_size = pkt_size - 40
1695         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1696
1697         curr_up_speed = curr_down_speed = 0
1698         max_up_speed = max_down_speed = value
1699
1700         max_up_speed = value / ratio
1701
1702         # Adjust speed when multiple cores per port are used to generate traffic
1703         if len(self.tun_ports) != len(self.tun_cores):
1704             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1705         if len(self.inet_ports) != len(self.inet_cores):
1706             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1707
1708         # Initialize cores
1709         self.sut.stop_all()
1710         time.sleep(0.5)
1711
1712         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1713         # wrong.
1714         self.sut.start(self.all_rx_cores)
1715         time.sleep(0.5)
1716         self.sut.stop(self.all_rx_cores)
1717         time.sleep(0.5)
1718         self.sut.reset_stats()
1719
1720         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1721         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1722
1723         self.sut.reset_values(self.tun_cores)
1724         self.sut.reset_values(self.inet_cores)
1725
1726         # Set correct IP and UDP lengths in packet headers
1727         # tun
1728         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1729         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1730         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1731         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1732         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1733         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1734
1735         # INET
1736         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1737         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1738         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1739         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1740
1741         LOG.info("Initializing SUT: sending lwAFTR packets")
1742         self.sut.set_speed(self.inet_cores, curr_up_speed)
1743         self.sut.set_speed(self.tun_cores, curr_down_speed)
1744         time.sleep(4)
1745
1746         # Ramp up the transmission speed. First go to the common speed, then
1747         # increase steps for the faster one.
1748         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1749
1750         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1751
1752         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1753             # The min(..., ...) takes care of 1) floating point rounding errors
1754             # that could make curr_*_speed to be slightly greater than
1755             # max_*_speed and 2) max_*_speed not being an exact multiple of
1756             # self._step_delta.
1757             if curr_up_speed < max_up_speed:
1758                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1759             if curr_down_speed < max_down_speed:
1760                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1761
1762             self.sut.set_speed(self.inet_cores, curr_up_speed)
1763             self.sut.set_speed(self.tun_cores, curr_down_speed)
1764             time.sleep(self.step_time)
1765
1766         LOG.info("Target speeds reached. Starting real test.")
1767
1768         yield
1769
1770         self.sut.stop(self.tun_cores + self.inet_cores)
1771         LOG.info("Test ended. Flushing NIC buffers")
1772         self.sut.start(self.all_rx_cores)
1773         time.sleep(3)
1774         self.sut.stop(self.all_rx_cores)
1775
1776     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1777         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1778
1779         with data_helper, self.traffic_context(pkt_size, value):
1780             with data_helper.measure_tot_stats():
1781                 time.sleep(duration)
1782                 # Getting statistics to calculate PPS at right speed....
1783                 data_helper.capture_tsc_hz()
1784                 data_helper.latency = self.get_latency()
1785
1786         return data_helper.result_tuple, data_helper.samples