Merge changes from topic 'YARDSTICK-1040'
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47
48 BITS_PER_BYTE = 8
49 RETRY_SECONDS = 60
50 RETRY_INTERVAL = 1
51
52 CONFIGURATION_OPTIONS = (
53     # dict key           section     key               default value
54     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
55     ('testDuration', 'general', 'test_duration', 5.0),
56     ('testPrecision', 'general', 'test_precision', 1.0),
57     ('tests', 'general', 'tests', None),
58     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
59
60     ('logFile', 'logging', 'file', 'dats.log'),
61     ('logDateFormat', 'logging', 'datefmt', None),
62     ('logLevel', 'logging', 'level', 'INFO'),
63     ('logOverwrite', 'logging', 'overwrite', 1),
64
65     ('testerIp', 'tester', 'ip', None),
66     ('testerUser', 'tester', 'user', 'root'),
67     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
68     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
69     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
70     ('testerSocketId', 'tester', 'socket_id', 0),
71
72     ('sutIp', 'sut', 'ip', None),
73     ('sutUser', 'sut', 'user', 'root'),
74     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
75     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
76     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
77     ('sutSocketId', 'sut', 'socket_id', 0),
78 )
79
80
81 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
82     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
83
84     def __new__(cls, *args):
85         try:
86             matches = cls.CORE_RE.search(str(args[0]))
87             if matches:
88                 args = matches.groups()
89
90             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
91                                                        'h' if args[2] else '')
92
93         except (AttributeError, TypeError, IndexError, ValueError):
94             raise ValueError('Invalid core spec {}'.format(args))
95
96     def is_hyperthread(self):
97         return self.hyperthread == 'h'
98
99     @property
100     def index(self):
101         return int(self.is_hyperthread())
102
103     def find_in_topology(self, cpu_topology):
104         try:
105             socket_core_match = cpu_topology[self.socket_id][self.core_id]
106             sorted_match = sorted(socket_core_match.values())
107             return sorted_match[self.index][0]
108         except (KeyError, IndexError):
109             template = "Core {}{} on socket {} does not exist"
110             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
111
112
113 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
114     def __new__(cls, *args):
115         try:
116             assert args[0] is not str(args[0])
117             args = tuple(args[0])
118         except (AssertionError, IndexError, TypeError):
119             pass
120
121         return super(TotStatsTuple, cls).__new__(cls, *args)
122
123
124 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
125                                                         'delta_tx,delta_tsc,'
126                                                         'latency,rx_total,tx_total,pps')):
127     @property
128     def pkt_loss(self):
129         try:
130             return 1e2 * self.drop_total / float(self.tx_total)
131         except ZeroDivisionError:
132             return 100.0
133
134     @property
135     def mpps(self):
136         # calculate the effective throughput in Mpps
137         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
138
139     @property
140     def can_be_lost(self):
141         return int(self.tx_total * self.tolerated / 1e2)
142
143     @property
144     def drop_total(self):
145         return self.tx_total - self.rx_total
146
147     @property
148     def success(self):
149         return self.drop_total <= self.can_be_lost
150
151     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
152         if pkt_loss is None:
153             pkt_loss = self.pkt_loss
154
155         if port_samples is None:
156             port_samples = {}
157
158         latency_keys = [
159             "LatencyMin",
160             "LatencyMax",
161             "LatencyAvg",
162         ]
163
164         samples = {
165             "Throughput": self.mpps,
166             "DropPackets": pkt_loss,
167             "CurrentDropPackets": pkt_loss,
168             "TxThroughput": self.pps / 1e6,
169             "RxThroughput": self.mpps,
170             "PktSize": pkt_size,
171         }
172         if port_samples:
173             samples.update(port_samples)
174
175         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
176         return samples
177
178     def log_data(self, logger=None):
179         if logger is None:
180             logger = LOG
181
182         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
183         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
184         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
185
186
187 class PacketDump(object):
188     @staticmethod
189     def assert_func(func, value1, value2, template=None):
190         assert func(value1, value2), template.format(value1, value2)
191
192     def __init__(self, port_id, data_len, payload):
193         template = "Packet dump has specified length {}, but payload is {} bytes long"
194         self.assert_func(operator.eq, data_len, len(payload), template)
195         self._port_id = port_id
196         self._data_len = data_len
197         self._payload = payload
198
199     @property
200     def port_id(self):
201         """Get the port id of the packet dump"""
202         return self._port_id
203
204     @property
205     def data_len(self):
206         """Get the length of the data received"""
207         return self._data_len
208
209     def __str__(self):
210         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
211
212     def payload(self, start=None, end=None):
213         """Get part of the payload as a list of ordinals.
214
215         Returns a list of byte values, matching the contents of the packet dump.
216         Optional start and end parameters can be specified to retrieve only a
217         part of the packet contents.
218
219         The number of elements in the list is equal to end - start + 1, so end
220         is the offset of the last character.
221
222         Args:
223             start (pos. int): the starting offset in the payload. If it is not
224                 specified or None, offset 0 is assumed.
225             end (pos. int): the ending offset of the payload. If it is not
226                 specified or None, the contents until the end of the packet are
227                 returned.
228
229         Returns:
230             [int, int, ...]. Each int represents the ordinal value of a byte in
231             the packet payload.
232         """
233         if start is None:
234             start = 0
235
236         if end is None:
237             end = self.data_len - 1
238
239         # Bounds checking on offsets
240         template = "Start offset must be non-negative"
241         self.assert_func(operator.ge, start, 0, template)
242
243         template = "End offset must be less than {1}"
244         self.assert_func(operator.lt, end, self.data_len, template)
245
246         # Adjust for splice operation: end offset must be 1 more than the offset
247         # of the last desired character.
248         end += 1
249
250         return self._payload[start:end]
251
252
253 class ProxSocketHelper(object):
254
255     def __init__(self, sock=None):
256         """ creates new prox instance """
257         super(ProxSocketHelper, self).__init__()
258
259         if sock is None:
260             sock = socket.socket()
261
262         self._sock = sock
263         self._pkt_dumps = []
264         self.master_stats = None
265
266     def connect(self, ip, port):
267         """Connect to the prox instance on the remote system"""
268         self._sock.connect((ip, port))
269
270     def get_socket(self):
271         """ get the socket connected to the remote instance """
272         return self._sock
273
274     def _parse_socket_data(self, decoded_data, pkt_dump_only):
275         def get_newline_index():
276             return decoded_data.find('\n', index)
277
278         ret_str = ''
279         index = 0
280         for newline_index in iter(get_newline_index, -1):
281             ret_str = decoded_data[index:newline_index]
282
283             try:
284                 mode, port_id, data_len = ret_str.split(',', 2)
285             except ValueError:
286                 mode, port_id, data_len = None, None, None
287
288             if mode != 'pktdump':
289                 # Regular 1-line message. Stop reading from the socket.
290                 LOG.debug("Regular response read")
291                 return ret_str
292
293             LOG.debug("Packet dump header read: [%s]", ret_str)
294
295             # The line is a packet dump header. Parse it, read the
296             # packet payload, store the dump for later retrieval.
297             # Skip over the packet dump and continue processing: a
298             # 1-line response may follow the packet dump.
299
300             data_len = int(data_len)
301             data_start = newline_index + 1  # + 1 to skip over \n
302             data_end = data_start + data_len
303             sub_data = decoded_data[data_start:data_end]
304             pkt_payload = array.array('B', (ord(v) for v in sub_data))
305             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
306             self._pkt_dumps.append(pkt_dump)
307
308             if pkt_dump_only:
309                 # Return boolean instead of string to signal
310                 # successful reception of the packet dump.
311                 LOG.debug("Packet dump stored, returning")
312                 return True
313
314             index = data_end + 1
315
316         return ret_str
317
318     def get_data(self, pkt_dump_only=False, timeout=1):
319         """ read data from the socket """
320
321         # This method behaves slightly differently depending on whether it is
322         # called to read the response to a command (pkt_dump_only = 0) or if
323         # it is called specifically to read a packet dump (pkt_dump_only = 1).
324         #
325         # Packet dumps look like:
326         #   pktdump,<port_id>,<data_len>\n
327         #   <packet contents as byte array>\n
328         # This means the total packet dump message consists of 2 lines instead
329         # of 1 line.
330         #
331         # - Response for a command (pkt_dump_only = 0):
332         #   1) Read response from the socket until \n (end of message)
333         #   2a) If the response is a packet dump header (starts with "pktdump,"):
334         #     - Read the packet payload and store the packet dump for later
335         #       retrieval.
336         #     - Reset the state and restart from 1). Eventually state 2b) will
337         #       be reached and the function will return.
338         #   2b) If the response is not a packet dump:
339         #     - Return the received message as a string
340         #
341         # - Explicit request to read a packet dump (pkt_dump_only = 1):
342         #   - Read the dump header and payload
343         #   - Store the packet dump for later retrieval
344         #   - Return True to signify a packet dump was successfully read
345
346         def is_ready():
347             # recv() is blocking, so avoid calling it when no data is waiting.
348             ready = select.select([self._sock], [], [], timeout)
349             return bool(ready[0])
350
351         status = False
352         ret_str = ""
353         for status in iter(is_ready, False):
354             decoded_data = self._sock.recv(256).decode('utf-8')
355             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
356
357         LOG.debug("Received data from socket: [%s]", ret_str)
358         return ret_str if status else ''
359
360     def put_command(self, to_send):
361         """ send data to the remote instance """
362         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
363         try:
364             # NOTE: sendall will block, we need a timeout
365             self._sock.sendall(to_send.encode('utf-8'))
366         except:  # pylint: disable=bare-except
367             pass
368
369     def get_packet_dump(self):
370         """ get the next packet dump """
371         if self._pkt_dumps:
372             return self._pkt_dumps.pop(0)
373         return None
374
375     def stop_all_reset(self):
376         """ stop the remote instance and reset stats """
377         LOG.debug("Stop all and reset stats")
378         self.stop_all()
379         self.reset_stats()
380
381     def stop_all(self):
382         """ stop all cores on the remote instance """
383         LOG.debug("Stop all")
384         self.put_command("stop all\n")
385         time.sleep(3)
386
387     def stop(self, cores, task=''):
388         """ stop specific cores on the remote instance """
389         LOG.debug("Stopping cores %s", cores)
390         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
391         time.sleep(3)
392
393     def start_all(self):
394         """ start all cores on the remote instance """
395         LOG.debug("Start all")
396         self.put_command("start all\n")
397
398     def start(self, cores):
399         """ start specific cores on the remote instance """
400         LOG.debug("Starting cores %s", cores)
401         self.put_command("start {}\n".format(join_non_strings(',', cores)))
402         time.sleep(3)
403
404     def reset_stats(self):
405         """ reset the statistics on the remote instance """
406         LOG.debug("Reset stats")
407         self.put_command("reset stats\n")
408         time.sleep(1)
409
410     def _run_template_over_cores(self, template, cores, *args):
411         for core in cores:
412             self.put_command(template.format(core, *args))
413
414     def set_pkt_size(self, cores, pkt_size):
415         """ set the packet size to generate on the remote instance """
416         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
417         pkt_size -= 4
418         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
419         time.sleep(1)
420
421     def set_value(self, cores, offset, value, length):
422         """ set value on the remote instance """
423         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
424         LOG.debug(msg, cores, value, length, offset)
425         template = "set value {} 0 {} {} {}\n"
426         self._run_template_over_cores(template, cores, offset, value, length)
427
428     def reset_values(self, cores):
429         """ reset values on the remote instance """
430         LOG.debug("Set value for core(s) %s", cores)
431         self._run_template_over_cores("reset values {} 0\n", cores)
432
433     def set_speed(self, cores, speed, tasks=None):
434         """ set speed on the remote instance """
435         if tasks is None:
436             tasks = [0] * len(cores)
437         elif len(tasks) != len(cores):
438             LOG.error("set_speed: cores and tasks must have the same len")
439         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
440         for (core, task) in list(zip(cores, tasks)):
441             self.put_command("speed {} {} {}\n".format(core, task, speed))
442
443     def slope_speed(self, cores_speed, duration, n_steps=0):
444         """will start to increase speed from 0 to N where N is taken from
445         a['speed'] for each a in cores_speed"""
446         # by default, each step will take 0.5 sec
447         if n_steps == 0:
448             n_steps = duration * 2
449
450         private_core_data = []
451         step_duration = float(duration) / n_steps
452         for core_data in cores_speed:
453             target = float(core_data['speed'])
454             private_core_data.append({
455                 'cores': core_data['cores'],
456                 'zero': 0,
457                 'delta': target / n_steps,
458                 'current': 0,
459                 'speed': target,
460             })
461
462         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
463         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
464             time.sleep(step_duration)
465             for core_data in private_core_data:
466                 core_data['current'] = core_data[key1] + core_data[key2]
467                 self.set_speed(core_data['cores'], core_data['current'])
468
469     def set_pps(self, cores, pps, pkt_size,
470                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
471         """ set packets per second for specific cores on the remote instance """
472         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
473         LOG.debug(msg, cores, pps, pkt_size)
474
475         # speed in percent of line-rate
476         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
477         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
478
479     def lat_stats(self, cores, task=0):
480         """Get the latency statistics from the remote system"""
481         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
482         lat_min = {}
483         lat_max = {}
484         lat_avg = {}
485         for core in cores:
486             self.put_command("lat stats {} {} \n".format(core, task))
487             ret = self.get_data()
488
489             try:
490                 lat_min[core], lat_max[core], lat_avg[core] = \
491                     tuple(int(n) for n in ret.split(",")[:3])
492
493             except (AttributeError, ValueError, TypeError):
494                 pass
495
496         return lat_min, lat_max, lat_avg
497
498     def get_all_tot_stats(self):
499         self.put_command("tot stats\n")
500         all_stats_str = self.get_data().split(",")
501         if len(all_stats_str) != 4:
502             all_stats = [0] * 4
503             return all_stats
504         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
505         self.master_stats = all_stats
506         return all_stats
507
508     def hz(self):
509         return self.get_all_tot_stats()[3]
510
511     def core_stats(self, cores, task=0):
512         """Get the receive statistics from the remote system"""
513         rx = tx = drop = tsc = 0
514         for core in cores:
515             self.put_command("core stats {} {}\n".format(core, task))
516             ret = self.get_data().split(",")
517             rx += int(ret[0])
518             tx += int(ret[1])
519             drop += int(ret[2])
520             tsc = int(ret[3])
521         return rx, tx, drop, tsc
522
523     def port_stats(self, ports):
524         """get counter values from a specific port"""
525         tot_result = [0] * 12
526         for port in ports:
527             self.put_command("port_stats {}\n".format(port))
528             ret = [try_int(s, 0) for s in self.get_data().split(",")]
529             tot_result = [sum(x) for x in zip(tot_result, ret)]
530         return tot_result
531
532     @contextmanager
533     def measure_tot_stats(self):
534         start = self.get_all_tot_stats()
535         container = {'start_tot': start}
536         try:
537             yield container
538         finally:
539             container['end_tot'] = end = self.get_all_tot_stats()
540
541         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
542
543     def tot_stats(self):
544         """Get the total statistics from the remote system"""
545         stats = self.get_all_tot_stats()
546         return stats[:3]
547
548     def tot_ierrors(self):
549         """Get the total ierrors from the remote system"""
550         self.put_command("tot ierrors tot\n")
551         recv = self.get_data().split(',')
552         tot_ierrors = int(recv[0])
553         tsc = int(recv[0])
554         return tot_ierrors, tsc
555
556     def set_count(self, count, cores):
557         """Set the number of packets to send on the specified core"""
558         self._run_template_over_cores("count {} 0 {}\n", cores, count)
559
560     def dump_rx(self, core_id, task_id=0, count=1):
561         """Activate dump on rx on the specified core"""
562         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
563         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
564         time.sleep(1.5)  # Give PROX time to set up packet dumping
565
566     def quit(self):
567         self.stop_all()
568         self._quit()
569         self.force_quit()
570
571     def _quit(self):
572         """ stop all cores on the remote instance """
573         LOG.debug("Quit prox")
574         self.put_command("quit\n")
575         time.sleep(3)
576
577     def force_quit(self):
578         """ stop all cores on the remote instance """
579         LOG.debug("Force Quit prox")
580         self.put_command("quit_force\n")
581         time.sleep(3)
582
583
584 _LOCAL_OBJECT = object()
585
586
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588     # the actual app is lowercase
589     APP_NAME = 'prox'
590     # not used for Prox but added for consistency
591     VNF_TYPE = "PROX"
592
593     LUA_PARAMETER_NAME = ""
594     LUA_PARAMETER_PEER = {
595         "gen": "sut",
596         "sut": "gen",
597     }
598
599     CONFIG_QUEUE_TIMEOUT = 120
600
601     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
602         self.remote_path = None
603         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
604         self.remote_prox_file_name = None
605         self._prox_config_data = None
606         self.additional_files = {}
607         self.config_queue = Queue()
608         # allow_exit_without_flush
609         self.config_queue.cancel_join_thread()
610         self._global_section = None
611
612     @property
613     def prox_config_data(self):
614         if self._prox_config_data is None:
615             # this will block, but it needs too
616             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
617         return self._prox_config_data
618
619     @property
620     def global_section(self):
621         if self._global_section is None and self.prox_config_data:
622             self._global_section = self.find_section("global")
623         return self._global_section
624
625     def find_section(self, name, default=_LOCAL_OBJECT):
626         result = next((value for key, value in self.prox_config_data if key == name), default)
627         if result is _LOCAL_OBJECT:
628             raise KeyError('{} not found in Prox config'.format(name))
629         return result
630
631     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
632         section = self.find_section(section_name, [])
633         result = next((value for key, value in section if key == section_key), default)
634         if result is _LOCAL_OBJECT:
635             template = '{} not found in {} section of Prox config'
636             raise KeyError(template.format(section_key, section_name))
637         return result
638
639     def copy_to_target(self, config_file_path, prox_file):
640         remote_path = os.path.join("/tmp", prox_file)
641         self.ssh_helper.put(config_file_path, remote_path)
642         return remote_path
643
644     @staticmethod
645     def _get_tx_port(section, sections):
646         iface_port = [-1]
647         for item in sections[section]:
648             if item[0] == "tx port":
649                 iface_port = re.findall(r'\d+', item[1])
650                 # do we want the last one?
651                 #   if yes, then can we reverse?
652         return int(iface_port[0])
653
654     @staticmethod
655     def _replace_quoted_with_value(quoted, value, count=1):
656         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
657         return new_string
658
659     def _insert_additional_file(self, value):
660         file_str = value.split('"')
661         base_name = os.path.basename(file_str[1])
662         file_str[1] = self.additional_files[base_name]
663         return '"'.join(file_str)
664
665     def generate_prox_config_file(self, config_path):
666         sections = []
667         prox_config = ConfigParser(config_path, sections)
668         prox_config.parse()
669
670         # Ensure MAC is set "hardware"
671         all_ports = self.vnfd_helper.port_pairs.all_ports
672         # use dpdk port number
673         for port_name in all_ports:
674             port_num = self.vnfd_helper.port_num(port_name)
675             port_section_name = "port {}".format(port_num)
676             for section_name, section in sections:
677                 if port_section_name != section_name:
678                     continue
679
680                 for section_data in section:
681                     if section_data[0] == "mac":
682                         section_data[1] = "hardware"
683
684         # search for dst mac
685         for _, section in sections:
686             for section_data in section:
687                 item_key, item_val = section_data
688                 if item_val.startswith("@@dst_mac"):
689                     tx_port_iter = re.finditer(r'\d+', item_val)
690                     tx_port_no = int(next(tx_port_iter).group(0))
691                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
692                     mac = intf["virtual-interface"]["dst_mac"]
693                     section_data[1] = mac.replace(":", " ", 6)
694
695                 if item_key == "dst mac" and item_val.startswith("@@"):
696                     tx_port_iter = re.finditer(r'\d+', item_val)
697                     tx_port_no = int(next(tx_port_iter).group(0))
698                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
699                     mac = intf["virtual-interface"]["dst_mac"]
700                     section_data[1] = mac
701
702         # if addition file specified in prox config
703         if not self.additional_files:
704             return sections
705
706         for section_name, section in sections:
707             for section_data in section:
708                 try:
709                     if section_data[0].startswith("dofile"):
710                         section_data[0] = self._insert_additional_file(section_data[0])
711
712                     if section_data[1].startswith("dofile"):
713                         section_data[1] = self._insert_additional_file(section_data[1])
714                 except:  # pylint: disable=bare-except
715                     pass
716
717         return sections
718
719     @staticmethod
720     def write_prox_lua(lua_config):
721         """
722         Write an .ini-format config file for PROX (parameters.lua)
723         PROX does not allow a space before/after the =, so we need
724         a custom method
725         """
726         out = []
727         for key in lua_config:
728             value = '"' + lua_config[key] + '"'
729             if key == "__name__":
730                 continue
731             if value is not None and value != '@':
732                 key = "=".join((key, str(value).replace('\n', '\n\t')))
733                 out.append(key)
734             else:
735                 key = str(key).replace('\n', '\n\t')
736                 out.append(key)
737         return os.linesep.join(out)
738
739     @staticmethod
740     def write_prox_config(prox_config):
741         """
742         Write an .ini-format config file for PROX
743         PROX does not allow a space before/after the =, so we need
744         a custom method
745         """
746         out = []
747         for (section_name, section) in prox_config:
748             out.append("[{}]".format(section_name))
749             for item in section:
750                 key, value = item
751                 if key == "__name__":
752                     continue
753                 if value is not None and value != '@':
754                     key = "=".join((key, str(value).replace('\n', '\n\t')))
755                     out.append(key)
756                 else:
757                     key = str(key).replace('\n', '\n\t')
758                     out.append(key)
759         return os.linesep.join(out)
760
761     def put_string_to_file(self, s, remote_path):
762         file_obj = cStringIO(s)
763         self.ssh_helper.put_file_obj(file_obj, remote_path)
764         return remote_path
765
766     def generate_prox_lua_file(self):
767         p = OrderedDict()
768         all_ports = self.vnfd_helper.port_pairs.all_ports
769         for port_name in all_ports:
770             port_num = self.vnfd_helper.port_num(port_name)
771             intf = self.vnfd_helper.find_interface(name=port_name)
772             vintf = intf['virtual-interface']
773             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
774             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
775
776         return p
777
778     def upload_prox_lua(self, config_file, lua_data):
779         # prox can't handle spaces around ' = ' so use custom method
780         out = StringIO(self.write_prox_lua(lua_data))
781         out.seek(0)
782         remote_path = os.path.join("/tmp", config_file)
783         self.ssh_helper.put_file_obj(out, remote_path)
784
785         return remote_path
786
787     def upload_prox_config(self, config_file, prox_config_data):
788         # prox can't handle spaces around ' = ' so use custom method
789         out = StringIO(self.write_prox_config(prox_config_data))
790         out.seek(0)
791         remote_path = os.path.join("/tmp", config_file)
792         self.ssh_helper.put_file_obj(out, remote_path)
793
794         return remote_path
795
796     def build_config_file(self):
797         task_path = self.scenario_helper.task_path
798         options = self.scenario_helper.options
799         config_path = options['prox_config']
800         config_file = os.path.basename(config_path)
801         config_path = utils.find_relative_file(config_path, task_path)
802         self.additional_files = {}
803
804         try:
805             if options['prox_generate_parameter']:
806                 self.lua = []
807                 self.lua = self.generate_prox_lua_file()
808                 if len(self.lua) > 0:
809                     self.upload_prox_lua("parameters.lua", self.lua)
810         except:  # pylint: disable=bare-except
811             pass
812
813         prox_files = options.get('prox_files', [])
814         if isinstance(prox_files, six.string_types):
815             prox_files = [prox_files]
816         for key_prox_file in prox_files:
817             base_prox_file = os.path.basename(key_prox_file)
818             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
819             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
820             self.additional_files[base_prox_file] = remote_prox_file
821
822         self._prox_config_data = self.generate_prox_config_file(config_path)
823         # copy config to queue so we can read it from traffic_runner process
824         self.config_queue.put(self._prox_config_data)
825         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
826
827     def build_config(self):
828         self.build_config_file()
829
830         options = self.scenario_helper.options
831         prox_args = options['prox_args']
832         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
833
834         self.pipeline_kwargs = {
835             'tool_path': tool_path,
836             'tool_dir': os.path.dirname(tool_path),
837             'cfg_file': self.remote_path,
838             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
839                              for k, v in prox_args.items())
840         }
841
842         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
843                         "{args} -f {cfg_file} '")
844         return cmd_template.format(**self.pipeline_kwargs)
845
846
847 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
848 class ProxResourceHelper(ClientResourceHelper):
849
850     RESOURCE_WORD = 'prox'
851
852     PROX_MODE = ""
853
854     WAIT_TIME = 3
855
856     @staticmethod
857     def find_pci(pci, bound_pci):
858         # we have to substring match PCI bus address from the end
859         return any(b.endswith(pci) for b in bound_pci)
860
861     def __init__(self, setup_helper):
862         super(ProxResourceHelper, self).__init__(setup_helper)
863         self.mgmt_interface = self.vnfd_helper.mgmt_interface
864         self._user = self.mgmt_interface["user"]
865         self._ip = self.mgmt_interface["ip"]
866
867         self.done = False
868         self._vpci_to_if_name_map = None
869         self.additional_file = {}
870         self.remote_prox_file_name = None
871         self.lower = None
872         self.upper = None
873         self.step_delta = 1
874         self.step_time = 0.5
875         self._test_type = None
876
877     @property
878     def sut(self):
879         if not self.client:
880             self.client = self._connect()
881         return self.client
882
883     @property
884     def test_type(self):
885         if self._test_type is None:
886             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
887         return self._test_type
888
889     def run_traffic(self, traffic_profile):
890         self._queue.cancel_join_thread()
891         self.lower = 0.0
892         self.upper = 100.0
893
894         traffic_profile.init(self._queue)
895         # this frees up the run_traffic loop
896         self.client_started.value = 1
897
898         while not self._terminated.value:
899             # move it all to traffic_profile
900             self._run_traffic_once(traffic_profile)
901
902     def _run_traffic_once(self, traffic_profile):
903         traffic_profile.execute_traffic(self)
904         if traffic_profile.done:
905             self._queue.put({'done': True})
906             LOG.debug("tg_prox done")
907             self._terminated.value = 1
908
909     # For VNF use ResourceHelper method to collect KPIs directly.
910     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
911     def collect_collectd_kpi(self):
912         return self._collect_resource_kpi()
913
914     def collect_kpi(self):
915         result = super(ProxResourceHelper, self).collect_kpi()
916         # add in collectd kpis manually
917         if result:
918             result['collect_stats'] = self._collect_resource_kpi()
919         return result
920
921     def terminate(self):
922         # should not be called, use VNF terminate
923         raise NotImplementedError()
924
925     def up_post(self):
926         return self.sut  # force connection
927
928     def execute(self, cmd, *args, **kwargs):
929         func = getattr(self.sut, cmd, None)
930         if func:
931             return func(*args, **kwargs)
932         return None
933
934     def _connect(self, client=None):
935         """Run and connect to prox on the remote system """
936         # De-allocating a large amount of hugepages takes some time. If a new
937         # PROX instance is started immediately after killing the previous one,
938         # it might not be able to allocate hugepages, because they are still
939         # being freed. Hence the -w switch.
940         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
941         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
942         # -f ./handle_none-4.cfg"
943         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
944         #  "; " \
945         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
946         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
947         # sudo " \
948         #    + "./build/Prox " + prox_args
949         # log.debug("Starting PROX with command [%s]", prox_cmd)
950         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
951         # self._ip, prox_cmd))
952         if client is None:
953             client = ProxSocketHelper()
954
955         # try connecting to Prox for 60s
956         for _ in range(RETRY_SECONDS):
957             time.sleep(RETRY_INTERVAL)
958             try:
959                 client.connect(self._ip, PROX_PORT)
960             except (socket.gaierror, socket.error):
961                 continue
962             else:
963                 return client
964
965         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
966         raise Exception(msg.format(self._ip, PROX_PORT))
967
968
969 class ProxDataHelper(object):
970
971     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
972         super(ProxDataHelper, self).__init__()
973         self.vnfd_helper = vnfd_helper
974         self.sut = sut
975         self.pkt_size = pkt_size
976         self.value = value
977         self.line_speed = line_speed
978         self.tolerated_loss = tolerated_loss
979         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
980         self.tsc_hz = None
981         self.measured_stats = None
982         self.latency = None
983         self._totals_and_pps = None
984         self.result_tuple = None
985
986     @property
987     def totals_and_pps(self):
988         if self._totals_and_pps is None:
989             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
990             pps = self.value / 100.0 * self.line_rate_to_pps()
991             self._totals_and_pps = rx_total, tx_total, pps
992         return self._totals_and_pps
993
994     @property
995     def rx_total(self):
996         return self.totals_and_pps[0]
997
998     @property
999     def tx_total(self):
1000         return self.totals_and_pps[1]
1001
1002     @property
1003     def pps(self):
1004         return self.totals_and_pps[2]
1005
1006     @property
1007     def samples(self):
1008         samples = {}
1009         for port_name, port_num in self.vnfd_helper.ports_iter():
1010             try:
1011                 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1012                 samples[port_name] = {
1013                     "in_packets": port_rx_total,
1014                     "out_packets": port_tx_total,
1015                 }
1016             except (KeyError, TypeError, NameError, MemoryError, ValueError,
1017                     SystemError, BufferError):
1018                 samples[port_name] = {
1019                     "in_packets": 0,
1020                     "out_packets": 0,
1021                 }
1022         return samples
1023
1024     def __enter__(self):
1025         self.check_interface_count()
1026         return self
1027
1028     def __exit__(self, exc_type, exc_val, exc_tb):
1029         self.make_tuple()
1030
1031     def make_tuple(self):
1032         if self.result_tuple:
1033             return
1034
1035         self.result_tuple = ProxTestDataTuple(
1036             self.tolerated_loss,
1037             self.tsc_hz,
1038             self.measured_stats['delta'].rx,
1039             self.measured_stats['delta'].tx,
1040             self.measured_stats['delta'].tsc,
1041             self.latency,
1042             self.rx_total,
1043             self.tx_total,
1044             self.pps,
1045         )
1046         self.result_tuple.log_data()
1047
1048     @contextmanager
1049     def measure_tot_stats(self):
1050         with self.sut.measure_tot_stats() as self.measured_stats:
1051             yield
1052
1053     def check_interface_count(self):
1054         # do this assert in init?  unless we expect interface count to
1055         # change from one run to another run...
1056         assert self.port_count in {1, 2, 4}, \
1057             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1058
1059     def capture_tsc_hz(self):
1060         self.tsc_hz = float(self.sut.hz())
1061
1062     def line_rate_to_pps(self):
1063       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1064
1065 class ProxProfileHelper(object):
1066
1067     __prox_profile_type__ = "Generic"
1068
1069     PROX_CORE_GEN_MODE = "gen"
1070     PROX_CORE_LAT_MODE = "lat"
1071
1072     @classmethod
1073     def get_cls(cls, helper_type):
1074         """Return class of specified type."""
1075         if not helper_type:
1076             return ProxProfileHelper
1077
1078         for profile_helper_class in utils.itersubclasses(cls):
1079             if helper_type == profile_helper_class.__prox_profile_type__:
1080                 return profile_helper_class
1081
1082         return ProxProfileHelper
1083
1084     @classmethod
1085     def make_profile_helper(cls, resource_helper):
1086         return cls.get_cls(resource_helper.test_type)(resource_helper)
1087
1088     def __init__(self, resource_helper):
1089         super(ProxProfileHelper, self).__init__()
1090         self.resource_helper = resource_helper
1091         self._cpu_topology = None
1092         self._test_cores = None
1093         self._latency_cores = None
1094
1095     @property
1096     def cpu_topology(self):
1097         if not self._cpu_topology:
1098             stdout = io.BytesIO()
1099             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1100             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1101         return self._cpu_topology
1102
1103     @property
1104     def test_cores(self):
1105         if not self._test_cores:
1106             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1107         return self._test_cores
1108
1109     @property
1110     def latency_cores(self):
1111         if not self._latency_cores:
1112             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1113         return self._latency_cores
1114
1115     @contextmanager
1116     def traffic_context(self, pkt_size, value):
1117         self.sut.stop_all()
1118         self.sut.reset_stats()
1119         try:
1120             self.sut.set_pkt_size(self.test_cores, pkt_size)
1121             self.sut.set_speed(self.test_cores, value)
1122             self.sut.start_all()
1123             yield
1124         finally:
1125             self.sut.stop_all()
1126
1127     def get_cores(self, mode):
1128         cores = []
1129
1130         for section_name, section in self.setup_helper.prox_config_data:
1131             if not section_name.startswith("core"):
1132                 continue
1133
1134             for key, value in section:
1135                 if key == "mode" and value == mode:
1136                     core_tuple = CoreSocketTuple(section_name)
1137                     core = core_tuple.core_id
1138                     cores.append(core)
1139
1140         return cores
1141
1142     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1143                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1144         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1145                                      value, tolerated_loss, line_speed)
1146
1147         with data_helper, self.traffic_context(pkt_size, value):
1148             with data_helper.measure_tot_stats():
1149                 time.sleep(duration)
1150                 # Getting statistics to calculate PPS at right speed....
1151                 data_helper.capture_tsc_hz()
1152                 data_helper.latency = self.get_latency()
1153
1154         return data_helper.result_tuple, data_helper.samples
1155
1156     def get_latency(self):
1157         """
1158         :return: return lat_min, lat_max, lat_avg
1159         :rtype: list
1160         """
1161
1162         if not self._latency_cores:
1163             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1164
1165         if self._latency_cores:
1166             return self.sut.lat_stats(self._latency_cores)
1167         return []
1168
1169     def terminate(self):
1170         pass
1171
1172     def __getattr__(self, item):
1173         return getattr(self.resource_helper, item)
1174
1175
1176 class ProxMplsProfileHelper(ProxProfileHelper):
1177
1178     __prox_profile_type__ = "MPLS tag/untag"
1179
1180     def __init__(self, resource_helper):
1181         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1182         self._cores_tuple = None
1183
1184     @property
1185     def mpls_cores(self):
1186         if not self._cores_tuple:
1187             self._cores_tuple = self.get_cores_mpls()
1188         return self._cores_tuple
1189
1190     @property
1191     def tagged_cores(self):
1192         return self.mpls_cores[0]
1193
1194     @property
1195     def plain_cores(self):
1196         return self.mpls_cores[1]
1197
1198     def get_cores_mpls(self):
1199         cores_tagged = []
1200         cores_plain = []
1201         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1202             if not section_name.startswith("core"):
1203                 continue
1204
1205             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1206                 continue
1207
1208             for item_key, item_value in section:
1209                 if item_key != 'name':
1210                     continue
1211
1212                 if item_value.startswith("tag"):
1213                     core_tuple = CoreSocketTuple(section_name)
1214                     core_tag = core_tuple.core_id
1215                     cores_tagged.append(core_tag)
1216
1217                 elif item_value.startswith("udp"):
1218                     core_tuple = CoreSocketTuple(section_name)
1219                     core_udp = core_tuple.core_id
1220                     cores_plain.append(core_udp)
1221
1222         return cores_tagged, cores_plain
1223
1224     @contextmanager
1225     def traffic_context(self, pkt_size, value):
1226         self.sut.stop_all()
1227         self.sut.reset_stats()
1228         try:
1229             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1230             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1231             self.sut.set_speed(self.tagged_cores, value)
1232             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1233             self.sut.set_speed(self.plain_cores, value * ratio)
1234             self.sut.start_all()
1235             yield
1236         finally:
1237             self.sut.stop_all()
1238
1239
1240 class ProxBngProfileHelper(ProxProfileHelper):
1241
1242     __prox_profile_type__ = "BNG gen"
1243
1244     def __init__(self, resource_helper):
1245         super(ProxBngProfileHelper, self).__init__(resource_helper)
1246         self._cores_tuple = None
1247
1248     @property
1249     def bng_cores(self):
1250         if not self._cores_tuple:
1251             self._cores_tuple = self.get_cores_gen_bng_qos()
1252         return self._cores_tuple
1253
1254     @property
1255     def cpe_cores(self):
1256         return self.bng_cores[0]
1257
1258     @property
1259     def inet_cores(self):
1260         return self.bng_cores[1]
1261
1262     @property
1263     def arp_cores(self):
1264         return self.bng_cores[2]
1265
1266     @property
1267     def arp_task_cores(self):
1268         return self.bng_cores[3]
1269
1270     @property
1271     def all_rx_cores(self):
1272         return self.latency_cores
1273
1274     def get_cores_gen_bng_qos(self):
1275         cpe_cores = []
1276         inet_cores = []
1277         arp_cores = []
1278         arp_tasks_core = [0]
1279         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1280             if not section_name.startswith("core"):
1281                 continue
1282
1283             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1284                 continue
1285
1286             for item_key, item_value in section:
1287                 if item_key != 'name':
1288                     continue
1289
1290                 if item_value.startswith("cpe"):
1291                     core_tuple = CoreSocketTuple(section_name)
1292                     cpe_core = core_tuple.core_id
1293                     cpe_cores.append(cpe_core)
1294
1295                 elif item_value.startswith("inet"):
1296                     core_tuple = CoreSocketTuple(section_name)
1297                     inet_core = core_tuple.core_id
1298                     inet_cores.append(inet_core)
1299
1300                 elif item_value.startswith("arp"):
1301                     core_tuple = CoreSocketTuple(section_name)
1302                     arp_core = core_tuple.core_id
1303                     arp_cores.append(arp_core)
1304
1305                 # We check the tasks/core separately
1306                 if item_value.startswith("arp_task"):
1307                     core_tuple = CoreSocketTuple(section_name)
1308                     arp_task_core = core_tuple.core_id
1309                     arp_tasks_core.append(arp_task_core)
1310
1311         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1312
1313     @contextmanager
1314     def traffic_context(self, pkt_size, value):
1315         # Tester is sending packets at the required speed already after
1316         # setup_test(). Just get the current statistics, sleep the required
1317         # amount of time and calculate packet loss.
1318         inet_pkt_size = pkt_size
1319         cpe_pkt_size = pkt_size - 24
1320         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1321
1322         curr_up_speed = curr_down_speed = 0
1323         max_up_speed = max_down_speed = value
1324         if ratio < 1:
1325             max_down_speed = value * ratio
1326         else:
1327             max_up_speed = value / ratio
1328
1329         # Initialize cores
1330         self.sut.stop_all()
1331         time.sleep(0.5)
1332
1333         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1334         # wrong.
1335         self.sut.start(self.all_rx_cores)
1336         time.sleep(0.5)
1337         self.sut.stop(self.all_rx_cores)
1338         time.sleep(0.5)
1339         self.sut.reset_stats()
1340
1341         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1342         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1343
1344         self.sut.reset_values(self.cpe_cores)
1345         self.sut.reset_values(self.inet_cores)
1346
1347         # Set correct IP and UDP lengths in packet headers
1348         # CPE
1349         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1350         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1351         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1352         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1353
1354         # INET
1355         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1356         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1357         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1358         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1359         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1360         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1361
1362         # Sending ARP to initialize tables - need a few seconds of generation
1363         # to make sure all CPEs are initialized
1364         LOG.info("Initializing SUT: sending ARP packets")
1365         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1366         self.sut.set_speed(self.inet_cores, curr_up_speed)
1367         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1368         self.sut.start(self.arp_cores)
1369         time.sleep(4)
1370
1371         # Ramp up the transmission speed. First go to the common speed, then
1372         # increase steps for the faster one.
1373         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1374
1375         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1376
1377         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1378             # The min(..., ...) takes care of 1) floating point rounding errors
1379             # that could make curr_*_speed to be slightly greater than
1380             # max_*_speed and 2) max_*_speed not being an exact multiple of
1381             # self._step_delta.
1382             if curr_up_speed < max_up_speed:
1383                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1384             if curr_down_speed < max_down_speed:
1385                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1386
1387             self.sut.set_speed(self.inet_cores, curr_up_speed)
1388             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1389             time.sleep(self.step_time)
1390
1391         LOG.info("Target speeds reached. Starting real test.")
1392
1393         yield
1394
1395         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1396         LOG.info("Test ended. Flushing NIC buffers")
1397         self.sut.start(self.all_rx_cores)
1398         time.sleep(3)
1399         self.sut.stop(self.all_rx_cores)
1400
1401     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1402                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1403         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1404                                      value, tolerated_loss, line_speed)
1405
1406         with data_helper, self.traffic_context(pkt_size, value):
1407             with data_helper.measure_tot_stats():
1408                 time.sleep(duration)
1409                 # Getting statistics to calculate PPS at right speed....
1410                 data_helper.capture_tsc_hz()
1411                 data_helper.latency = self.get_latency()
1412
1413         return data_helper.result_tuple, data_helper.samples
1414
1415
1416 class ProxVpeProfileHelper(ProxProfileHelper):
1417
1418     __prox_profile_type__ = "vPE gen"
1419
1420     def __init__(self, resource_helper):
1421         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1422         self._cores_tuple = None
1423         self._ports_tuple = None
1424
1425     @property
1426     def vpe_cores(self):
1427         if not self._cores_tuple:
1428             self._cores_tuple = self.get_cores_gen_vpe()
1429         return self._cores_tuple
1430
1431     @property
1432     def cpe_cores(self):
1433         return self.vpe_cores[0]
1434
1435     @property
1436     def inet_cores(self):
1437         return self.vpe_cores[1]
1438
1439     @property
1440     def all_rx_cores(self):
1441         return self.latency_cores
1442
1443     @property
1444     def vpe_ports(self):
1445         if not self._ports_tuple:
1446             self._ports_tuple = self.get_ports_gen_vpe()
1447         return self._ports_tuple
1448
1449     @property
1450     def cpe_ports(self):
1451         return self.vpe_ports[0]
1452
1453     @property
1454     def inet_ports(self):
1455         return self.vpe_ports[1]
1456
1457     def get_cores_gen_vpe(self):
1458         cpe_cores = []
1459         inet_cores = []
1460         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1461             if not section_name.startswith("core"):
1462                 continue
1463
1464             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1465                 continue
1466
1467             for item_key, item_value in section:
1468                 if item_key != 'name':
1469                     continue
1470
1471                 if item_value.startswith("cpe"):
1472                     core_tuple = CoreSocketTuple(section_name)
1473                     core_tag = core_tuple.core_id
1474                     cpe_cores.append(core_tag)
1475
1476                 elif item_value.startswith("inet"):
1477                     core_tuple = CoreSocketTuple(section_name)
1478                     inet_core = core_tuple.core_id
1479                     inet_cores.append(inet_core)
1480
1481         return cpe_cores, inet_cores
1482
1483     def get_ports_gen_vpe(self):
1484         cpe_ports = []
1485         inet_ports = []
1486
1487         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1488             if not section_name.startswith("port"):
1489                 continue
1490             tx_port_iter = re.finditer(r'\d+', section_name)
1491             tx_port_no = int(next(tx_port_iter).group(0))
1492
1493             for item_key, item_value in section:
1494                 if item_key != 'name':
1495                     continue
1496
1497                 if item_value.startswith("cpe"):
1498                     cpe_ports.append(tx_port_no)
1499
1500                 elif item_value.startswith("inet"):
1501                     inet_ports.append(tx_port_no)
1502
1503         return cpe_ports, inet_ports
1504
1505     @contextmanager
1506     def traffic_context(self, pkt_size, value):
1507         # Calculate the target upload and download speed. The upload and
1508         # download packets have different packet sizes, so in order to get
1509         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1510         # of the packet sizes.
1511         cpe_pkt_size = pkt_size
1512         inet_pkt_size = pkt_size - 4
1513         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1514
1515         curr_up_speed = curr_down_speed = 0
1516         max_up_speed = max_down_speed = value
1517         if ratio < 1:
1518             max_down_speed = value * ratio
1519         else:
1520             max_up_speed = value / ratio
1521
1522         # Adjust speed when multiple cores per port are used to generate traffic
1523         if len(self.cpe_ports) != len(self.cpe_cores):
1524             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1525         if len(self.inet_ports) != len(self.inet_cores):
1526             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1527
1528         # Initialize cores
1529         self.sut.stop_all()
1530         time.sleep(2)
1531
1532         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1533         # wrong.
1534         self.sut.start(self.all_rx_cores)
1535         time.sleep(2)
1536         self.sut.stop(self.all_rx_cores)
1537         time.sleep(2)
1538         self.sut.reset_stats()
1539
1540         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1541         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1542
1543         self.sut.reset_values(self.cpe_cores)
1544         self.sut.reset_values(self.inet_cores)
1545
1546         # Set correct IP and UDP lengths in packet headers
1547         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1548         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1549         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1550         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1551
1552         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1553         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1554         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1555         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1556
1557         self.sut.set_speed(self.inet_cores, curr_up_speed)
1558         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1559
1560         # Ramp up the transmission speed. First go to the common speed, then
1561         # increase steps for the faster one.
1562         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1563
1564         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1565
1566         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1567             # The min(..., ...) takes care of 1) floating point rounding errors
1568             # that could make curr_*_speed to be slightly greater than
1569             # max_*_speed and 2) max_*_speed not being an exact multiple of
1570             # self._step_delta.
1571             if curr_up_speed < max_up_speed:
1572                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1573             if curr_down_speed < max_down_speed:
1574                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1575
1576             self.sut.set_speed(self.inet_cores, curr_up_speed)
1577             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1578             time.sleep(self.step_time)
1579
1580         LOG.info("Target speeds reached. Starting real test.")
1581
1582         yield
1583
1584         self.sut.stop(self.cpe_cores + self.inet_cores)
1585         LOG.info("Test ended. Flushing NIC buffers")
1586         self.sut.start(self.all_rx_cores)
1587         time.sleep(3)
1588         self.sut.stop(self.all_rx_cores)
1589
1590     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1591                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1592         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1593                                      value, tolerated_loss, line_speed)
1594
1595         with data_helper, self.traffic_context(pkt_size, value):
1596             with data_helper.measure_tot_stats():
1597                 time.sleep(duration)
1598                 # Getting statistics to calculate PPS at right speed....
1599                 data_helper.capture_tsc_hz()
1600                 data_helper.latency = self.get_latency()
1601
1602         return data_helper.result_tuple, data_helper.samples
1603
1604
1605 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1606
1607     __prox_profile_type__ = "lwAFTR gen"
1608
1609     def __init__(self, resource_helper):
1610         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1611         self._cores_tuple = None
1612         self._ports_tuple = None
1613         self.step_delta = 5
1614         self.step_time = 0.5
1615
1616     @property
1617     def _lwaftr_cores(self):
1618         if not self._cores_tuple:
1619             self._cores_tuple = self._get_cores_gen_lwaftr()
1620         return self._cores_tuple
1621
1622     @property
1623     def tun_cores(self):
1624         return self._lwaftr_cores[0]
1625
1626     @property
1627     def inet_cores(self):
1628         return self._lwaftr_cores[1]
1629
1630     @property
1631     def _lwaftr_ports(self):
1632         if not self._ports_tuple:
1633             self._ports_tuple = self._get_ports_gen_lw_aftr()
1634         return self._ports_tuple
1635
1636     @property
1637     def tun_ports(self):
1638         return self._lwaftr_ports[0]
1639
1640     @property
1641     def inet_ports(self):
1642         return self._lwaftr_ports[1]
1643
1644     @property
1645     def all_rx_cores(self):
1646         return self.latency_cores
1647
1648     def _get_cores_gen_lwaftr(self):
1649         tun_cores = []
1650         inet_cores = []
1651         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1652             if not section_name.startswith("core"):
1653                 continue
1654
1655             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1656                 continue
1657
1658             core_tuple = CoreSocketTuple(section_name)
1659             core_tag = core_tuple.core_id
1660             for item_value in (v for k, v in section if k == 'name'):
1661                 if item_value.startswith('tun'):
1662                     tun_cores.append(core_tag)
1663                 elif item_value.startswith('inet'):
1664                     inet_cores.append(core_tag)
1665
1666         return tun_cores, inet_cores
1667
1668     def _get_ports_gen_lw_aftr(self):
1669         tun_ports = []
1670         inet_ports = []
1671
1672         re_port = re.compile(r'port (\d+)')
1673         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1674             match = re_port.search(section_name)
1675             if not match:
1676                 continue
1677
1678             tx_port_no = int(match.group(1))
1679             for item_value in (v for k, v in section if k == 'name'):
1680                 if item_value.startswith('lwB4'):
1681                     tun_ports.append(tx_port_no)
1682                 elif item_value.startswith('inet'):
1683                     inet_ports.append(tx_port_no)
1684
1685         return tun_ports, inet_ports
1686
1687     @staticmethod
1688     def _resize(len1, len2):
1689         if len1 == len2:
1690             return 1.0
1691         return 1.0 * len1 / len2
1692
1693     @contextmanager
1694     def traffic_context(self, pkt_size, value):
1695         # Tester is sending packets at the required speed already after
1696         # setup_test(). Just get the current statistics, sleep the required
1697         # amount of time and calculate packet loss.
1698         tun_pkt_size = pkt_size
1699         inet_pkt_size = pkt_size - 40
1700         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1701
1702         curr_up_speed = curr_down_speed = 0
1703         max_up_speed = max_down_speed = value
1704
1705         max_up_speed = value / ratio
1706
1707         # Adjust speed when multiple cores per port are used to generate traffic
1708         if len(self.tun_ports) != len(self.tun_cores):
1709             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1710         if len(self.inet_ports) != len(self.inet_cores):
1711             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1712
1713         # Initialize cores
1714         self.sut.stop_all()
1715         time.sleep(0.5)
1716
1717         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1718         # wrong.
1719         self.sut.start(self.all_rx_cores)
1720         time.sleep(0.5)
1721         self.sut.stop(self.all_rx_cores)
1722         time.sleep(0.5)
1723         self.sut.reset_stats()
1724
1725         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1726         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1727
1728         self.sut.reset_values(self.tun_cores)
1729         self.sut.reset_values(self.inet_cores)
1730
1731         # Set correct IP and UDP lengths in packet headers
1732         # tun
1733         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1734         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1735         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1736         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1737         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1738         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1739
1740         # INET
1741         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1742         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1743         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1744         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1745
1746         LOG.info("Initializing SUT: sending lwAFTR packets")
1747         self.sut.set_speed(self.inet_cores, curr_up_speed)
1748         self.sut.set_speed(self.tun_cores, curr_down_speed)
1749         time.sleep(4)
1750
1751         # Ramp up the transmission speed. First go to the common speed, then
1752         # increase steps for the faster one.
1753         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1754
1755         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1756
1757         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1758             # The min(..., ...) takes care of 1) floating point rounding errors
1759             # that could make curr_*_speed to be slightly greater than
1760             # max_*_speed and 2) max_*_speed not being an exact multiple of
1761             # self._step_delta.
1762             if curr_up_speed < max_up_speed:
1763                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1764             if curr_down_speed < max_down_speed:
1765                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1766
1767             self.sut.set_speed(self.inet_cores, curr_up_speed)
1768             self.sut.set_speed(self.tun_cores, curr_down_speed)
1769             time.sleep(self.step_time)
1770
1771         LOG.info("Target speeds reached. Starting real test.")
1772
1773         yield
1774
1775         self.sut.stop(self.tun_cores + self.inet_cores)
1776         LOG.info("Test ended. Flushing NIC buffers")
1777         self.sut.start(self.all_rx_cores)
1778         time.sleep(3)
1779         self.sut.stop(self.all_rx_cores)
1780
1781     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1782                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1783         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1784                                      value, tolerated_loss, line_speed)
1785
1786         with data_helper, self.traffic_context(pkt_size, value):
1787             with data_helper.measure_tot_stats():
1788                 time.sleep(duration)
1789                 # Getting statistics to calculate PPS at right speed....
1790                 data_helper.capture_tsc_hz()
1791                 data_helper.latency = self.get_latency()
1792
1793         return data_helper.result_tuple, data_helper.samples