NSB PROX L3FWD Dropping packets
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47
48 TEN_GIGABIT = 1e10
49 BITS_PER_BYTE = 8
50 RETRY_SECONDS = 60
51 RETRY_INTERVAL = 1
52
53 CONFIGURATION_OPTIONS = (
54     # dict key           section     key               default value
55     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
56     ('testDuration', 'general', 'test_duration', 5.0),
57     ('testPrecision', 'general', 'test_precision', 1.0),
58     ('tests', 'general', 'tests', None),
59     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
60
61     ('logFile', 'logging', 'file', 'dats.log'),
62     ('logDateFormat', 'logging', 'datefmt', None),
63     ('logLevel', 'logging', 'level', 'INFO'),
64     ('logOverwrite', 'logging', 'overwrite', 1),
65
66     ('testerIp', 'tester', 'ip', None),
67     ('testerUser', 'tester', 'user', 'root'),
68     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
69     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
70     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
71     ('testerSocketId', 'tester', 'socket_id', 0),
72
73     ('sutIp', 'sut', 'ip', None),
74     ('sutUser', 'sut', 'user', 'root'),
75     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
76     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
77     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
78     ('sutSocketId', 'sut', 'socket_id', 0),
79 )
80
81
82 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
83     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
84
85     def __new__(cls, *args):
86         try:
87             matches = cls.CORE_RE.search(str(args[0]))
88             if matches:
89                 args = matches.groups()
90
91             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
92                                                        'h' if args[2] else '')
93
94         except (AttributeError, TypeError, IndexError, ValueError):
95             raise ValueError('Invalid core spec {}'.format(args))
96
97     def is_hyperthread(self):
98         return self.hyperthread == 'h'
99
100     @property
101     def index(self):
102         return int(self.is_hyperthread())
103
104     def find_in_topology(self, cpu_topology):
105         try:
106             socket_core_match = cpu_topology[self.socket_id][self.core_id]
107             sorted_match = sorted(socket_core_match.values())
108             return sorted_match[self.index][0]
109         except (KeyError, IndexError):
110             template = "Core {}{} on socket {} does not exist"
111             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
112
113
114 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
115     def __new__(cls, *args):
116         try:
117             assert args[0] is not str(args[0])
118             args = tuple(args[0])
119         except (AssertionError, IndexError, TypeError):
120             pass
121
122         return super(TotStatsTuple, cls).__new__(cls, *args)
123
124
125 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
126                                                         'delta_tx,delta_tsc,'
127                                                         'latency,rx_total,tx_total,pps')):
128     @property
129     def pkt_loss(self):
130         try:
131             return 1e2 * self.drop_total / float(self.tx_total)
132         except ZeroDivisionError:
133             return 100.0
134
135     @property
136     def mpps(self):
137         # calculate the effective throughput in Mpps
138         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
139
140     @property
141     def can_be_lost(self):
142         return int(self.tx_total * self.tolerated / 1e2)
143
144     @property
145     def drop_total(self):
146         return self.tx_total - self.rx_total
147
148     @property
149     def success(self):
150         return self.drop_total <= self.can_be_lost
151
152     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
153         if pkt_loss is None:
154             pkt_loss = self.pkt_loss
155
156         if port_samples is None:
157             port_samples = {}
158
159         latency_keys = [
160             "LatencyMin",
161             "LatencyMax",
162             "LatencyAvg",
163         ]
164
165         samples = {
166             "Throughput": self.mpps,
167             "DropPackets": pkt_loss,
168             "CurrentDropPackets": pkt_loss,
169             "TxThroughput": self.pps / 1e6,
170             "RxThroughput": self.mpps,
171             "PktSize": pkt_size,
172         }
173         if port_samples:
174             samples.update(port_samples)
175
176         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
177         return samples
178
179     def log_data(self, logger=None):
180         if logger is None:
181             logger = LOG
182
183         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
184         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
185         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
186
187
188 class PacketDump(object):
189     @staticmethod
190     def assert_func(func, value1, value2, template=None):
191         assert func(value1, value2), template.format(value1, value2)
192
193     def __init__(self, port_id, data_len, payload):
194         template = "Packet dump has specified length {}, but payload is {} bytes long"
195         self.assert_func(operator.eq, data_len, len(payload), template)
196         self._port_id = port_id
197         self._data_len = data_len
198         self._payload = payload
199
200     @property
201     def port_id(self):
202         """Get the port id of the packet dump"""
203         return self._port_id
204
205     @property
206     def data_len(self):
207         """Get the length of the data received"""
208         return self._data_len
209
210     def __str__(self):
211         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
212
213     def payload(self, start=None, end=None):
214         """Get part of the payload as a list of ordinals.
215
216         Returns a list of byte values, matching the contents of the packet dump.
217         Optional start and end parameters can be specified to retrieve only a
218         part of the packet contents.
219
220         The number of elements in the list is equal to end - start + 1, so end
221         is the offset of the last character.
222
223         Args:
224             start (pos. int): the starting offset in the payload. If it is not
225                 specified or None, offset 0 is assumed.
226             end (pos. int): the ending offset of the payload. If it is not
227                 specified or None, the contents until the end of the packet are
228                 returned.
229
230         Returns:
231             [int, int, ...]. Each int represents the ordinal value of a byte in
232             the packet payload.
233         """
234         if start is None:
235             start = 0
236
237         if end is None:
238             end = self.data_len - 1
239
240         # Bounds checking on offsets
241         template = "Start offset must be non-negative"
242         self.assert_func(operator.ge, start, 0, template)
243
244         template = "End offset must be less than {1}"
245         self.assert_func(operator.lt, end, self.data_len, template)
246
247         # Adjust for splice operation: end offset must be 1 more than the offset
248         # of the last desired character.
249         end += 1
250
251         return self._payload[start:end]
252
253
254 class ProxSocketHelper(object):
255
256     def __init__(self, sock=None):
257         """ creates new prox instance """
258         super(ProxSocketHelper, self).__init__()
259
260         if sock is None:
261             sock = socket.socket()
262
263         self._sock = sock
264         self._pkt_dumps = []
265         self.master_stats = None
266
267     def connect(self, ip, port):
268         """Connect to the prox instance on the remote system"""
269         self._sock.connect((ip, port))
270
271     def get_socket(self):
272         """ get the socket connected to the remote instance """
273         return self._sock
274
275     def _parse_socket_data(self, decoded_data, pkt_dump_only):
276         def get_newline_index():
277             return decoded_data.find('\n', index)
278
279         ret_str = ''
280         index = 0
281         for newline_index in iter(get_newline_index, -1):
282             ret_str = decoded_data[index:newline_index]
283
284             try:
285                 mode, port_id, data_len = ret_str.split(',', 2)
286             except ValueError:
287                 mode, port_id, data_len = None, None, None
288
289             if mode != 'pktdump':
290                 # Regular 1-line message. Stop reading from the socket.
291                 LOG.debug("Regular response read")
292                 return ret_str
293
294             LOG.debug("Packet dump header read: [%s]", ret_str)
295
296             # The line is a packet dump header. Parse it, read the
297             # packet payload, store the dump for later retrieval.
298             # Skip over the packet dump and continue processing: a
299             # 1-line response may follow the packet dump.
300
301             data_len = int(data_len)
302             data_start = newline_index + 1  # + 1 to skip over \n
303             data_end = data_start + data_len
304             sub_data = decoded_data[data_start:data_end]
305             pkt_payload = array.array('B', (ord(v) for v in sub_data))
306             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
307             self._pkt_dumps.append(pkt_dump)
308
309             if pkt_dump_only:
310                 # Return boolean instead of string to signal
311                 # successful reception of the packet dump.
312                 LOG.debug("Packet dump stored, returning")
313                 return True
314
315             index = data_end + 1
316
317         return ret_str
318
319     def get_data(self, pkt_dump_only=False, timeout=1):
320         """ read data from the socket """
321
322         # This method behaves slightly differently depending on whether it is
323         # called to read the response to a command (pkt_dump_only = 0) or if
324         # it is called specifically to read a packet dump (pkt_dump_only = 1).
325         #
326         # Packet dumps look like:
327         #   pktdump,<port_id>,<data_len>\n
328         #   <packet contents as byte array>\n
329         # This means the total packet dump message consists of 2 lines instead
330         # of 1 line.
331         #
332         # - Response for a command (pkt_dump_only = 0):
333         #   1) Read response from the socket until \n (end of message)
334         #   2a) If the response is a packet dump header (starts with "pktdump,"):
335         #     - Read the packet payload and store the packet dump for later
336         #       retrieval.
337         #     - Reset the state and restart from 1). Eventually state 2b) will
338         #       be reached and the function will return.
339         #   2b) If the response is not a packet dump:
340         #     - Return the received message as a string
341         #
342         # - Explicit request to read a packet dump (pkt_dump_only = 1):
343         #   - Read the dump header and payload
344         #   - Store the packet dump for later retrieval
345         #   - Return True to signify a packet dump was successfully read
346
347         def is_ready():
348             # recv() is blocking, so avoid calling it when no data is waiting.
349             ready = select.select([self._sock], [], [], timeout)
350             return bool(ready[0])
351
352         status = False
353         ret_str = ""
354         for status in iter(is_ready, False):
355             decoded_data = self._sock.recv(256).decode('utf-8')
356             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
357
358         LOG.debug("Received data from socket: [%s]", ret_str)
359         return ret_str if status else ''
360
361     def put_command(self, to_send):
362         """ send data to the remote instance """
363         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
364         try:
365             # NOTE: sendall will block, we need a timeout
366             self._sock.sendall(to_send.encode('utf-8'))
367         except:  # pylint: disable=bare-except
368             pass
369
370     def get_packet_dump(self):
371         """ get the next packet dump """
372         if self._pkt_dumps:
373             return self._pkt_dumps.pop(0)
374         return None
375
376     def stop_all_reset(self):
377         """ stop the remote instance and reset stats """
378         LOG.debug("Stop all and reset stats")
379         self.stop_all()
380         self.reset_stats()
381
382     def stop_all(self):
383         """ stop all cores on the remote instance """
384         LOG.debug("Stop all")
385         self.put_command("stop all\n")
386         time.sleep(3)
387
388     def stop(self, cores, task=''):
389         """ stop specific cores on the remote instance """
390         LOG.debug("Stopping cores %s", cores)
391         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
392         time.sleep(3)
393
394     def start_all(self):
395         """ start all cores on the remote instance """
396         LOG.debug("Start all")
397         self.put_command("start all\n")
398
399     def start(self, cores):
400         """ start specific cores on the remote instance """
401         LOG.debug("Starting cores %s", cores)
402         self.put_command("start {}\n".format(join_non_strings(',', cores)))
403         time.sleep(3)
404
405     def reset_stats(self):
406         """ reset the statistics on the remote instance """
407         LOG.debug("Reset stats")
408         self.put_command("reset stats\n")
409         time.sleep(1)
410
411     def _run_template_over_cores(self, template, cores, *args):
412         for core in cores:
413             self.put_command(template.format(core, *args))
414
415     def set_pkt_size(self, cores, pkt_size):
416         """ set the packet size to generate on the remote instance """
417         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
418         pkt_size -= 4
419         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
420         time.sleep(1)
421
422     def set_value(self, cores, offset, value, length):
423         """ set value on the remote instance """
424         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
425         LOG.debug(msg, cores, value, length, offset)
426         template = "set value {} 0 {} {} {}\n"
427         self._run_template_over_cores(template, cores, offset, value, length)
428
429     def reset_values(self, cores):
430         """ reset values on the remote instance """
431         LOG.debug("Set value for core(s) %s", cores)
432         self._run_template_over_cores("reset values {} 0\n", cores)
433
434     def set_speed(self, cores, speed, tasks=None):
435         """ set speed on the remote instance """
436         if tasks is None:
437             tasks = [0] * len(cores)
438         elif len(tasks) != len(cores):
439             LOG.error("set_speed: cores and tasks must have the same len")
440         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
441         for (core, task) in list(zip(cores, tasks)):
442             self.put_command("speed {} {} {}\n".format(core, task, speed))
443
444     def slope_speed(self, cores_speed, duration, n_steps=0):
445         """will start to increase speed from 0 to N where N is taken from
446         a['speed'] for each a in cores_speed"""
447         # by default, each step will take 0.5 sec
448         if n_steps == 0:
449             n_steps = duration * 2
450
451         private_core_data = []
452         step_duration = float(duration) / n_steps
453         for core_data in cores_speed:
454             target = float(core_data['speed'])
455             private_core_data.append({
456                 'cores': core_data['cores'],
457                 'zero': 0,
458                 'delta': target / n_steps,
459                 'current': 0,
460                 'speed': target,
461             })
462
463         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
464         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
465             time.sleep(step_duration)
466             for core_data in private_core_data:
467                 core_data['current'] = core_data[key1] + core_data[key2]
468                 self.set_speed(core_data['cores'], core_data['current'])
469
470     def set_pps(self, cores, pps, pkt_size):
471         """ set packets per second for specific cores on the remote instance """
472         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
473         LOG.debug(msg, cores, pps, pkt_size)
474
475         # speed in percent of line-rate
476         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
477         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
478
479     def lat_stats(self, cores, task=0):
480         """Get the latency statistics from the remote system"""
481         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
482         lat_min = {}
483         lat_max = {}
484         lat_avg = {}
485         for core in cores:
486             self.put_command("lat stats {} {} \n".format(core, task))
487             ret = self.get_data()
488
489             try:
490                 lat_min[core], lat_max[core], lat_avg[core] = \
491                     tuple(int(n) for n in ret.split(",")[:3])
492
493             except (AttributeError, ValueError, TypeError):
494                 pass
495
496         return lat_min, lat_max, lat_avg
497
498     def get_all_tot_stats(self):
499         self.put_command("tot stats\n")
500         all_stats_str = self.get_data().split(",")
501         if len(all_stats_str) != 4:
502             all_stats = [0] * 4
503             return all_stats
504         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
505         self.master_stats = all_stats
506         return all_stats
507
508     def hz(self):
509         return self.get_all_tot_stats()[3]
510
511     def core_stats(self, cores, task=0):
512         """Get the receive statistics from the remote system"""
513         rx = tx = drop = tsc = 0
514         for core in cores:
515             self.put_command("core stats {} {}\n".format(core, task))
516             ret = self.get_data().split(",")
517             rx += int(ret[0])
518             tx += int(ret[1])
519             drop += int(ret[2])
520             tsc = int(ret[3])
521         return rx, tx, drop, tsc
522
523     def port_stats(self, ports):
524         """get counter values from a specific port"""
525         tot_result = [0] * 12
526         for port in ports:
527             self.put_command("port_stats {}\n".format(port))
528             ret = [try_int(s, 0) for s in self.get_data().split(",")]
529             tot_result = [sum(x) for x in zip(tot_result, ret)]
530         return tot_result
531
532     @contextmanager
533     def measure_tot_stats(self):
534         start = self.get_all_tot_stats()
535         container = {'start_tot': start}
536         try:
537             yield container
538         finally:
539             container['end_tot'] = end = self.get_all_tot_stats()
540
541         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
542
543     def tot_stats(self):
544         """Get the total statistics from the remote system"""
545         stats = self.get_all_tot_stats()
546         return stats[:3]
547
548     def tot_ierrors(self):
549         """Get the total ierrors from the remote system"""
550         self.put_command("tot ierrors tot\n")
551         recv = self.get_data().split(',')
552         tot_ierrors = int(recv[0])
553         tsc = int(recv[0])
554         return tot_ierrors, tsc
555
556     def set_count(self, count, cores):
557         """Set the number of packets to send on the specified core"""
558         self._run_template_over_cores("count {} 0 {}\n", cores, count)
559
560     def dump_rx(self, core_id, task_id=0, count=1):
561         """Activate dump on rx on the specified core"""
562         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
563         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
564         time.sleep(1.5)  # Give PROX time to set up packet dumping
565
566     def quit(self):
567         self.stop_all()
568         self._quit()
569         self.force_quit()
570
571     def _quit(self):
572         """ stop all cores on the remote instance """
573         LOG.debug("Quit prox")
574         self.put_command("quit\n")
575         time.sleep(3)
576
577     def force_quit(self):
578         """ stop all cores on the remote instance """
579         LOG.debug("Force Quit prox")
580         self.put_command("quit_force\n")
581         time.sleep(3)
582
583
584 _LOCAL_OBJECT = object()
585
586
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588     # the actual app is lowercase
589     APP_NAME = 'prox'
590     # not used for Prox but added for consistency
591     VNF_TYPE = "PROX"
592
593     LUA_PARAMETER_NAME = ""
594     LUA_PARAMETER_PEER = {
595         "gen": "sut",
596         "sut": "gen",
597     }
598
599     CONFIG_QUEUE_TIMEOUT = 120
600
601     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
602         self.remote_path = None
603         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
604         self.remote_prox_file_name = None
605         self._prox_config_data = None
606         self.additional_files = {}
607         self.config_queue = Queue()
608         # allow_exit_without_flush
609         self.config_queue.cancel_join_thread()
610         self._global_section = None
611
612     @property
613     def prox_config_data(self):
614         if self._prox_config_data is None:
615             # this will block, but it needs too
616             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
617         return self._prox_config_data
618
619     @property
620     def global_section(self):
621         if self._global_section is None and self.prox_config_data:
622             self._global_section = self.find_section("global")
623         return self._global_section
624
625     def find_section(self, name, default=_LOCAL_OBJECT):
626         result = next((value for key, value in self.prox_config_data if key == name), default)
627         if result is _LOCAL_OBJECT:
628             raise KeyError('{} not found in Prox config'.format(name))
629         return result
630
631     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
632         section = self.find_section(section_name, [])
633         result = next((value for key, value in section if key == section_key), default)
634         if result is _LOCAL_OBJECT:
635             template = '{} not found in {} section of Prox config'
636             raise KeyError(template.format(section_key, section_name))
637         return result
638
639     def copy_to_target(self, config_file_path, prox_file):
640         remote_path = os.path.join("/tmp", prox_file)
641         self.ssh_helper.put(config_file_path, remote_path)
642         return remote_path
643
644     @staticmethod
645     def _get_tx_port(section, sections):
646         iface_port = [-1]
647         for item in sections[section]:
648             if item[0] == "tx port":
649                 iface_port = re.findall(r'\d+', item[1])
650                 # do we want the last one?
651                 #   if yes, then can we reverse?
652         return int(iface_port[0])
653
654     @staticmethod
655     def _replace_quoted_with_value(quoted, value, count=1):
656         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
657         return new_string
658
659     def _insert_additional_file(self, value):
660         file_str = value.split('"')
661         base_name = os.path.basename(file_str[1])
662         file_str[1] = self.additional_files[base_name]
663         return '"'.join(file_str)
664
665     def generate_prox_config_file(self, config_path):
666         sections = []
667         prox_config = ConfigParser(config_path, sections)
668         prox_config.parse()
669
670         # Ensure MAC is set "hardware"
671         all_ports = self.vnfd_helper.port_pairs.all_ports
672         # use dpdk port number
673         for port_name in all_ports:
674             port_num = self.vnfd_helper.port_num(port_name)
675             port_section_name = "port {}".format(port_num)
676             for section_name, section in sections:
677                 if port_section_name != section_name:
678                     continue
679
680                 for section_data in section:
681                     if section_data[0] == "mac":
682                         section_data[1] = "hardware"
683
684         # search for dst mac
685         for _, section in sections:
686             for section_data in section:
687                 item_key, item_val = section_data
688                 if item_val.startswith("@@dst_mac"):
689                     tx_port_iter = re.finditer(r'\d+', item_val)
690                     tx_port_no = int(next(tx_port_iter).group(0))
691                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
692                     mac = intf["virtual-interface"]["dst_mac"]
693                     section_data[1] = mac.replace(":", " ", 6)
694
695                 if item_key == "dst mac" and item_val.startswith("@@"):
696                     tx_port_iter = re.finditer(r'\d+', item_val)
697                     tx_port_no = int(next(tx_port_iter).group(0))
698                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
699                     mac = intf["virtual-interface"]["dst_mac"]
700                     section_data[1] = mac
701
702                 if item_val.startswith("@@src_mac"):
703                     tx_port_iter = re.finditer(r'\d+', item_val)
704                     tx_port_no = int(next(tx_port_iter).group(0))
705                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
706                     mac = intf["virtual-interface"]["local_mac"]
707                     section_data[1] = mac.replace(":", " ", 6)
708
709                 if item_key == "src mac" and item_val.startswith("@@"):
710                     tx_port_iter = re.finditer(r'\d+', item_val)
711                     tx_port_no = int(next(tx_port_iter).group(0))
712                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
713                     mac = intf["virtual-interface"]["local_mac"]
714                     section_data[1] = mac
715
716         # if addition file specified in prox config
717         if not self.additional_files:
718             return sections
719
720         for section_name, section in sections:
721             for section_data in section:
722                 try:
723                     if section_data[0].startswith("dofile"):
724                         section_data[0] = self._insert_additional_file(section_data[0])
725
726                     if section_data[1].startswith("dofile"):
727                         section_data[1] = self._insert_additional_file(section_data[1])
728                 except:  # pylint: disable=bare-except
729                     pass
730
731         return sections
732
733     @staticmethod
734     def write_prox_lua(lua_config):
735         """
736         Write an .ini-format config file for PROX (parameters.lua)
737         PROX does not allow a space before/after the =, so we need
738         a custom method
739         """
740         out = []
741         for key in lua_config:
742             value = '"' + lua_config[key] + '"'
743             if key == "__name__":
744                 continue
745             if value is not None and value != '@':
746                 key = "=".join((key, str(value).replace('\n', '\n\t')))
747                 out.append(key)
748             else:
749                 key = str(key).replace('\n', '\n\t')
750                 out.append(key)
751         return os.linesep.join(out)
752
753     @staticmethod
754     def write_prox_config(prox_config):
755         """
756         Write an .ini-format config file for PROX
757         PROX does not allow a space before/after the =, so we need
758         a custom method
759         """
760         out = []
761         for (section_name, section) in prox_config:
762             out.append("[{}]".format(section_name))
763             for item in section:
764                 key, value = item
765                 if key == "__name__":
766                     continue
767                 if value is not None and value != '@':
768                     key = "=".join((key, str(value).replace('\n', '\n\t')))
769                     out.append(key)
770                 else:
771                     key = str(key).replace('\n', '\n\t')
772                     out.append(key)
773         return os.linesep.join(out)
774
775     def put_string_to_file(self, s, remote_path):
776         file_obj = cStringIO(s)
777         self.ssh_helper.put_file_obj(file_obj, remote_path)
778         return remote_path
779
780     def generate_prox_lua_file(self):
781         p = OrderedDict()
782         all_ports = self.vnfd_helper.port_pairs.all_ports
783         for port_name in all_ports:
784             port_num = self.vnfd_helper.port_num(port_name)
785             intf = self.vnfd_helper.find_interface(name=port_name)
786             vintf = intf['virtual-interface']
787             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
788             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
789
790         return p
791
792     def upload_prox_lua(self, config_file, lua_data):
793         # prox can't handle spaces around ' = ' so use custom method
794         out = StringIO(self.write_prox_lua(lua_data))
795         out.seek(0)
796         remote_path = os.path.join("/tmp", config_file)
797         self.ssh_helper.put_file_obj(out, remote_path)
798
799         return remote_path
800
801     def upload_prox_config(self, config_file, prox_config_data):
802         # prox can't handle spaces around ' = ' so use custom method
803         out = StringIO(self.write_prox_config(prox_config_data))
804         out.seek(0)
805         remote_path = os.path.join("/tmp", config_file)
806         self.ssh_helper.put_file_obj(out, remote_path)
807
808         return remote_path
809
810     def build_config_file(self):
811         task_path = self.scenario_helper.task_path
812         options = self.scenario_helper.options
813         config_path = options['prox_config']
814         config_file = os.path.basename(config_path)
815         config_path = find_relative_file(config_path, task_path)
816         self.additional_files = {}
817
818         try:
819             if options['prox_generate_parameter']:
820                 self.lua = []
821                 self.lua = self.generate_prox_lua_file()
822                 if len(self.lua) > 0:
823                     self.upload_prox_lua("parameters.lua", self.lua)
824         except:  # pylint: disable=bare-except
825             pass
826
827         prox_files = options.get('prox_files', [])
828         if isinstance(prox_files, six.string_types):
829             prox_files = [prox_files]
830         for key_prox_file in prox_files:
831             base_prox_file = os.path.basename(key_prox_file)
832             key_prox_path = find_relative_file(key_prox_file, task_path)
833             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
834             self.additional_files[base_prox_file] = remote_prox_file
835
836         self._prox_config_data = self.generate_prox_config_file(config_path)
837         # copy config to queue so we can read it from traffic_runner process
838         self.config_queue.put(self._prox_config_data)
839         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
840
841     def build_config(self):
842         self.build_config_file()
843
844         options = self.scenario_helper.options
845         prox_args = options['prox_args']
846         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
847
848         self.pipeline_kwargs = {
849             'tool_path': tool_path,
850             'tool_dir': os.path.dirname(tool_path),
851             'cfg_file': self.remote_path,
852             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
853                              for k, v in prox_args.items())
854         }
855
856         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
857                         "{args} -f {cfg_file} '")
858         return cmd_template.format(**self.pipeline_kwargs)
859
860
861 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
862 class ProxResourceHelper(ClientResourceHelper):
863
864     RESOURCE_WORD = 'prox'
865
866     PROX_MODE = ""
867
868     WAIT_TIME = 3
869
870     @staticmethod
871     def find_pci(pci, bound_pci):
872         # we have to substring match PCI bus address from the end
873         return any(b.endswith(pci) for b in bound_pci)
874
875     def __init__(self, setup_helper):
876         super(ProxResourceHelper, self).__init__(setup_helper)
877         self.mgmt_interface = self.vnfd_helper.mgmt_interface
878         self._user = self.mgmt_interface["user"]
879         self._ip = self.mgmt_interface["ip"]
880
881         self.done = False
882         self._vpci_to_if_name_map = None
883         self.additional_file = {}
884         self.remote_prox_file_name = None
885         self.lower = None
886         self.upper = None
887         self.step_delta = 1
888         self.step_time = 0.5
889         self._test_type = None
890
891     @property
892     def sut(self):
893         if not self.client:
894             self.client = self._connect()
895         return self.client
896
897     @property
898     def test_type(self):
899         if self._test_type is None:
900             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
901         return self._test_type
902
903     def run_traffic(self, traffic_profile):
904         self._queue.cancel_join_thread()
905         self.lower = 0.0
906         self.upper = 100.0
907
908         traffic_profile.init(self._queue)
909         # this frees up the run_traffic loop
910         self.client_started.value = 1
911
912         while not self._terminated.value:
913             # move it all to traffic_profile
914             self._run_traffic_once(traffic_profile)
915
916     def _run_traffic_once(self, traffic_profile):
917         traffic_profile.execute_traffic(self)
918         if traffic_profile.done:
919             self._queue.put({'done': True})
920             LOG.debug("tg_prox done")
921             self._terminated.value = 1
922
923     # For VNF use ResourceHelper method to collect KPIs directly.
924     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
925     def collect_collectd_kpi(self):
926         return self._collect_resource_kpi()
927
928     def collect_kpi(self):
929         result = super(ProxResourceHelper, self).collect_kpi()
930         # add in collectd kpis manually
931         if result:
932             result['collect_stats'] = self._collect_resource_kpi()
933         return result
934
935     def terminate(self):
936         # should not be called, use VNF terminate
937         raise NotImplementedError()
938
939     def up_post(self):
940         return self.sut  # force connection
941
942     def execute(self, cmd, *args, **kwargs):
943         func = getattr(self.sut, cmd, None)
944         if func:
945             return func(*args, **kwargs)
946         return None
947
948     def _connect(self, client=None):
949         """Run and connect to prox on the remote system """
950         # De-allocating a large amount of hugepages takes some time. If a new
951         # PROX instance is started immediately after killing the previous one,
952         # it might not be able to allocate hugepages, because they are still
953         # being freed. Hence the -w switch.
954         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
955         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
956         # -f ./handle_none-4.cfg"
957         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
958         #  "; " \
959         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
960         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
961         # sudo " \
962         #    + "./build/Prox " + prox_args
963         # log.debug("Starting PROX with command [%s]", prox_cmd)
964         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
965         # self._ip, prox_cmd))
966         if client is None:
967             client = ProxSocketHelper()
968
969         # try connecting to Prox for 60s
970         for _ in range(RETRY_SECONDS):
971             time.sleep(RETRY_INTERVAL)
972             try:
973                 client.connect(self._ip, PROX_PORT)
974             except (socket.gaierror, socket.error):
975                 continue
976             else:
977                 return client
978
979         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
980         raise Exception(msg.format(self._ip, PROX_PORT))
981
982
983 class ProxDataHelper(object):
984
985     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
986         super(ProxDataHelper, self).__init__()
987         self.vnfd_helper = vnfd_helper
988         self.sut = sut
989         self.pkt_size = pkt_size
990         self.value = value
991         self.tolerated_loss = tolerated_loss
992         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
993         self.tsc_hz = None
994         self.measured_stats = None
995         self.latency = None
996         self._totals_and_pps = None
997         self.result_tuple = None
998
999     @property
1000     def totals_and_pps(self):
1001         if self._totals_and_pps is None:
1002             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
1003             pps = self.value / 100.0 * self.line_rate_to_pps()
1004             self._totals_and_pps = rx_total, tx_total, pps
1005         return self._totals_and_pps
1006
1007     @property
1008     def rx_total(self):
1009         return self.totals_and_pps[0]
1010
1011     @property
1012     def tx_total(self):
1013         return self.totals_and_pps[1]
1014
1015     @property
1016     def pps(self):
1017         return self.totals_and_pps[2]
1018
1019     @property
1020     def samples(self):
1021         samples = {}
1022         for port_name, port_num in self.vnfd_helper.ports_iter():
1023             try:
1024                 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1025                 samples[port_name] = {
1026                     "in_packets": port_rx_total,
1027                     "out_packets": port_tx_total,
1028                 }
1029             except (KeyError, TypeError, NameError, MemoryError, ValueError,
1030                     SystemError, BufferError):
1031                 samples[port_name] = {
1032                     "in_packets": 0,
1033                     "out_packets": 0,
1034                 }
1035         return samples
1036
1037     def __enter__(self):
1038         self.check_interface_count()
1039         return self
1040
1041     def __exit__(self, exc_type, exc_val, exc_tb):
1042         self.make_tuple()
1043
1044     def make_tuple(self):
1045         if self.result_tuple:
1046             return
1047
1048         self.result_tuple = ProxTestDataTuple(
1049             self.tolerated_loss,
1050             self.tsc_hz,
1051             self.measured_stats['delta'].rx,
1052             self.measured_stats['delta'].tx,
1053             self.measured_stats['delta'].tsc,
1054             self.latency,
1055             self.rx_total,
1056             self.tx_total,
1057             self.pps,
1058         )
1059         self.result_tuple.log_data()
1060
1061     @contextmanager
1062     def measure_tot_stats(self):
1063         with self.sut.measure_tot_stats() as self.measured_stats:
1064             yield
1065
1066     def check_interface_count(self):
1067         # do this assert in init?  unless we expect interface count to
1068         # change from one run to another run...
1069         assert self.port_count in {1, 2, 4}, \
1070             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1071
1072     def capture_tsc_hz(self):
1073         self.tsc_hz = float(self.sut.hz())
1074
1075     def line_rate_to_pps(self):
1076         # NOTE: to fix, don't hardcode 10Gb/s
1077         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1078
1079
1080 class ProxProfileHelper(object):
1081
1082     __prox_profile_type__ = "Generic"
1083
1084     PROX_CORE_GEN_MODE = "gen"
1085     PROX_CORE_LAT_MODE = "lat"
1086
1087     @classmethod
1088     def get_cls(cls, helper_type):
1089         """Return class of specified type."""
1090         if not helper_type:
1091             return ProxProfileHelper
1092
1093         for profile_helper_class in utils.itersubclasses(cls):
1094             if helper_type == profile_helper_class.__prox_profile_type__:
1095                 return profile_helper_class
1096
1097         return ProxProfileHelper
1098
1099     @classmethod
1100     def make_profile_helper(cls, resource_helper):
1101         return cls.get_cls(resource_helper.test_type)(resource_helper)
1102
1103     def __init__(self, resource_helper):
1104         super(ProxProfileHelper, self).__init__()
1105         self.resource_helper = resource_helper
1106         self._cpu_topology = None
1107         self._test_cores = None
1108         self._latency_cores = None
1109
1110     @property
1111     def cpu_topology(self):
1112         if not self._cpu_topology:
1113             stdout = io.BytesIO()
1114             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1115             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1116         return self._cpu_topology
1117
1118     @property
1119     def test_cores(self):
1120         if not self._test_cores:
1121             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1122         return self._test_cores
1123
1124     @property
1125     def latency_cores(self):
1126         if not self._latency_cores:
1127             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1128         return self._latency_cores
1129
1130     @contextmanager
1131     def traffic_context(self, pkt_size, value):
1132         self.sut.stop_all()
1133         self.sut.reset_stats()
1134         try:
1135             self.sut.set_pkt_size(self.test_cores, pkt_size)
1136             self.sut.set_speed(self.test_cores, value)
1137             self.sut.start_all()
1138             yield
1139         finally:
1140             self.sut.stop_all()
1141
1142     def get_cores(self, mode):
1143         cores = []
1144
1145         for section_name, section in self.setup_helper.prox_config_data:
1146             if not section_name.startswith("core"):
1147                 continue
1148
1149             for key, value in section:
1150                 if key == "mode" and value == mode:
1151                     core_tuple = CoreSocketTuple(section_name)
1152                     core = core_tuple.core_id
1153                     cores.append(core)
1154
1155         return cores
1156
1157     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1158         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1159
1160         with data_helper, self.traffic_context(pkt_size, value):
1161             with data_helper.measure_tot_stats():
1162                 time.sleep(duration)
1163                 # Getting statistics to calculate PPS at right speed....
1164                 data_helper.capture_tsc_hz()
1165                 data_helper.latency = self.get_latency()
1166
1167         return data_helper.result_tuple, data_helper.samples
1168
1169     def get_latency(self):
1170         """
1171         :return: return lat_min, lat_max, lat_avg
1172         :rtype: list
1173         """
1174
1175         if not self._latency_cores:
1176             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1177
1178         if self._latency_cores:
1179             return self.sut.lat_stats(self._latency_cores)
1180         return []
1181
1182     def terminate(self):
1183         pass
1184
1185     def __getattr__(self, item):
1186         return getattr(self.resource_helper, item)
1187
1188
1189 class ProxMplsProfileHelper(ProxProfileHelper):
1190
1191     __prox_profile_type__ = "MPLS tag/untag"
1192
1193     def __init__(self, resource_helper):
1194         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1195         self._cores_tuple = None
1196
1197     @property
1198     def mpls_cores(self):
1199         if not self._cores_tuple:
1200             self._cores_tuple = self.get_cores_mpls()
1201         return self._cores_tuple
1202
1203     @property
1204     def tagged_cores(self):
1205         return self.mpls_cores[0]
1206
1207     @property
1208     def plain_cores(self):
1209         return self.mpls_cores[1]
1210
1211     def get_cores_mpls(self):
1212         cores_tagged = []
1213         cores_plain = []
1214         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1215             if not section_name.startswith("core"):
1216                 continue
1217
1218             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1219                 continue
1220
1221             for item_key, item_value in section:
1222                 if item_key != 'name':
1223                     continue
1224
1225                 if item_value.startswith("tag"):
1226                     core_tuple = CoreSocketTuple(section_name)
1227                     core_tag = core_tuple.core_id
1228                     cores_tagged.append(core_tag)
1229
1230                 elif item_value.startswith("udp"):
1231                     core_tuple = CoreSocketTuple(section_name)
1232                     core_udp = core_tuple.core_id
1233                     cores_plain.append(core_udp)
1234
1235         return cores_tagged, cores_plain
1236
1237     @contextmanager
1238     def traffic_context(self, pkt_size, value):
1239         self.sut.stop_all()
1240         self.sut.reset_stats()
1241         try:
1242             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1243             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1244             self.sut.set_speed(self.tagged_cores, value)
1245             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1246             self.sut.set_speed(self.plain_cores, value * ratio)
1247             self.sut.start_all()
1248             yield
1249         finally:
1250             self.sut.stop_all()
1251
1252
1253 class ProxBngProfileHelper(ProxProfileHelper):
1254
1255     __prox_profile_type__ = "BNG gen"
1256
1257     def __init__(self, resource_helper):
1258         super(ProxBngProfileHelper, self).__init__(resource_helper)
1259         self._cores_tuple = None
1260
1261     @property
1262     def bng_cores(self):
1263         if not self._cores_tuple:
1264             self._cores_tuple = self.get_cores_gen_bng_qos()
1265         return self._cores_tuple
1266
1267     @property
1268     def cpe_cores(self):
1269         return self.bng_cores[0]
1270
1271     @property
1272     def inet_cores(self):
1273         return self.bng_cores[1]
1274
1275     @property
1276     def arp_cores(self):
1277         return self.bng_cores[2]
1278
1279     @property
1280     def arp_task_cores(self):
1281         return self.bng_cores[3]
1282
1283     @property
1284     def all_rx_cores(self):
1285         return self.latency_cores
1286
1287     def get_cores_gen_bng_qos(self):
1288         cpe_cores = []
1289         inet_cores = []
1290         arp_cores = []
1291         arp_tasks_core = [0]
1292         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1293             if not section_name.startswith("core"):
1294                 continue
1295
1296             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1297                 continue
1298
1299             for item_key, item_value in section:
1300                 if item_key != 'name':
1301                     continue
1302
1303                 if item_value.startswith("cpe"):
1304                     core_tuple = CoreSocketTuple(section_name)
1305                     cpe_core = core_tuple.core_id
1306                     cpe_cores.append(cpe_core)
1307
1308                 elif item_value.startswith("inet"):
1309                     core_tuple = CoreSocketTuple(section_name)
1310                     inet_core = core_tuple.core_id
1311                     inet_cores.append(inet_core)
1312
1313                 elif item_value.startswith("arp"):
1314                     core_tuple = CoreSocketTuple(section_name)
1315                     arp_core = core_tuple.core_id
1316                     arp_cores.append(arp_core)
1317
1318                 # We check the tasks/core separately
1319                 if item_value.startswith("arp_task"):
1320                     core_tuple = CoreSocketTuple(section_name)
1321                     arp_task_core = core_tuple.core_id
1322                     arp_tasks_core.append(arp_task_core)
1323
1324         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1325
1326     @contextmanager
1327     def traffic_context(self, pkt_size, value):
1328         # Tester is sending packets at the required speed already after
1329         # setup_test(). Just get the current statistics, sleep the required
1330         # amount of time and calculate packet loss.
1331         inet_pkt_size = pkt_size
1332         cpe_pkt_size = pkt_size - 24
1333         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1334
1335         curr_up_speed = curr_down_speed = 0
1336         max_up_speed = max_down_speed = value
1337         if ratio < 1:
1338             max_down_speed = value * ratio
1339         else:
1340             max_up_speed = value / ratio
1341
1342         # Initialize cores
1343         self.sut.stop_all()
1344         time.sleep(0.5)
1345
1346         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1347         # wrong.
1348         self.sut.start(self.all_rx_cores)
1349         time.sleep(0.5)
1350         self.sut.stop(self.all_rx_cores)
1351         time.sleep(0.5)
1352         self.sut.reset_stats()
1353
1354         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1355         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1356
1357         self.sut.reset_values(self.cpe_cores)
1358         self.sut.reset_values(self.inet_cores)
1359
1360         # Set correct IP and UDP lengths in packet headers
1361         # CPE
1362         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1363         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1364         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1365         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1366
1367         # INET
1368         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1369         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1370         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1371         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1372         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1373         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1374
1375         # Sending ARP to initialize tables - need a few seconds of generation
1376         # to make sure all CPEs are initialized
1377         LOG.info("Initializing SUT: sending ARP packets")
1378         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1379         self.sut.set_speed(self.inet_cores, curr_up_speed)
1380         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1381         self.sut.start(self.arp_cores)
1382         time.sleep(4)
1383
1384         # Ramp up the transmission speed. First go to the common speed, then
1385         # increase steps for the faster one.
1386         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1387
1388         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1389
1390         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1391             # The min(..., ...) takes care of 1) floating point rounding errors
1392             # that could make curr_*_speed to be slightly greater than
1393             # max_*_speed and 2) max_*_speed not being an exact multiple of
1394             # self._step_delta.
1395             if curr_up_speed < max_up_speed:
1396                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1397             if curr_down_speed < max_down_speed:
1398                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1399
1400             self.sut.set_speed(self.inet_cores, curr_up_speed)
1401             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1402             time.sleep(self.step_time)
1403
1404         LOG.info("Target speeds reached. Starting real test.")
1405
1406         yield
1407
1408         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1409         LOG.info("Test ended. Flushing NIC buffers")
1410         self.sut.start(self.all_rx_cores)
1411         time.sleep(3)
1412         self.sut.stop(self.all_rx_cores)
1413
1414     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1415         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1416
1417         with data_helper, self.traffic_context(pkt_size, value):
1418             with data_helper.measure_tot_stats():
1419                 time.sleep(duration)
1420                 # Getting statistics to calculate PPS at right speed....
1421                 data_helper.capture_tsc_hz()
1422                 data_helper.latency = self.get_latency()
1423
1424         return data_helper.result_tuple, data_helper.samples
1425
1426
1427 class ProxVpeProfileHelper(ProxProfileHelper):
1428
1429     __prox_profile_type__ = "vPE gen"
1430
1431     def __init__(self, resource_helper):
1432         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1433         self._cores_tuple = None
1434         self._ports_tuple = None
1435
1436     @property
1437     def vpe_cores(self):
1438         if not self._cores_tuple:
1439             self._cores_tuple = self.get_cores_gen_vpe()
1440         return self._cores_tuple
1441
1442     @property
1443     def cpe_cores(self):
1444         return self.vpe_cores[0]
1445
1446     @property
1447     def inet_cores(self):
1448         return self.vpe_cores[1]
1449
1450     @property
1451     def all_rx_cores(self):
1452         return self.latency_cores
1453
1454     @property
1455     def vpe_ports(self):
1456         if not self._ports_tuple:
1457             self._ports_tuple = self.get_ports_gen_vpe()
1458         return self._ports_tuple
1459
1460     @property
1461     def cpe_ports(self):
1462         return self.vpe_ports[0]
1463
1464     @property
1465     def inet_ports(self):
1466         return self.vpe_ports[1]
1467
1468     def get_cores_gen_vpe(self):
1469         cpe_cores = []
1470         inet_cores = []
1471         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1472             if not section_name.startswith("core"):
1473                 continue
1474
1475             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1476                 continue
1477
1478             for item_key, item_value in section:
1479                 if item_key != 'name':
1480                     continue
1481
1482                 if item_value.startswith("cpe"):
1483                     core_tuple = CoreSocketTuple(section_name)
1484                     core_tag = core_tuple.core_id
1485                     cpe_cores.append(core_tag)
1486
1487                 elif item_value.startswith("inet"):
1488                     core_tuple = CoreSocketTuple(section_name)
1489                     inet_core = core_tuple.core_id
1490                     inet_cores.append(inet_core)
1491
1492         return cpe_cores, inet_cores
1493
1494     def get_ports_gen_vpe(self):
1495         cpe_ports = []
1496         inet_ports = []
1497
1498         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1499             if not section_name.startswith("port"):
1500                 continue
1501             tx_port_iter = re.finditer(r'\d+', section_name)
1502             tx_port_no = int(next(tx_port_iter).group(0))
1503
1504             for item_key, item_value in section:
1505                 if item_key != 'name':
1506                     continue
1507
1508                 if item_value.startswith("cpe"):
1509                     cpe_ports.append(tx_port_no)
1510
1511                 elif item_value.startswith("inet"):
1512                     inet_ports.append(tx_port_no)
1513
1514         return cpe_ports, inet_ports
1515
1516     @contextmanager
1517     def traffic_context(self, pkt_size, value):
1518         # Calculate the target upload and download speed. The upload and
1519         # download packets have different packet sizes, so in order to get
1520         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1521         # of the packet sizes.
1522         cpe_pkt_size = pkt_size
1523         inet_pkt_size = pkt_size - 4
1524         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1525
1526         curr_up_speed = curr_down_speed = 0
1527         max_up_speed = max_down_speed = value
1528         if ratio < 1:
1529             max_down_speed = value * ratio
1530         else:
1531             max_up_speed = value / ratio
1532
1533         # Adjust speed when multiple cores per port are used to generate traffic
1534         if len(self.cpe_ports) != len(self.cpe_cores):
1535             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1536         if len(self.inet_ports) != len(self.inet_cores):
1537             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1538
1539         # Initialize cores
1540         self.sut.stop_all()
1541         time.sleep(2)
1542
1543         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1544         # wrong.
1545         self.sut.start(self.all_rx_cores)
1546         time.sleep(2)
1547         self.sut.stop(self.all_rx_cores)
1548         time.sleep(2)
1549         self.sut.reset_stats()
1550
1551         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1552         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1553
1554         self.sut.reset_values(self.cpe_cores)
1555         self.sut.reset_values(self.inet_cores)
1556
1557         # Set correct IP and UDP lengths in packet headers
1558         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1559         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1560         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1561         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1562
1563         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1564         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1565         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1566         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1567
1568         self.sut.set_speed(self.inet_cores, curr_up_speed)
1569         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1570
1571         # Ramp up the transmission speed. First go to the common speed, then
1572         # increase steps for the faster one.
1573         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1574
1575         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1576
1577         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1578             # The min(..., ...) takes care of 1) floating point rounding errors
1579             # that could make curr_*_speed to be slightly greater than
1580             # max_*_speed and 2) max_*_speed not being an exact multiple of
1581             # self._step_delta.
1582             if curr_up_speed < max_up_speed:
1583                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1584             if curr_down_speed < max_down_speed:
1585                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1586
1587             self.sut.set_speed(self.inet_cores, curr_up_speed)
1588             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1589             time.sleep(self.step_time)
1590
1591         LOG.info("Target speeds reached. Starting real test.")
1592
1593         yield
1594
1595         self.sut.stop(self.cpe_cores + self.inet_cores)
1596         LOG.info("Test ended. Flushing NIC buffers")
1597         self.sut.start(self.all_rx_cores)
1598         time.sleep(3)
1599         self.sut.stop(self.all_rx_cores)
1600
1601     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1602         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1603
1604         with data_helper, self.traffic_context(pkt_size, value):
1605             with data_helper.measure_tot_stats():
1606                 time.sleep(duration)
1607                 # Getting statistics to calculate PPS at right speed....
1608                 data_helper.capture_tsc_hz()
1609                 data_helper.latency = self.get_latency()
1610
1611         return data_helper.result_tuple, data_helper.samples
1612
1613
1614 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1615
1616     __prox_profile_type__ = "lwAFTR gen"
1617
1618     def __init__(self, resource_helper):
1619         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1620         self._cores_tuple = None
1621         self._ports_tuple = None
1622         self.step_delta = 5
1623         self.step_time = 0.5
1624
1625     @property
1626     def _lwaftr_cores(self):
1627         if not self._cores_tuple:
1628             self._cores_tuple = self._get_cores_gen_lwaftr()
1629         return self._cores_tuple
1630
1631     @property
1632     def tun_cores(self):
1633         return self._lwaftr_cores[0]
1634
1635     @property
1636     def inet_cores(self):
1637         return self._lwaftr_cores[1]
1638
1639     @property
1640     def _lwaftr_ports(self):
1641         if not self._ports_tuple:
1642             self._ports_tuple = self._get_ports_gen_lw_aftr()
1643         return self._ports_tuple
1644
1645     @property
1646     def tun_ports(self):
1647         return self._lwaftr_ports[0]
1648
1649     @property
1650     def inet_ports(self):
1651         return self._lwaftr_ports[1]
1652
1653     @property
1654     def all_rx_cores(self):
1655         return self.latency_cores
1656
1657     def _get_cores_gen_lwaftr(self):
1658         tun_cores = []
1659         inet_cores = []
1660         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1661             if not section_name.startswith("core"):
1662                 continue
1663
1664             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1665                 continue
1666
1667             core_tuple = CoreSocketTuple(section_name)
1668             core_tag = core_tuple.core_id
1669             for item_value in (v for k, v in section if k == 'name'):
1670                 if item_value.startswith('tun'):
1671                     tun_cores.append(core_tag)
1672                 elif item_value.startswith('inet'):
1673                     inet_cores.append(core_tag)
1674
1675         return tun_cores, inet_cores
1676
1677     def _get_ports_gen_lw_aftr(self):
1678         tun_ports = []
1679         inet_ports = []
1680
1681         re_port = re.compile(r'port (\d+)')
1682         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1683             match = re_port.search(section_name)
1684             if not match:
1685                 continue
1686
1687             tx_port_no = int(match.group(1))
1688             for item_value in (v for k, v in section if k == 'name'):
1689                 if item_value.startswith('lwB4'):
1690                     tun_ports.append(tx_port_no)
1691                 elif item_value.startswith('inet'):
1692                     inet_ports.append(tx_port_no)
1693
1694         return tun_ports, inet_ports
1695
1696     @staticmethod
1697     def _resize(len1, len2):
1698         if len1 == len2:
1699             return 1.0
1700         return 1.0 * len1 / len2
1701
1702     @contextmanager
1703     def traffic_context(self, pkt_size, value):
1704         # Tester is sending packets at the required speed already after
1705         # setup_test(). Just get the current statistics, sleep the required
1706         # amount of time and calculate packet loss.
1707         tun_pkt_size = pkt_size
1708         inet_pkt_size = pkt_size - 40
1709         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1710
1711         curr_up_speed = curr_down_speed = 0
1712         max_up_speed = max_down_speed = value
1713
1714         max_up_speed = value / ratio
1715
1716         # Adjust speed when multiple cores per port are used to generate traffic
1717         if len(self.tun_ports) != len(self.tun_cores):
1718             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1719         if len(self.inet_ports) != len(self.inet_cores):
1720             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1721
1722         # Initialize cores
1723         self.sut.stop_all()
1724         time.sleep(0.5)
1725
1726         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1727         # wrong.
1728         self.sut.start(self.all_rx_cores)
1729         time.sleep(0.5)
1730         self.sut.stop(self.all_rx_cores)
1731         time.sleep(0.5)
1732         self.sut.reset_stats()
1733
1734         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1735         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1736
1737         self.sut.reset_values(self.tun_cores)
1738         self.sut.reset_values(self.inet_cores)
1739
1740         # Set correct IP and UDP lengths in packet headers
1741         # tun
1742         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1743         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1744         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1745         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1746         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1747         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1748
1749         # INET
1750         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1751         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1752         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1753         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1754
1755         LOG.info("Initializing SUT: sending lwAFTR packets")
1756         self.sut.set_speed(self.inet_cores, curr_up_speed)
1757         self.sut.set_speed(self.tun_cores, curr_down_speed)
1758         time.sleep(4)
1759
1760         # Ramp up the transmission speed. First go to the common speed, then
1761         # increase steps for the faster one.
1762         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1763
1764         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1765
1766         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1767             # The min(..., ...) takes care of 1) floating point rounding errors
1768             # that could make curr_*_speed to be slightly greater than
1769             # max_*_speed and 2) max_*_speed not being an exact multiple of
1770             # self._step_delta.
1771             if curr_up_speed < max_up_speed:
1772                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1773             if curr_down_speed < max_down_speed:
1774                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1775
1776             self.sut.set_speed(self.inet_cores, curr_up_speed)
1777             self.sut.set_speed(self.tun_cores, curr_down_speed)
1778             time.sleep(self.step_time)
1779
1780         LOG.info("Target speeds reached. Starting real test.")
1781
1782         yield
1783
1784         self.sut.stop(self.tun_cores + self.inet_cores)
1785         LOG.info("Test ended. Flushing NIC buffers")
1786         self.sut.start(self.all_rx_cores)
1787         time.sleep(3)
1788         self.sut.stop(self.all_rx_cores)
1789
1790     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1791         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1792
1793         with data_helper, self.traffic_context(pkt_size, value):
1794             with data_helper.measure_tot_stats():
1795                 time.sleep(duration)
1796                 # Getting statistics to calculate PPS at right speed....
1797                 data_helper.capture_tsc_hz()
1798                 data_helper.latency = self.get_latency()
1799
1800         return data_helper.result_tuple, data_helper.samples