Merge "Make Sample VNF hugepages size configurable"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47
48 BITS_PER_BYTE = 8
49 RETRY_SECONDS = 60
50 RETRY_INTERVAL = 1
51
52 CONFIGURATION_OPTIONS = (
53     # dict key           section     key               default value
54     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
55     ('testDuration', 'general', 'test_duration', 5.0),
56     ('testPrecision', 'general', 'test_precision', 1.0),
57     ('tests', 'general', 'tests', None),
58     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
59
60     ('logFile', 'logging', 'file', 'dats.log'),
61     ('logDateFormat', 'logging', 'datefmt', None),
62     ('logLevel', 'logging', 'level', 'INFO'),
63     ('logOverwrite', 'logging', 'overwrite', 1),
64
65     ('testerIp', 'tester', 'ip', None),
66     ('testerUser', 'tester', 'user', 'root'),
67     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
68     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
69     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
70     ('testerSocketId', 'tester', 'socket_id', 0),
71
72     ('sutIp', 'sut', 'ip', None),
73     ('sutUser', 'sut', 'user', 'root'),
74     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
75     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
76     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
77     ('sutSocketId', 'sut', 'socket_id', 0),
78 )
79
80
81 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
82     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
83
84     def __new__(cls, *args):
85         try:
86             matches = cls.CORE_RE.search(str(args[0]))
87             if matches:
88                 args = matches.groups()
89
90             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
91                                                        'h' if args[2] else '')
92
93         except (AttributeError, TypeError, IndexError, ValueError):
94             raise ValueError('Invalid core spec {}'.format(args))
95
96     def is_hyperthread(self):
97         return self.hyperthread == 'h'
98
99     @property
100     def index(self):
101         return int(self.is_hyperthread())
102
103     def find_in_topology(self, cpu_topology):
104         try:
105             socket_core_match = cpu_topology[self.socket_id][self.core_id]
106             sorted_match = sorted(socket_core_match.values())
107             return sorted_match[self.index][0]
108         except (KeyError, IndexError):
109             template = "Core {}{} on socket {} does not exist"
110             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
111
112
113 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
114     def __new__(cls, *args):
115         try:
116             assert args[0] is not str(args[0])
117             args = tuple(args[0])
118         except (AssertionError, IndexError, TypeError):
119             pass
120
121         return super(TotStatsTuple, cls).__new__(cls, *args)
122
123
124 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
125                                                         'delta_tx,delta_tsc,'
126                                                         'latency,rx_total,tx_total,pps')):
127     @property
128     def pkt_loss(self):
129         try:
130             return 1e2 * self.drop_total / float(self.tx_total)
131         except ZeroDivisionError:
132             return 100.0
133
134     @property
135     def mpps(self):
136         # calculate the effective throughput in Mpps
137         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
138
139     @property
140     def can_be_lost(self):
141         return int(self.tx_total * self.tolerated / 1e2)
142
143     @property
144     def drop_total(self):
145         return self.tx_total - self.rx_total
146
147     @property
148     def success(self):
149         return self.drop_total <= self.can_be_lost
150
151     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
152         if pkt_loss is None:
153             pkt_loss = self.pkt_loss
154
155         if port_samples is None:
156             port_samples = {}
157
158         latency_keys = [
159             "LatencyMin",
160             "LatencyMax",
161             "LatencyAvg",
162         ]
163
164         samples = {
165             "Throughput": self.mpps,
166             "DropPackets": pkt_loss,
167             "CurrentDropPackets": pkt_loss,
168             "TxThroughput": self.pps / 1e6,
169             "RxThroughput": self.mpps,
170             "PktSize": pkt_size,
171         }
172         if port_samples:
173             samples.update(port_samples)
174
175         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
176         return samples
177
178     def log_data(self, logger=None):
179         if logger is None:
180             logger = LOG
181
182         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
183         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
184         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
185
186
187 class PacketDump(object):
188     @staticmethod
189     def assert_func(func, value1, value2, template=None):
190         assert func(value1, value2), template.format(value1, value2)
191
192     def __init__(self, port_id, data_len, payload):
193         template = "Packet dump has specified length {}, but payload is {} bytes long"
194         self.assert_func(operator.eq, data_len, len(payload), template)
195         self._port_id = port_id
196         self._data_len = data_len
197         self._payload = payload
198
199     @property
200     def port_id(self):
201         """Get the port id of the packet dump"""
202         return self._port_id
203
204     @property
205     def data_len(self):
206         """Get the length of the data received"""
207         return self._data_len
208
209     def __str__(self):
210         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
211
212     def payload(self, start=None, end=None):
213         """Get part of the payload as a list of ordinals.
214
215         Returns a list of byte values, matching the contents of the packet dump.
216         Optional start and end parameters can be specified to retrieve only a
217         part of the packet contents.
218
219         The number of elements in the list is equal to end - start + 1, so end
220         is the offset of the last character.
221
222         Args:
223             start (pos. int): the starting offset in the payload. If it is not
224                 specified or None, offset 0 is assumed.
225             end (pos. int): the ending offset of the payload. If it is not
226                 specified or None, the contents until the end of the packet are
227                 returned.
228
229         Returns:
230             [int, int, ...]. Each int represents the ordinal value of a byte in
231             the packet payload.
232         """
233         if start is None:
234             start = 0
235
236         if end is None:
237             end = self.data_len - 1
238
239         # Bounds checking on offsets
240         template = "Start offset must be non-negative"
241         self.assert_func(operator.ge, start, 0, template)
242
243         template = "End offset must be less than {1}"
244         self.assert_func(operator.lt, end, self.data_len, template)
245
246         # Adjust for splice operation: end offset must be 1 more than the offset
247         # of the last desired character.
248         end += 1
249
250         return self._payload[start:end]
251
252
253 class ProxSocketHelper(object):
254
255     def __init__(self, sock=None):
256         """ creates new prox instance """
257         super(ProxSocketHelper, self).__init__()
258
259         if sock is None:
260             sock = socket.socket()
261
262         self._sock = sock
263         self._pkt_dumps = []
264         self.master_stats = None
265
266     def connect(self, ip, port):
267         """Connect to the prox instance on the remote system"""
268         self._sock.connect((ip, port))
269
270     def get_socket(self):
271         """ get the socket connected to the remote instance """
272         return self._sock
273
274     def _parse_socket_data(self, decoded_data, pkt_dump_only):
275         def get_newline_index():
276             return decoded_data.find('\n', index)
277
278         ret_str = ''
279         index = 0
280         for newline_index in iter(get_newline_index, -1):
281             ret_str = decoded_data[index:newline_index]
282
283             try:
284                 mode, port_id, data_len = ret_str.split(',', 2)
285             except ValueError:
286                 mode, port_id, data_len = None, None, None
287
288             if mode != 'pktdump':
289                 # Regular 1-line message. Stop reading from the socket.
290                 LOG.debug("Regular response read")
291                 return ret_str
292
293             LOG.debug("Packet dump header read: [%s]", ret_str)
294
295             # The line is a packet dump header. Parse it, read the
296             # packet payload, store the dump for later retrieval.
297             # Skip over the packet dump and continue processing: a
298             # 1-line response may follow the packet dump.
299
300             data_len = int(data_len)
301             data_start = newline_index + 1  # + 1 to skip over \n
302             data_end = data_start + data_len
303             sub_data = decoded_data[data_start:data_end]
304             pkt_payload = array.array('B', (ord(v) for v in sub_data))
305             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
306             self._pkt_dumps.append(pkt_dump)
307
308             if pkt_dump_only:
309                 # Return boolean instead of string to signal
310                 # successful reception of the packet dump.
311                 LOG.debug("Packet dump stored, returning")
312                 return True
313
314             index = data_end + 1
315
316         return ret_str
317
318     def get_data(self, pkt_dump_only=False, timeout=1):
319         """ read data from the socket """
320
321         # This method behaves slightly differently depending on whether it is
322         # called to read the response to a command (pkt_dump_only = 0) or if
323         # it is called specifically to read a packet dump (pkt_dump_only = 1).
324         #
325         # Packet dumps look like:
326         #   pktdump,<port_id>,<data_len>\n
327         #   <packet contents as byte array>\n
328         # This means the total packet dump message consists of 2 lines instead
329         # of 1 line.
330         #
331         # - Response for a command (pkt_dump_only = 0):
332         #   1) Read response from the socket until \n (end of message)
333         #   2a) If the response is a packet dump header (starts with "pktdump,"):
334         #     - Read the packet payload and store the packet dump for later
335         #       retrieval.
336         #     - Reset the state and restart from 1). Eventually state 2b) will
337         #       be reached and the function will return.
338         #   2b) If the response is not a packet dump:
339         #     - Return the received message as a string
340         #
341         # - Explicit request to read a packet dump (pkt_dump_only = 1):
342         #   - Read the dump header and payload
343         #   - Store the packet dump for later retrieval
344         #   - Return True to signify a packet dump was successfully read
345
346         def is_ready():
347             # recv() is blocking, so avoid calling it when no data is waiting.
348             ready = select.select([self._sock], [], [], timeout)
349             return bool(ready[0])
350
351         status = False
352         ret_str = ""
353         for status in iter(is_ready, False):
354             decoded_data = self._sock.recv(256).decode('utf-8')
355             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
356
357         LOG.debug("Received data from socket: [%s]", ret_str)
358         return ret_str if status else ''
359
360     def put_command(self, to_send):
361         """ send data to the remote instance """
362         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
363         try:
364             # NOTE: sendall will block, we need a timeout
365             self._sock.sendall(to_send.encode('utf-8'))
366         except:  # pylint: disable=bare-except
367             pass
368
369     def get_packet_dump(self):
370         """ get the next packet dump """
371         if self._pkt_dumps:
372             return self._pkt_dumps.pop(0)
373         return None
374
375     def stop_all_reset(self):
376         """ stop the remote instance and reset stats """
377         LOG.debug("Stop all and reset stats")
378         self.stop_all()
379         self.reset_stats()
380
381     def stop_all(self):
382         """ stop all cores on the remote instance """
383         LOG.debug("Stop all")
384         self.put_command("stop all\n")
385         time.sleep(3)
386
387     def stop(self, cores, task=''):
388         """ stop specific cores on the remote instance """
389         LOG.debug("Stopping cores %s", cores)
390         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
391         time.sleep(3)
392
393     def start_all(self):
394         """ start all cores on the remote instance """
395         LOG.debug("Start all")
396         self.put_command("start all\n")
397
398     def start(self, cores):
399         """ start specific cores on the remote instance """
400         LOG.debug("Starting cores %s", cores)
401         self.put_command("start {}\n".format(join_non_strings(',', cores)))
402         time.sleep(3)
403
404     def reset_stats(self):
405         """ reset the statistics on the remote instance """
406         LOG.debug("Reset stats")
407         self.put_command("reset stats\n")
408         time.sleep(1)
409
410     def _run_template_over_cores(self, template, cores, *args):
411         for core in cores:
412             self.put_command(template.format(core, *args))
413
414     def set_pkt_size(self, cores, pkt_size):
415         """ set the packet size to generate on the remote instance """
416         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
417         pkt_size -= 4
418         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
419         time.sleep(1)
420
421     def set_value(self, cores, offset, value, length):
422         """ set value on the remote instance """
423         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
424         LOG.debug(msg, cores, value, length, offset)
425         template = "set value {} 0 {} {} {}\n"
426         self._run_template_over_cores(template, cores, offset, value, length)
427
428     def reset_values(self, cores):
429         """ reset values on the remote instance """
430         LOG.debug("Set value for core(s) %s", cores)
431         self._run_template_over_cores("reset values {} 0\n", cores)
432
433     def set_speed(self, cores, speed, tasks=None):
434         """ set speed on the remote instance """
435         if tasks is None:
436             tasks = [0] * len(cores)
437         elif len(tasks) != len(cores):
438             LOG.error("set_speed: cores and tasks must have the same len")
439         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
440         for (core, task) in list(zip(cores, tasks)):
441             self.put_command("speed {} {} {}\n".format(core, task, speed))
442
443     def slope_speed(self, cores_speed, duration, n_steps=0):
444         """will start to increase speed from 0 to N where N is taken from
445         a['speed'] for each a in cores_speed"""
446         # by default, each step will take 0.5 sec
447         if n_steps == 0:
448             n_steps = duration * 2
449
450         private_core_data = []
451         step_duration = float(duration) / n_steps
452         for core_data in cores_speed:
453             target = float(core_data['speed'])
454             private_core_data.append({
455                 'cores': core_data['cores'],
456                 'zero': 0,
457                 'delta': target / n_steps,
458                 'current': 0,
459                 'speed': target,
460             })
461
462         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
463         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
464             time.sleep(step_duration)
465             for core_data in private_core_data:
466                 core_data['current'] = core_data[key1] + core_data[key2]
467                 self.set_speed(core_data['cores'], core_data['current'])
468
469     def set_pps(self, cores, pps, pkt_size,
470                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
471         """ set packets per second for specific cores on the remote instance """
472         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
473         LOG.debug(msg, cores, pps, pkt_size)
474
475         # speed in percent of line-rate
476         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
477         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
478
479     def lat_stats(self, cores, task=0):
480         """Get the latency statistics from the remote system"""
481         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
482         lat_min = {}
483         lat_max = {}
484         lat_avg = {}
485         for core in cores:
486             self.put_command("lat stats {} {} \n".format(core, task))
487             ret = self.get_data()
488
489             try:
490                 lat_min[core], lat_max[core], lat_avg[core] = \
491                     tuple(int(n) for n in ret.split(",")[:3])
492
493             except (AttributeError, ValueError, TypeError):
494                 pass
495
496         return lat_min, lat_max, lat_avg
497
498     def get_all_tot_stats(self):
499         self.put_command("tot stats\n")
500         all_stats_str = self.get_data().split(",")
501         if len(all_stats_str) != 4:
502             all_stats = [0] * 4
503             return all_stats
504         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
505         self.master_stats = all_stats
506         return all_stats
507
508     def hz(self):
509         return self.get_all_tot_stats()[3]
510
511     def core_stats(self, cores, task=0):
512         """Get the receive statistics from the remote system"""
513         rx = tx = drop = tsc = 0
514         for core in cores:
515             self.put_command("core stats {} {}\n".format(core, task))
516             ret = self.get_data().split(",")
517             rx += int(ret[0])
518             tx += int(ret[1])
519             drop += int(ret[2])
520             tsc = int(ret[3])
521         return rx, tx, drop, tsc
522
523     def port_stats(self, ports):
524         """get counter values from a specific port"""
525         tot_result = [0] * 12
526         for port in ports:
527             self.put_command("port_stats {}\n".format(port))
528             ret = [try_int(s, 0) for s in self.get_data().split(",")]
529             tot_result = [sum(x) for x in zip(tot_result, ret)]
530         return tot_result
531
532     @contextmanager
533     def measure_tot_stats(self):
534         start = self.get_all_tot_stats()
535         container = {'start_tot': start}
536         try:
537             yield container
538         finally:
539             container['end_tot'] = end = self.get_all_tot_stats()
540
541         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
542
543     def tot_stats(self):
544         """Get the total statistics from the remote system"""
545         stats = self.get_all_tot_stats()
546         return stats[:3]
547
548     def tot_ierrors(self):
549         """Get the total ierrors from the remote system"""
550         self.put_command("tot ierrors tot\n")
551         recv = self.get_data().split(',')
552         tot_ierrors = int(recv[0])
553         tsc = int(recv[0])
554         return tot_ierrors, tsc
555
556     def set_count(self, count, cores):
557         """Set the number of packets to send on the specified core"""
558         self._run_template_over_cores("count {} 0 {}\n", cores, count)
559
560     def dump_rx(self, core_id, task_id=0, count=1):
561         """Activate dump on rx on the specified core"""
562         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
563         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
564         time.sleep(1.5)  # Give PROX time to set up packet dumping
565
566     def quit(self):
567         self.stop_all()
568         self._quit()
569         self.force_quit()
570
571     def _quit(self):
572         """ stop all cores on the remote instance """
573         LOG.debug("Quit prox")
574         self.put_command("quit\n")
575         time.sleep(3)
576
577     def force_quit(self):
578         """ stop all cores on the remote instance """
579         LOG.debug("Force Quit prox")
580         self.put_command("quit_force\n")
581         time.sleep(3)
582
583
584 _LOCAL_OBJECT = object()
585
586
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588     # the actual app is lowercase
589     APP_NAME = 'prox'
590     # not used for Prox but added for consistency
591     VNF_TYPE = "PROX"
592
593     LUA_PARAMETER_NAME = ""
594     LUA_PARAMETER_PEER = {
595         "gen": "sut",
596         "sut": "gen",
597     }
598
599     CONFIG_QUEUE_TIMEOUT = 120
600
601     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
602         self.remote_path = None
603         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
604         self.remote_prox_file_name = None
605         self._prox_config_data = None
606         self.additional_files = {}
607         self.config_queue = Queue()
608         # allow_exit_without_flush
609         self.config_queue.cancel_join_thread()
610         self._global_section = None
611
612     @property
613     def prox_config_data(self):
614         if self._prox_config_data is None:
615             # this will block, but it needs too
616             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
617         return self._prox_config_data
618
619     @property
620     def global_section(self):
621         if self._global_section is None and self.prox_config_data:
622             self._global_section = self.find_section("global")
623         return self._global_section
624
625     def find_section(self, name, default=_LOCAL_OBJECT):
626         result = next((value for key, value in self.prox_config_data if key == name), default)
627         if result is _LOCAL_OBJECT:
628             raise KeyError('{} not found in Prox config'.format(name))
629         return result
630
631     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
632         section = self.find_section(section_name, [])
633         result = next((value for key, value in section if key == section_key), default)
634         if result is _LOCAL_OBJECT:
635             template = '{} not found in {} section of Prox config'
636             raise KeyError(template.format(section_key, section_name))
637         return result
638
639     def copy_to_target(self, config_file_path, prox_file):
640         remote_path = os.path.join("/tmp", prox_file)
641         self.ssh_helper.put(config_file_path, remote_path)
642         return remote_path
643
644     @staticmethod
645     def _get_tx_port(section, sections):
646         iface_port = [-1]
647         for item in sections[section]:
648             if item[0] == "tx port":
649                 iface_port = re.findall(r'\d+', item[1])
650                 # do we want the last one?
651                 #   if yes, then can we reverse?
652         return int(iface_port[0])
653
654     @staticmethod
655     def _replace_quoted_with_value(quoted, value, count=1):
656         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
657         return new_string
658
659     def _insert_additional_file(self, value):
660         file_str = value.split('"')
661         base_name = os.path.basename(file_str[1])
662         file_str[1] = self.additional_files[base_name]
663         return '"'.join(file_str)
664
665     def generate_prox_config_file(self, config_path):
666         sections = []
667         prox_config = ConfigParser(config_path, sections)
668         prox_config.parse()
669
670         # Ensure MAC is set "hardware"
671         all_ports = self.vnfd_helper.port_pairs.all_ports
672         # use dpdk port number
673         for port_name in all_ports:
674             port_num = self.vnfd_helper.port_num(port_name)
675             port_section_name = "port {}".format(port_num)
676             for section_name, section in sections:
677                 if port_section_name != section_name:
678                     continue
679
680                 for section_data in section:
681                     if section_data[0] == "mac":
682                         section_data[1] = "hardware"
683
684         # search for dst mac
685         for _, section in sections:
686             for section_data in section:
687                 item_key, item_val = section_data
688                 if item_val.startswith("@@dst_mac"):
689                     tx_port_iter = re.finditer(r'\d+', item_val)
690                     tx_port_no = int(next(tx_port_iter).group(0))
691                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
692                     mac = intf["virtual-interface"]["dst_mac"]
693                     section_data[1] = mac.replace(":", " ", 6)
694
695                 if item_key == "dst mac" and item_val.startswith("@@"):
696                     tx_port_iter = re.finditer(r'\d+', item_val)
697                     tx_port_no = int(next(tx_port_iter).group(0))
698                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
699                     mac = intf["virtual-interface"]["dst_mac"]
700                     section_data[1] = mac
701
702                 if item_val.startswith("@@src_mac"):
703                     tx_port_iter = re.finditer(r'\d+', item_val)
704                     tx_port_no = int(next(tx_port_iter).group(0))
705                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
706                     mac = intf["virtual-interface"]["local_mac"]
707                     section_data[1] = mac.replace(":", " ", 6)
708
709                 if item_key == "src mac" and item_val.startswith("@@"):
710                     tx_port_iter = re.finditer(r'\d+', item_val)
711                     tx_port_no = int(next(tx_port_iter).group(0))
712                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
713                     mac = intf["virtual-interface"]["local_mac"]
714                     section_data[1] = mac
715
716         # if addition file specified in prox config
717         if not self.additional_files:
718             return sections
719
720         for section_name, section in sections:
721             for section_data in section:
722                 try:
723                     if section_data[0].startswith("dofile"):
724                         section_data[0] = self._insert_additional_file(section_data[0])
725
726                     if section_data[1].startswith("dofile"):
727                         section_data[1] = self._insert_additional_file(section_data[1])
728                 except:  # pylint: disable=bare-except
729                     pass
730
731         return sections
732
733     @staticmethod
734     def write_prox_lua(lua_config):
735         """
736         Write an .ini-format config file for PROX (parameters.lua)
737         PROX does not allow a space before/after the =, so we need
738         a custom method
739         """
740         out = []
741         for key in lua_config:
742             value = '"' + lua_config[key] + '"'
743             if key == "__name__":
744                 continue
745             if value is not None and value != '@':
746                 key = "=".join((key, str(value).replace('\n', '\n\t')))
747                 out.append(key)
748             else:
749                 key = str(key).replace('\n', '\n\t')
750                 out.append(key)
751         return os.linesep.join(out)
752
753     @staticmethod
754     def write_prox_config(prox_config):
755         """
756         Write an .ini-format config file for PROX
757         PROX does not allow a space before/after the =, so we need
758         a custom method
759         """
760         out = []
761         for (section_name, section) in prox_config:
762             out.append("[{}]".format(section_name))
763             for item in section:
764                 key, value = item
765                 if key == "__name__":
766                     continue
767                 if value is not None and value != '@':
768                     key = "=".join((key, str(value).replace('\n', '\n\t')))
769                     out.append(key)
770                 else:
771                     key = str(key).replace('\n', '\n\t')
772                     out.append(key)
773         return os.linesep.join(out)
774
775     def put_string_to_file(self, s, remote_path):
776         file_obj = cStringIO(s)
777         self.ssh_helper.put_file_obj(file_obj, remote_path)
778         return remote_path
779
780     def generate_prox_lua_file(self):
781         p = OrderedDict()
782         all_ports = self.vnfd_helper.port_pairs.all_ports
783         for port_name in all_ports:
784             port_num = self.vnfd_helper.port_num(port_name)
785             intf = self.vnfd_helper.find_interface(name=port_name)
786             vintf = intf['virtual-interface']
787             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
788             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
789
790         return p
791
792     def upload_prox_lua(self, config_file, lua_data):
793         # prox can't handle spaces around ' = ' so use custom method
794         out = StringIO(self.write_prox_lua(lua_data))
795         out.seek(0)
796         remote_path = os.path.join("/tmp", config_file)
797         self.ssh_helper.put_file_obj(out, remote_path)
798
799         return remote_path
800
801     def upload_prox_config(self, config_file, prox_config_data):
802         # prox can't handle spaces around ' = ' so use custom method
803         out = StringIO(self.write_prox_config(prox_config_data))
804         out.seek(0)
805         remote_path = os.path.join("/tmp", config_file)
806         self.ssh_helper.put_file_obj(out, remote_path)
807
808         return remote_path
809
810     def build_config_file(self):
811         task_path = self.scenario_helper.task_path
812         options = self.scenario_helper.options
813         config_path = options['prox_config']
814         config_file = os.path.basename(config_path)
815         config_path = utils.find_relative_file(config_path, task_path)
816         self.additional_files = {}
817
818         try:
819             if options['prox_generate_parameter']:
820                 self.lua = []
821                 self.lua = self.generate_prox_lua_file()
822                 if len(self.lua) > 0:
823                     self.upload_prox_lua("parameters.lua", self.lua)
824         except:  # pylint: disable=bare-except
825             pass
826
827         prox_files = options.get('prox_files', [])
828         if isinstance(prox_files, six.string_types):
829             prox_files = [prox_files]
830         for key_prox_file in prox_files:
831             base_prox_file = os.path.basename(key_prox_file)
832             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
833             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
834             self.additional_files[base_prox_file] = remote_prox_file
835
836         self._prox_config_data = self.generate_prox_config_file(config_path)
837         # copy config to queue so we can read it from traffic_runner process
838         self.config_queue.put(self._prox_config_data)
839         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
840
841     def build_config(self):
842         self.build_config_file()
843
844         options = self.scenario_helper.options
845         prox_args = options['prox_args']
846         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
847
848         self.pipeline_kwargs = {
849             'tool_path': tool_path,
850             'tool_dir': os.path.dirname(tool_path),
851             'cfg_file': self.remote_path,
852             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
853                              for k, v in prox_args.items())
854         }
855
856         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
857                         "{args} -f {cfg_file} '")
858         return cmd_template.format(**self.pipeline_kwargs)
859
860
861 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
862 class ProxResourceHelper(ClientResourceHelper):
863
864     RESOURCE_WORD = 'prox'
865
866     PROX_MODE = ""
867
868     WAIT_TIME = 3
869
870     @staticmethod
871     def find_pci(pci, bound_pci):
872         # we have to substring match PCI bus address from the end
873         return any(b.endswith(pci) for b in bound_pci)
874
875     def __init__(self, setup_helper):
876         super(ProxResourceHelper, self).__init__(setup_helper)
877         self.mgmt_interface = self.vnfd_helper.mgmt_interface
878         self._user = self.mgmt_interface["user"]
879         self._ip = self.mgmt_interface["ip"]
880
881         self.done = False
882         self._vpci_to_if_name_map = None
883         self.additional_file = {}
884         self.remote_prox_file_name = None
885         self.lower = None
886         self.upper = None
887         self.step_delta = 1
888         self.step_time = 0.5
889         self._test_type = None
890
891     @property
892     def sut(self):
893         if not self.client:
894             self.client = self._connect()
895         return self.client
896
897     @property
898     def test_type(self):
899         if self._test_type is None:
900             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
901         return self._test_type
902
903     def run_traffic(self, traffic_profile):
904         self._queue.cancel_join_thread()
905         self.lower = 0.0
906         self.upper = 100.0
907
908         traffic_profile.init(self._queue)
909         # this frees up the run_traffic loop
910         self.client_started.value = 1
911
912         while not self._terminated.value:
913             # move it all to traffic_profile
914             self._run_traffic_once(traffic_profile)
915
916     def _run_traffic_once(self, traffic_profile):
917         traffic_profile.execute_traffic(self)
918         if traffic_profile.done:
919             self._queue.put({'done': True})
920             LOG.debug("tg_prox done")
921             self._terminated.value = 1
922
923     # For VNF use ResourceHelper method to collect KPIs directly.
924     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
925     def collect_collectd_kpi(self):
926         return self._collect_resource_kpi()
927
928     def collect_kpi(self):
929         result = super(ProxResourceHelper, self).collect_kpi()
930         # add in collectd kpis manually
931         if result:
932             result['collect_stats'] = self._collect_resource_kpi()
933         return result
934
935     def terminate(self):
936         # should not be called, use VNF terminate
937         raise NotImplementedError()
938
939     def up_post(self):
940         return self.sut  # force connection
941
942     def execute(self, cmd, *args, **kwargs):
943         func = getattr(self.sut, cmd, None)
944         if func:
945             return func(*args, **kwargs)
946         return None
947
948     def _connect(self, client=None):
949         """Run and connect to prox on the remote system """
950         # De-allocating a large amount of hugepages takes some time. If a new
951         # PROX instance is started immediately after killing the previous one,
952         # it might not be able to allocate hugepages, because they are still
953         # being freed. Hence the -w switch.
954         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
955         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
956         # -f ./handle_none-4.cfg"
957         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
958         #  "; " \
959         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
960         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
961         # sudo " \
962         #    + "./build/Prox " + prox_args
963         # log.debug("Starting PROX with command [%s]", prox_cmd)
964         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
965         # self._ip, prox_cmd))
966         if client is None:
967             client = ProxSocketHelper()
968
969         # try connecting to Prox for 60s
970         for _ in range(RETRY_SECONDS):
971             time.sleep(RETRY_INTERVAL)
972             try:
973                 client.connect(self._ip, PROX_PORT)
974             except (socket.gaierror, socket.error):
975                 continue
976             else:
977                 return client
978
979         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
980         raise Exception(msg.format(self._ip, PROX_PORT))
981
982
983 class ProxDataHelper(object):
984
985     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
986         super(ProxDataHelper, self).__init__()
987         self.vnfd_helper = vnfd_helper
988         self.sut = sut
989         self.pkt_size = pkt_size
990         self.value = value
991         self.line_speed = line_speed
992         self.tolerated_loss = tolerated_loss
993         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
994         self.tsc_hz = None
995         self.measured_stats = None
996         self.latency = None
997         self._totals_and_pps = None
998         self.result_tuple = None
999
1000     @property
1001     def totals_and_pps(self):
1002         if self._totals_and_pps is None:
1003             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
1004             pps = self.value / 100.0 * self.line_rate_to_pps()
1005             self._totals_and_pps = rx_total, tx_total, pps
1006         return self._totals_and_pps
1007
1008     @property
1009     def rx_total(self):
1010         return self.totals_and_pps[0]
1011
1012     @property
1013     def tx_total(self):
1014         return self.totals_and_pps[1]
1015
1016     @property
1017     def pps(self):
1018         return self.totals_and_pps[2]
1019
1020     @property
1021     def samples(self):
1022         samples = {}
1023         for port_name, port_num in self.vnfd_helper.ports_iter():
1024             try:
1025                 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1026                 samples[port_name] = {
1027                     "in_packets": port_rx_total,
1028                     "out_packets": port_tx_total,
1029                 }
1030             except (KeyError, TypeError, NameError, MemoryError, ValueError,
1031                     SystemError, BufferError):
1032                 samples[port_name] = {
1033                     "in_packets": 0,
1034                     "out_packets": 0,
1035                 }
1036         return samples
1037
1038     def __enter__(self):
1039         self.check_interface_count()
1040         return self
1041
1042     def __exit__(self, exc_type, exc_val, exc_tb):
1043         self.make_tuple()
1044
1045     def make_tuple(self):
1046         if self.result_tuple:
1047             return
1048
1049         self.result_tuple = ProxTestDataTuple(
1050             self.tolerated_loss,
1051             self.tsc_hz,
1052             self.measured_stats['delta'].rx,
1053             self.measured_stats['delta'].tx,
1054             self.measured_stats['delta'].tsc,
1055             self.latency,
1056             self.rx_total,
1057             self.tx_total,
1058             self.pps,
1059         )
1060         self.result_tuple.log_data()
1061
1062     @contextmanager
1063     def measure_tot_stats(self):
1064         with self.sut.measure_tot_stats() as self.measured_stats:
1065             yield
1066
1067     def check_interface_count(self):
1068         # do this assert in init?  unless we expect interface count to
1069         # change from one run to another run...
1070         assert self.port_count in {1, 2, 4}, \
1071             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1072
1073     def capture_tsc_hz(self):
1074         self.tsc_hz = float(self.sut.hz())
1075
1076     def line_rate_to_pps(self):
1077       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1078
1079 class ProxProfileHelper(object):
1080
1081     __prox_profile_type__ = "Generic"
1082
1083     PROX_CORE_GEN_MODE = "gen"
1084     PROX_CORE_LAT_MODE = "lat"
1085
1086     @classmethod
1087     def get_cls(cls, helper_type):
1088         """Return class of specified type."""
1089         if not helper_type:
1090             return ProxProfileHelper
1091
1092         for profile_helper_class in utils.itersubclasses(cls):
1093             if helper_type == profile_helper_class.__prox_profile_type__:
1094                 return profile_helper_class
1095
1096         return ProxProfileHelper
1097
1098     @classmethod
1099     def make_profile_helper(cls, resource_helper):
1100         return cls.get_cls(resource_helper.test_type)(resource_helper)
1101
1102     def __init__(self, resource_helper):
1103         super(ProxProfileHelper, self).__init__()
1104         self.resource_helper = resource_helper
1105         self._cpu_topology = None
1106         self._test_cores = None
1107         self._latency_cores = None
1108
1109     @property
1110     def cpu_topology(self):
1111         if not self._cpu_topology:
1112             stdout = io.BytesIO()
1113             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1114             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1115         return self._cpu_topology
1116
1117     @property
1118     def test_cores(self):
1119         if not self._test_cores:
1120             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1121         return self._test_cores
1122
1123     @property
1124     def latency_cores(self):
1125         if not self._latency_cores:
1126             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1127         return self._latency_cores
1128
1129     @contextmanager
1130     def traffic_context(self, pkt_size, value):
1131         self.sut.stop_all()
1132         self.sut.reset_stats()
1133         try:
1134             self.sut.set_pkt_size(self.test_cores, pkt_size)
1135             self.sut.set_speed(self.test_cores, value)
1136             self.sut.start_all()
1137             yield
1138         finally:
1139             self.sut.stop_all()
1140
1141     def get_cores(self, mode):
1142         cores = []
1143
1144         for section_name, section in self.setup_helper.prox_config_data:
1145             if not section_name.startswith("core"):
1146                 continue
1147
1148             for key, value in section:
1149                 if key == "mode" and value == mode:
1150                     core_tuple = CoreSocketTuple(section_name)
1151                     core = core_tuple.core_id
1152                     cores.append(core)
1153
1154         return cores
1155
1156     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1157                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1158         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1159                                      value, tolerated_loss, line_speed)
1160
1161         with data_helper, self.traffic_context(pkt_size, value):
1162             with data_helper.measure_tot_stats():
1163                 time.sleep(duration)
1164                 # Getting statistics to calculate PPS at right speed....
1165                 data_helper.capture_tsc_hz()
1166                 data_helper.latency = self.get_latency()
1167
1168         return data_helper.result_tuple, data_helper.samples
1169
1170     def get_latency(self):
1171         """
1172         :return: return lat_min, lat_max, lat_avg
1173         :rtype: list
1174         """
1175
1176         if not self._latency_cores:
1177             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1178
1179         if self._latency_cores:
1180             return self.sut.lat_stats(self._latency_cores)
1181         return []
1182
1183     def terminate(self):
1184         pass
1185
1186     def __getattr__(self, item):
1187         return getattr(self.resource_helper, item)
1188
1189
1190 class ProxMplsProfileHelper(ProxProfileHelper):
1191
1192     __prox_profile_type__ = "MPLS tag/untag"
1193
1194     def __init__(self, resource_helper):
1195         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1196         self._cores_tuple = None
1197
1198     @property
1199     def mpls_cores(self):
1200         if not self._cores_tuple:
1201             self._cores_tuple = self.get_cores_mpls()
1202         return self._cores_tuple
1203
1204     @property
1205     def tagged_cores(self):
1206         return self.mpls_cores[0]
1207
1208     @property
1209     def plain_cores(self):
1210         return self.mpls_cores[1]
1211
1212     def get_cores_mpls(self):
1213         cores_tagged = []
1214         cores_plain = []
1215         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1216             if not section_name.startswith("core"):
1217                 continue
1218
1219             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1220                 continue
1221
1222             for item_key, item_value in section:
1223                 if item_key != 'name':
1224                     continue
1225
1226                 if item_value.startswith("tag"):
1227                     core_tuple = CoreSocketTuple(section_name)
1228                     core_tag = core_tuple.core_id
1229                     cores_tagged.append(core_tag)
1230
1231                 elif item_value.startswith("udp"):
1232                     core_tuple = CoreSocketTuple(section_name)
1233                     core_udp = core_tuple.core_id
1234                     cores_plain.append(core_udp)
1235
1236         return cores_tagged, cores_plain
1237
1238     @contextmanager
1239     def traffic_context(self, pkt_size, value):
1240         self.sut.stop_all()
1241         self.sut.reset_stats()
1242         try:
1243             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1244             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1245             self.sut.set_speed(self.tagged_cores, value)
1246             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1247             self.sut.set_speed(self.plain_cores, value * ratio)
1248             self.sut.start_all()
1249             yield
1250         finally:
1251             self.sut.stop_all()
1252
1253
1254 class ProxBngProfileHelper(ProxProfileHelper):
1255
1256     __prox_profile_type__ = "BNG gen"
1257
1258     def __init__(self, resource_helper):
1259         super(ProxBngProfileHelper, self).__init__(resource_helper)
1260         self._cores_tuple = None
1261
1262     @property
1263     def bng_cores(self):
1264         if not self._cores_tuple:
1265             self._cores_tuple = self.get_cores_gen_bng_qos()
1266         return self._cores_tuple
1267
1268     @property
1269     def cpe_cores(self):
1270         return self.bng_cores[0]
1271
1272     @property
1273     def inet_cores(self):
1274         return self.bng_cores[1]
1275
1276     @property
1277     def arp_cores(self):
1278         return self.bng_cores[2]
1279
1280     @property
1281     def arp_task_cores(self):
1282         return self.bng_cores[3]
1283
1284     @property
1285     def all_rx_cores(self):
1286         return self.latency_cores
1287
1288     def get_cores_gen_bng_qos(self):
1289         cpe_cores = []
1290         inet_cores = []
1291         arp_cores = []
1292         arp_tasks_core = [0]
1293         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1294             if not section_name.startswith("core"):
1295                 continue
1296
1297             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1298                 continue
1299
1300             for item_key, item_value in section:
1301                 if item_key != 'name':
1302                     continue
1303
1304                 if item_value.startswith("cpe"):
1305                     core_tuple = CoreSocketTuple(section_name)
1306                     cpe_core = core_tuple.core_id
1307                     cpe_cores.append(cpe_core)
1308
1309                 elif item_value.startswith("inet"):
1310                     core_tuple = CoreSocketTuple(section_name)
1311                     inet_core = core_tuple.core_id
1312                     inet_cores.append(inet_core)
1313
1314                 elif item_value.startswith("arp"):
1315                     core_tuple = CoreSocketTuple(section_name)
1316                     arp_core = core_tuple.core_id
1317                     arp_cores.append(arp_core)
1318
1319                 # We check the tasks/core separately
1320                 if item_value.startswith("arp_task"):
1321                     core_tuple = CoreSocketTuple(section_name)
1322                     arp_task_core = core_tuple.core_id
1323                     arp_tasks_core.append(arp_task_core)
1324
1325         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1326
1327     @contextmanager
1328     def traffic_context(self, pkt_size, value):
1329         # Tester is sending packets at the required speed already after
1330         # setup_test(). Just get the current statistics, sleep the required
1331         # amount of time and calculate packet loss.
1332         inet_pkt_size = pkt_size
1333         cpe_pkt_size = pkt_size - 24
1334         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1335
1336         curr_up_speed = curr_down_speed = 0
1337         max_up_speed = max_down_speed = value
1338         if ratio < 1:
1339             max_down_speed = value * ratio
1340         else:
1341             max_up_speed = value / ratio
1342
1343         # Initialize cores
1344         self.sut.stop_all()
1345         time.sleep(0.5)
1346
1347         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1348         # wrong.
1349         self.sut.start(self.all_rx_cores)
1350         time.sleep(0.5)
1351         self.sut.stop(self.all_rx_cores)
1352         time.sleep(0.5)
1353         self.sut.reset_stats()
1354
1355         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1356         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1357
1358         self.sut.reset_values(self.cpe_cores)
1359         self.sut.reset_values(self.inet_cores)
1360
1361         # Set correct IP and UDP lengths in packet headers
1362         # CPE
1363         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1364         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1365         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1366         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1367
1368         # INET
1369         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1370         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1371         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1372         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1373         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1374         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1375
1376         # Sending ARP to initialize tables - need a few seconds of generation
1377         # to make sure all CPEs are initialized
1378         LOG.info("Initializing SUT: sending ARP packets")
1379         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1380         self.sut.set_speed(self.inet_cores, curr_up_speed)
1381         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1382         self.sut.start(self.arp_cores)
1383         time.sleep(4)
1384
1385         # Ramp up the transmission speed. First go to the common speed, then
1386         # increase steps for the faster one.
1387         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1388
1389         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1390
1391         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1392             # The min(..., ...) takes care of 1) floating point rounding errors
1393             # that could make curr_*_speed to be slightly greater than
1394             # max_*_speed and 2) max_*_speed not being an exact multiple of
1395             # self._step_delta.
1396             if curr_up_speed < max_up_speed:
1397                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1398             if curr_down_speed < max_down_speed:
1399                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1400
1401             self.sut.set_speed(self.inet_cores, curr_up_speed)
1402             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1403             time.sleep(self.step_time)
1404
1405         LOG.info("Target speeds reached. Starting real test.")
1406
1407         yield
1408
1409         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1410         LOG.info("Test ended. Flushing NIC buffers")
1411         self.sut.start(self.all_rx_cores)
1412         time.sleep(3)
1413         self.sut.stop(self.all_rx_cores)
1414
1415     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1416                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1417         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1418                                      value, tolerated_loss, line_speed)
1419
1420         with data_helper, self.traffic_context(pkt_size, value):
1421             with data_helper.measure_tot_stats():
1422                 time.sleep(duration)
1423                 # Getting statistics to calculate PPS at right speed....
1424                 data_helper.capture_tsc_hz()
1425                 data_helper.latency = self.get_latency()
1426
1427         return data_helper.result_tuple, data_helper.samples
1428
1429
1430 class ProxVpeProfileHelper(ProxProfileHelper):
1431
1432     __prox_profile_type__ = "vPE gen"
1433
1434     def __init__(self, resource_helper):
1435         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1436         self._cores_tuple = None
1437         self._ports_tuple = None
1438
1439     @property
1440     def vpe_cores(self):
1441         if not self._cores_tuple:
1442             self._cores_tuple = self.get_cores_gen_vpe()
1443         return self._cores_tuple
1444
1445     @property
1446     def cpe_cores(self):
1447         return self.vpe_cores[0]
1448
1449     @property
1450     def inet_cores(self):
1451         return self.vpe_cores[1]
1452
1453     @property
1454     def all_rx_cores(self):
1455         return self.latency_cores
1456
1457     @property
1458     def vpe_ports(self):
1459         if not self._ports_tuple:
1460             self._ports_tuple = self.get_ports_gen_vpe()
1461         return self._ports_tuple
1462
1463     @property
1464     def cpe_ports(self):
1465         return self.vpe_ports[0]
1466
1467     @property
1468     def inet_ports(self):
1469         return self.vpe_ports[1]
1470
1471     def get_cores_gen_vpe(self):
1472         cpe_cores = []
1473         inet_cores = []
1474         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1475             if not section_name.startswith("core"):
1476                 continue
1477
1478             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1479                 continue
1480
1481             for item_key, item_value in section:
1482                 if item_key != 'name':
1483                     continue
1484
1485                 if item_value.startswith("cpe"):
1486                     core_tuple = CoreSocketTuple(section_name)
1487                     core_tag = core_tuple.core_id
1488                     cpe_cores.append(core_tag)
1489
1490                 elif item_value.startswith("inet"):
1491                     core_tuple = CoreSocketTuple(section_name)
1492                     inet_core = core_tuple.core_id
1493                     inet_cores.append(inet_core)
1494
1495         return cpe_cores, inet_cores
1496
1497     def get_ports_gen_vpe(self):
1498         cpe_ports = []
1499         inet_ports = []
1500
1501         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1502             if not section_name.startswith("port"):
1503                 continue
1504             tx_port_iter = re.finditer(r'\d+', section_name)
1505             tx_port_no = int(next(tx_port_iter).group(0))
1506
1507             for item_key, item_value in section:
1508                 if item_key != 'name':
1509                     continue
1510
1511                 if item_value.startswith("cpe"):
1512                     cpe_ports.append(tx_port_no)
1513
1514                 elif item_value.startswith("inet"):
1515                     inet_ports.append(tx_port_no)
1516
1517         return cpe_ports, inet_ports
1518
1519     @contextmanager
1520     def traffic_context(self, pkt_size, value):
1521         # Calculate the target upload and download speed. The upload and
1522         # download packets have different packet sizes, so in order to get
1523         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1524         # of the packet sizes.
1525         cpe_pkt_size = pkt_size
1526         inet_pkt_size = pkt_size - 4
1527         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1528
1529         curr_up_speed = curr_down_speed = 0
1530         max_up_speed = max_down_speed = value
1531         if ratio < 1:
1532             max_down_speed = value * ratio
1533         else:
1534             max_up_speed = value / ratio
1535
1536         # Adjust speed when multiple cores per port are used to generate traffic
1537         if len(self.cpe_ports) != len(self.cpe_cores):
1538             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1539         if len(self.inet_ports) != len(self.inet_cores):
1540             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1541
1542         # Initialize cores
1543         self.sut.stop_all()
1544         time.sleep(2)
1545
1546         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1547         # wrong.
1548         self.sut.start(self.all_rx_cores)
1549         time.sleep(2)
1550         self.sut.stop(self.all_rx_cores)
1551         time.sleep(2)
1552         self.sut.reset_stats()
1553
1554         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1555         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1556
1557         self.sut.reset_values(self.cpe_cores)
1558         self.sut.reset_values(self.inet_cores)
1559
1560         # Set correct IP and UDP lengths in packet headers
1561         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1562         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1563         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1564         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1565
1566         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1567         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1568         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1569         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1570
1571         self.sut.set_speed(self.inet_cores, curr_up_speed)
1572         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1573
1574         # Ramp up the transmission speed. First go to the common speed, then
1575         # increase steps for the faster one.
1576         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1577
1578         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1579
1580         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1581             # The min(..., ...) takes care of 1) floating point rounding errors
1582             # that could make curr_*_speed to be slightly greater than
1583             # max_*_speed and 2) max_*_speed not being an exact multiple of
1584             # self._step_delta.
1585             if curr_up_speed < max_up_speed:
1586                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1587             if curr_down_speed < max_down_speed:
1588                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1589
1590             self.sut.set_speed(self.inet_cores, curr_up_speed)
1591             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1592             time.sleep(self.step_time)
1593
1594         LOG.info("Target speeds reached. Starting real test.")
1595
1596         yield
1597
1598         self.sut.stop(self.cpe_cores + self.inet_cores)
1599         LOG.info("Test ended. Flushing NIC buffers")
1600         self.sut.start(self.all_rx_cores)
1601         time.sleep(3)
1602         self.sut.stop(self.all_rx_cores)
1603
1604     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1605                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1606         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1607                                      value, tolerated_loss, line_speed)
1608
1609         with data_helper, self.traffic_context(pkt_size, value):
1610             with data_helper.measure_tot_stats():
1611                 time.sleep(duration)
1612                 # Getting statistics to calculate PPS at right speed....
1613                 data_helper.capture_tsc_hz()
1614                 data_helper.latency = self.get_latency()
1615
1616         return data_helper.result_tuple, data_helper.samples
1617
1618
1619 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1620
1621     __prox_profile_type__ = "lwAFTR gen"
1622
1623     def __init__(self, resource_helper):
1624         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1625         self._cores_tuple = None
1626         self._ports_tuple = None
1627         self.step_delta = 5
1628         self.step_time = 0.5
1629
1630     @property
1631     def _lwaftr_cores(self):
1632         if not self._cores_tuple:
1633             self._cores_tuple = self._get_cores_gen_lwaftr()
1634         return self._cores_tuple
1635
1636     @property
1637     def tun_cores(self):
1638         return self._lwaftr_cores[0]
1639
1640     @property
1641     def inet_cores(self):
1642         return self._lwaftr_cores[1]
1643
1644     @property
1645     def _lwaftr_ports(self):
1646         if not self._ports_tuple:
1647             self._ports_tuple = self._get_ports_gen_lw_aftr()
1648         return self._ports_tuple
1649
1650     @property
1651     def tun_ports(self):
1652         return self._lwaftr_ports[0]
1653
1654     @property
1655     def inet_ports(self):
1656         return self._lwaftr_ports[1]
1657
1658     @property
1659     def all_rx_cores(self):
1660         return self.latency_cores
1661
1662     def _get_cores_gen_lwaftr(self):
1663         tun_cores = []
1664         inet_cores = []
1665         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1666             if not section_name.startswith("core"):
1667                 continue
1668
1669             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1670                 continue
1671
1672             core_tuple = CoreSocketTuple(section_name)
1673             core_tag = core_tuple.core_id
1674             for item_value in (v for k, v in section if k == 'name'):
1675                 if item_value.startswith('tun'):
1676                     tun_cores.append(core_tag)
1677                 elif item_value.startswith('inet'):
1678                     inet_cores.append(core_tag)
1679
1680         return tun_cores, inet_cores
1681
1682     def _get_ports_gen_lw_aftr(self):
1683         tun_ports = []
1684         inet_ports = []
1685
1686         re_port = re.compile(r'port (\d+)')
1687         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1688             match = re_port.search(section_name)
1689             if not match:
1690                 continue
1691
1692             tx_port_no = int(match.group(1))
1693             for item_value in (v for k, v in section if k == 'name'):
1694                 if item_value.startswith('lwB4'):
1695                     tun_ports.append(tx_port_no)
1696                 elif item_value.startswith('inet'):
1697                     inet_ports.append(tx_port_no)
1698
1699         return tun_ports, inet_ports
1700
1701     @staticmethod
1702     def _resize(len1, len2):
1703         if len1 == len2:
1704             return 1.0
1705         return 1.0 * len1 / len2
1706
1707     @contextmanager
1708     def traffic_context(self, pkt_size, value):
1709         # Tester is sending packets at the required speed already after
1710         # setup_test(). Just get the current statistics, sleep the required
1711         # amount of time and calculate packet loss.
1712         tun_pkt_size = pkt_size
1713         inet_pkt_size = pkt_size - 40
1714         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1715
1716         curr_up_speed = curr_down_speed = 0
1717         max_up_speed = max_down_speed = value
1718
1719         max_up_speed = value / ratio
1720
1721         # Adjust speed when multiple cores per port are used to generate traffic
1722         if len(self.tun_ports) != len(self.tun_cores):
1723             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1724         if len(self.inet_ports) != len(self.inet_cores):
1725             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1726
1727         # Initialize cores
1728         self.sut.stop_all()
1729         time.sleep(0.5)
1730
1731         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1732         # wrong.
1733         self.sut.start(self.all_rx_cores)
1734         time.sleep(0.5)
1735         self.sut.stop(self.all_rx_cores)
1736         time.sleep(0.5)
1737         self.sut.reset_stats()
1738
1739         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1740         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1741
1742         self.sut.reset_values(self.tun_cores)
1743         self.sut.reset_values(self.inet_cores)
1744
1745         # Set correct IP and UDP lengths in packet headers
1746         # tun
1747         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1748         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1749         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1750         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1751         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1752         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1753
1754         # INET
1755         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1756         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1757         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1758         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1759
1760         LOG.info("Initializing SUT: sending lwAFTR packets")
1761         self.sut.set_speed(self.inet_cores, curr_up_speed)
1762         self.sut.set_speed(self.tun_cores, curr_down_speed)
1763         time.sleep(4)
1764
1765         # Ramp up the transmission speed. First go to the common speed, then
1766         # increase steps for the faster one.
1767         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1768
1769         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1770
1771         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1772             # The min(..., ...) takes care of 1) floating point rounding errors
1773             # that could make curr_*_speed to be slightly greater than
1774             # max_*_speed and 2) max_*_speed not being an exact multiple of
1775             # self._step_delta.
1776             if curr_up_speed < max_up_speed:
1777                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1778             if curr_down_speed < max_down_speed:
1779                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1780
1781             self.sut.set_speed(self.inet_cores, curr_up_speed)
1782             self.sut.set_speed(self.tun_cores, curr_down_speed)
1783             time.sleep(self.step_time)
1784
1785         LOG.info("Target speeds reached. Starting real test.")
1786
1787         yield
1788
1789         self.sut.stop(self.tun_cores + self.inet_cores)
1790         LOG.info("Test ended. Flushing NIC buffers")
1791         self.sut.start(self.all_rx_cores)
1792         time.sleep(3)
1793         self.sut.stop(self.all_rx_cores)
1794
1795     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1796                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1797         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1798                                      value, tolerated_loss, line_speed)
1799
1800         with data_helper, self.traffic_context(pkt_size, value):
1801             with data_helper.measure_tot_stats():
1802                 time.sleep(duration)
1803                 # Getting statistics to calculate PPS at right speed....
1804                 data_helper.capture_tsc_hz()
1805                 data_helper.latency = self.get_latency()
1806
1807         return data_helper.result_tuple, data_helper.samples