Merge "Adding new vFirewall Test cases based on Concurrency, Connections per second...
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47
48 TEN_GIGABIT = 1e10
49 BITS_PER_BYTE = 8
50 RETRY_SECONDS = 60
51 RETRY_INTERVAL = 1
52
53 CONFIGURATION_OPTIONS = (
54     # dict key           section     key               default value
55     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
56     ('testDuration', 'general', 'test_duration', 5.0),
57     ('testPrecision', 'general', 'test_precision', 1.0),
58     ('tests', 'general', 'tests', None),
59     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
60
61     ('logFile', 'logging', 'file', 'dats.log'),
62     ('logDateFormat', 'logging', 'datefmt', None),
63     ('logLevel', 'logging', 'level', 'INFO'),
64     ('logOverwrite', 'logging', 'overwrite', 1),
65
66     ('testerIp', 'tester', 'ip', None),
67     ('testerUser', 'tester', 'user', 'root'),
68     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
69     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
70     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
71     ('testerSocketId', 'tester', 'socket_id', 0),
72
73     ('sutIp', 'sut', 'ip', None),
74     ('sutUser', 'sut', 'user', 'root'),
75     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
76     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
77     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
78     ('sutSocketId', 'sut', 'socket_id', 0),
79 )
80
81
82 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
83     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
84
85     def __new__(cls, *args):
86         try:
87             matches = cls.CORE_RE.search(str(args[0]))
88             if matches:
89                 args = matches.groups()
90
91             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
92                                                        'h' if args[2] else '')
93
94         except (AttributeError, TypeError, IndexError, ValueError):
95             raise ValueError('Invalid core spec {}'.format(args))
96
97     def is_hyperthread(self):
98         return self.hyperthread == 'h'
99
100     @property
101     def index(self):
102         return int(self.is_hyperthread())
103
104     def find_in_topology(self, cpu_topology):
105         try:
106             socket_core_match = cpu_topology[self.socket_id][self.core_id]
107             sorted_match = sorted(socket_core_match.values())
108             return sorted_match[self.index][0]
109         except (KeyError, IndexError):
110             template = "Core {}{} on socket {} does not exist"
111             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
112
113
114 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
115     def __new__(cls, *args):
116         try:
117             assert args[0] is not str(args[0])
118             args = tuple(args[0])
119         except (AssertionError, IndexError, TypeError):
120             pass
121
122         return super(TotStatsTuple, cls).__new__(cls, *args)
123
124
125 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
126                                                         'delta_tx,delta_tsc,'
127                                                         'latency,rx_total,tx_total,pps')):
128     @property
129     def pkt_loss(self):
130         try:
131             return 1e2 * self.drop_total / float(self.tx_total)
132         except ZeroDivisionError:
133             return 100.0
134
135     @property
136     def mpps(self):
137         # calculate the effective throughput in Mpps
138         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
139
140     @property
141     def can_be_lost(self):
142         return int(self.tx_total * self.tolerated / 1e2)
143
144     @property
145     def drop_total(self):
146         return self.tx_total - self.rx_total
147
148     @property
149     def success(self):
150         return self.drop_total <= self.can_be_lost
151
152     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
153         if pkt_loss is None:
154             pkt_loss = self.pkt_loss
155
156         if port_samples is None:
157             port_samples = {}
158
159         latency_keys = [
160             "LatencyMin",
161             "LatencyMax",
162             "LatencyAvg",
163         ]
164
165         samples = {
166             "Throughput": self.mpps,
167             "DropPackets": pkt_loss,
168             "CurrentDropPackets": pkt_loss,
169             "TxThroughput": self.pps / 1e6,
170             "RxThroughput": self.mpps,
171             "PktSize": pkt_size,
172         }
173         if port_samples:
174             samples.update(port_samples)
175
176         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
177         return samples
178
179     def log_data(self, logger=None):
180         if logger is None:
181             logger = LOG
182
183         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
184         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
185         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
186
187
188 class PacketDump(object):
189     @staticmethod
190     def assert_func(func, value1, value2, template=None):
191         assert func(value1, value2), template.format(value1, value2)
192
193     def __init__(self, port_id, data_len, payload):
194         template = "Packet dump has specified length {}, but payload is {} bytes long"
195         self.assert_func(operator.eq, data_len, len(payload), template)
196         self._port_id = port_id
197         self._data_len = data_len
198         self._payload = payload
199
200     @property
201     def port_id(self):
202         """Get the port id of the packet dump"""
203         return self._port_id
204
205     @property
206     def data_len(self):
207         """Get the length of the data received"""
208         return self._data_len
209
210     def __str__(self):
211         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
212
213     def payload(self, start=None, end=None):
214         """Get part of the payload as a list of ordinals.
215
216         Returns a list of byte values, matching the contents of the packet dump.
217         Optional start and end parameters can be specified to retrieve only a
218         part of the packet contents.
219
220         The number of elements in the list is equal to end - start + 1, so end
221         is the offset of the last character.
222
223         Args:
224             start (pos. int): the starting offset in the payload. If it is not
225                 specified or None, offset 0 is assumed.
226             end (pos. int): the ending offset of the payload. If it is not
227                 specified or None, the contents until the end of the packet are
228                 returned.
229
230         Returns:
231             [int, int, ...]. Each int represents the ordinal value of a byte in
232             the packet payload.
233         """
234         if start is None:
235             start = 0
236
237         if end is None:
238             end = self.data_len - 1
239
240         # Bounds checking on offsets
241         template = "Start offset must be non-negative"
242         self.assert_func(operator.ge, start, 0, template)
243
244         template = "End offset must be less than {1}"
245         self.assert_func(operator.lt, end, self.data_len, template)
246
247         # Adjust for splice operation: end offset must be 1 more than the offset
248         # of the last desired character.
249         end += 1
250
251         return self._payload[start:end]
252
253
254 class ProxSocketHelper(object):
255
256     def __init__(self, sock=None):
257         """ creates new prox instance """
258         super(ProxSocketHelper, self).__init__()
259
260         if sock is None:
261             sock = socket.socket()
262
263         self._sock = sock
264         self._pkt_dumps = []
265         self.master_stats = None
266
267     def connect(self, ip, port):
268         """Connect to the prox instance on the remote system"""
269         self._sock.connect((ip, port))
270
271     def get_socket(self):
272         """ get the socket connected to the remote instance """
273         return self._sock
274
275     def _parse_socket_data(self, decoded_data, pkt_dump_only):
276         def get_newline_index():
277             return decoded_data.find('\n', index)
278
279         ret_str = ''
280         index = 0
281         for newline_index in iter(get_newline_index, -1):
282             ret_str = decoded_data[index:newline_index]
283
284             try:
285                 mode, port_id, data_len = ret_str.split(',', 2)
286             except ValueError:
287                 mode, port_id, data_len = None, None, None
288
289             if mode != 'pktdump':
290                 # Regular 1-line message. Stop reading from the socket.
291                 LOG.debug("Regular response read")
292                 return ret_str
293
294             LOG.debug("Packet dump header read: [%s]", ret_str)
295
296             # The line is a packet dump header. Parse it, read the
297             # packet payload, store the dump for later retrieval.
298             # Skip over the packet dump and continue processing: a
299             # 1-line response may follow the packet dump.
300
301             data_len = int(data_len)
302             data_start = newline_index + 1  # + 1 to skip over \n
303             data_end = data_start + data_len
304             sub_data = decoded_data[data_start:data_end]
305             pkt_payload = array.array('B', (ord(v) for v in sub_data))
306             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
307             self._pkt_dumps.append(pkt_dump)
308
309             if pkt_dump_only:
310                 # Return boolean instead of string to signal
311                 # successful reception of the packet dump.
312                 LOG.debug("Packet dump stored, returning")
313                 return True
314
315             index = data_end + 1
316
317         return ret_str
318
319     def get_data(self, pkt_dump_only=False, timeout=1):
320         """ read data from the socket """
321
322         # This method behaves slightly differently depending on whether it is
323         # called to read the response to a command (pkt_dump_only = 0) or if
324         # it is called specifically to read a packet dump (pkt_dump_only = 1).
325         #
326         # Packet dumps look like:
327         #   pktdump,<port_id>,<data_len>\n
328         #   <packet contents as byte array>\n
329         # This means the total packet dump message consists of 2 lines instead
330         # of 1 line.
331         #
332         # - Response for a command (pkt_dump_only = 0):
333         #   1) Read response from the socket until \n (end of message)
334         #   2a) If the response is a packet dump header (starts with "pktdump,"):
335         #     - Read the packet payload and store the packet dump for later
336         #       retrieval.
337         #     - Reset the state and restart from 1). Eventually state 2b) will
338         #       be reached and the function will return.
339         #   2b) If the response is not a packet dump:
340         #     - Return the received message as a string
341         #
342         # - Explicit request to read a packet dump (pkt_dump_only = 1):
343         #   - Read the dump header and payload
344         #   - Store the packet dump for later retrieval
345         #   - Return True to signify a packet dump was successfully read
346
347         def is_ready():
348             # recv() is blocking, so avoid calling it when no data is waiting.
349             ready = select.select([self._sock], [], [], timeout)
350             return bool(ready[0])
351
352         status = False
353         ret_str = ""
354         for status in iter(is_ready, False):
355             decoded_data = self._sock.recv(256).decode('utf-8')
356             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
357
358         LOG.debug("Received data from socket: [%s]", ret_str)
359         return ret_str if status else ''
360
361     def put_command(self, to_send):
362         """ send data to the remote instance """
363         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
364         try:
365             # NOTE: sendall will block, we need a timeout
366             self._sock.sendall(to_send.encode('utf-8'))
367         except:  # pylint: disable=bare-except
368             pass
369
370     def get_packet_dump(self):
371         """ get the next packet dump """
372         if self._pkt_dumps:
373             return self._pkt_dumps.pop(0)
374         return None
375
376     def stop_all_reset(self):
377         """ stop the remote instance and reset stats """
378         LOG.debug("Stop all and reset stats")
379         self.stop_all()
380         self.reset_stats()
381
382     def stop_all(self):
383         """ stop all cores on the remote instance """
384         LOG.debug("Stop all")
385         self.put_command("stop all\n")
386         time.sleep(3)
387
388     def stop(self, cores, task=''):
389         """ stop specific cores on the remote instance """
390         LOG.debug("Stopping cores %s", cores)
391         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
392         time.sleep(3)
393
394     def start_all(self):
395         """ start all cores on the remote instance """
396         LOG.debug("Start all")
397         self.put_command("start all\n")
398
399     def start(self, cores):
400         """ start specific cores on the remote instance """
401         LOG.debug("Starting cores %s", cores)
402         self.put_command("start {}\n".format(join_non_strings(',', cores)))
403         time.sleep(3)
404
405     def reset_stats(self):
406         """ reset the statistics on the remote instance """
407         LOG.debug("Reset stats")
408         self.put_command("reset stats\n")
409         time.sleep(1)
410
411     def _run_template_over_cores(self, template, cores, *args):
412         for core in cores:
413             self.put_command(template.format(core, *args))
414
415     def set_pkt_size(self, cores, pkt_size):
416         """ set the packet size to generate on the remote instance """
417         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
418         pkt_size -= 4
419         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
420         time.sleep(1)
421
422     def set_value(self, cores, offset, value, length):
423         """ set value on the remote instance """
424         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
425         LOG.debug(msg, cores, value, length, offset)
426         template = "set value {} 0 {} {} {}\n"
427         self._run_template_over_cores(template, cores, offset, value, length)
428
429     def reset_values(self, cores):
430         """ reset values on the remote instance """
431         LOG.debug("Set value for core(s) %s", cores)
432         self._run_template_over_cores("reset values {} 0\n", cores)
433
434     def set_speed(self, cores, speed, tasks=None):
435         """ set speed on the remote instance """
436         if tasks is None:
437             tasks = [0] * len(cores)
438         elif len(tasks) != len(cores):
439             LOG.error("set_speed: cores and tasks must have the same len")
440         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
441         for (core, task) in list(zip(cores, tasks)):
442             self.put_command("speed {} {} {}\n".format(core, task, speed))
443
444     def slope_speed(self, cores_speed, duration, n_steps=0):
445         """will start to increase speed from 0 to N where N is taken from
446         a['speed'] for each a in cores_speed"""
447         # by default, each step will take 0.5 sec
448         if n_steps == 0:
449             n_steps = duration * 2
450
451         private_core_data = []
452         step_duration = float(duration) / n_steps
453         for core_data in cores_speed:
454             target = float(core_data['speed'])
455             private_core_data.append({
456                 'cores': core_data['cores'],
457                 'zero': 0,
458                 'delta': target / n_steps,
459                 'current': 0,
460                 'speed': target,
461             })
462
463         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
464         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
465             time.sleep(step_duration)
466             for core_data in private_core_data:
467                 core_data['current'] = core_data[key1] + core_data[key2]
468                 self.set_speed(core_data['cores'], core_data['current'])
469
470     def set_pps(self, cores, pps, pkt_size):
471         """ set packets per second for specific cores on the remote instance """
472         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
473         LOG.debug(msg, cores, pps, pkt_size)
474
475         # speed in percent of line-rate
476         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
477         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
478
479     def lat_stats(self, cores, task=0):
480         """Get the latency statistics from the remote system"""
481         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
482         lat_min = {}
483         lat_max = {}
484         lat_avg = {}
485         for core in cores:
486             self.put_command("lat stats {} {} \n".format(core, task))
487             ret = self.get_data()
488
489             try:
490                 lat_min[core], lat_max[core], lat_avg[core] = \
491                     tuple(int(n) for n in ret.split(",")[:3])
492
493             except (AttributeError, ValueError, TypeError):
494                 pass
495
496         return lat_min, lat_max, lat_avg
497
498     def get_all_tot_stats(self):
499         self.put_command("tot stats\n")
500         all_stats_str = self.get_data().split(",")
501         if len(all_stats_str) != 4:
502             all_stats = [0] * 4
503             return all_stats
504         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
505         self.master_stats = all_stats
506         return all_stats
507
508     def hz(self):
509         return self.get_all_tot_stats()[3]
510
511     def core_stats(self, cores, task=0):
512         """Get the receive statistics from the remote system"""
513         rx = tx = drop = tsc = 0
514         for core in cores:
515             self.put_command("core stats {} {}\n".format(core, task))
516             ret = self.get_data().split(",")
517             rx += int(ret[0])
518             tx += int(ret[1])
519             drop += int(ret[2])
520             tsc = int(ret[3])
521         return rx, tx, drop, tsc
522
523     def port_stats(self, ports):
524         """get counter values from a specific port"""
525         tot_result = [0] * 12
526         for port in ports:
527             self.put_command("port_stats {}\n".format(port))
528             ret = [try_int(s, 0) for s in self.get_data().split(",")]
529             tot_result = [sum(x) for x in zip(tot_result, ret)]
530         return tot_result
531
532     @contextmanager
533     def measure_tot_stats(self):
534         start = self.get_all_tot_stats()
535         container = {'start_tot': start}
536         try:
537             yield container
538         finally:
539             container['end_tot'] = end = self.get_all_tot_stats()
540
541         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
542
543     def tot_stats(self):
544         """Get the total statistics from the remote system"""
545         stats = self.get_all_tot_stats()
546         return stats[:3]
547
548     def tot_ierrors(self):
549         """Get the total ierrors from the remote system"""
550         self.put_command("tot ierrors tot\n")
551         recv = self.get_data().split(',')
552         tot_ierrors = int(recv[0])
553         tsc = int(recv[0])
554         return tot_ierrors, tsc
555
556     def set_count(self, count, cores):
557         """Set the number of packets to send on the specified core"""
558         self._run_template_over_cores("count {} 0 {}\n", cores, count)
559
560     def dump_rx(self, core_id, task_id=0, count=1):
561         """Activate dump on rx on the specified core"""
562         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
563         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
564         time.sleep(1.5)  # Give PROX time to set up packet dumping
565
566     def quit(self):
567         self.stop_all()
568         self._quit()
569         self.force_quit()
570
571     def _quit(self):
572         """ stop all cores on the remote instance """
573         LOG.debug("Quit prox")
574         self.put_command("quit\n")
575         time.sleep(3)
576
577     def force_quit(self):
578         """ stop all cores on the remote instance """
579         LOG.debug("Force Quit prox")
580         self.put_command("quit_force\n")
581         time.sleep(3)
582
583
584 _LOCAL_OBJECT = object()
585
586
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588     # the actual app is lowercase
589     APP_NAME = 'prox'
590     # not used for Prox but added for consistency
591     VNF_TYPE = "PROX"
592
593     LUA_PARAMETER_NAME = ""
594     LUA_PARAMETER_PEER = {
595         "gen": "sut",
596         "sut": "gen",
597     }
598
599     CONFIG_QUEUE_TIMEOUT = 120
600
601     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
602         self.remote_path = None
603         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
604         self.remote_prox_file_name = None
605         self._prox_config_data = None
606         self.additional_files = {}
607         self.config_queue = Queue()
608         # allow_exit_without_flush
609         self.config_queue.cancel_join_thread()
610         self._global_section = None
611
612     @property
613     def prox_config_data(self):
614         if self._prox_config_data is None:
615             # this will block, but it needs too
616             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
617         return self._prox_config_data
618
619     @property
620     def global_section(self):
621         if self._global_section is None and self.prox_config_data:
622             self._global_section = self.find_section("global")
623         return self._global_section
624
625     def find_section(self, name, default=_LOCAL_OBJECT):
626         result = next((value for key, value in self.prox_config_data if key == name), default)
627         if result is _LOCAL_OBJECT:
628             raise KeyError('{} not found in Prox config'.format(name))
629         return result
630
631     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
632         section = self.find_section(section_name, [])
633         result = next((value for key, value in section if key == section_key), default)
634         if result is _LOCAL_OBJECT:
635             template = '{} not found in {} section of Prox config'
636             raise KeyError(template.format(section_key, section_name))
637         return result
638
639     def copy_to_target(self, config_file_path, prox_file):
640         remote_path = os.path.join("/tmp", prox_file)
641         self.ssh_helper.put(config_file_path, remote_path)
642         return remote_path
643
644     @staticmethod
645     def _get_tx_port(section, sections):
646         iface_port = [-1]
647         for item in sections[section]:
648             if item[0] == "tx port":
649                 iface_port = re.findall(r'\d+', item[1])
650                 # do we want the last one?
651                 #   if yes, then can we reverse?
652         return int(iface_port[0])
653
654     @staticmethod
655     def _replace_quoted_with_value(quoted, value, count=1):
656         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
657         return new_string
658
659     def _insert_additional_file(self, value):
660         file_str = value.split('"')
661         base_name = os.path.basename(file_str[1])
662         file_str[1] = self.additional_files[base_name]
663         return '"'.join(file_str)
664
665     def generate_prox_config_file(self, config_path):
666         sections = []
667         prox_config = ConfigParser(config_path, sections)
668         prox_config.parse()
669
670         # Ensure MAC is set "hardware"
671         all_ports = self.vnfd_helper.port_pairs.all_ports
672         # use dpdk port number
673         for port_name in all_ports:
674             port_num = self.vnfd_helper.port_num(port_name)
675             port_section_name = "port {}".format(port_num)
676             for section_name, section in sections:
677                 if port_section_name != section_name:
678                     continue
679
680                 for section_data in section:
681                     if section_data[0] == "mac":
682                         section_data[1] = "hardware"
683
684         # search for dst mac
685         for _, section in sections:
686             for section_data in section:
687                 item_key, item_val = section_data
688                 if item_val.startswith("@@dst_mac"):
689                     tx_port_iter = re.finditer(r'\d+', item_val)
690                     tx_port_no = int(next(tx_port_iter).group(0))
691                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
692                     mac = intf["virtual-interface"]["dst_mac"]
693                     section_data[1] = mac.replace(":", " ", 6)
694
695                 if item_key == "dst mac" and item_val.startswith("@@"):
696                     tx_port_iter = re.finditer(r'\d+', item_val)
697                     tx_port_no = int(next(tx_port_iter).group(0))
698                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
699                     mac = intf["virtual-interface"]["dst_mac"]
700                     section_data[1] = mac
701
702         # if addition file specified in prox config
703         if not self.additional_files:
704             return sections
705
706         for section_name, section in sections:
707             for section_data in section:
708                 try:
709                     if section_data[0].startswith("dofile"):
710                         section_data[0] = self._insert_additional_file(section_data[0])
711
712                     if section_data[1].startswith("dofile"):
713                         section_data[1] = self._insert_additional_file(section_data[1])
714                 except:  # pylint: disable=bare-except
715                     pass
716
717         return sections
718
719     @staticmethod
720     def write_prox_lua(lua_config):
721         """
722         Write an .ini-format config file for PROX (parameters.lua)
723         PROX does not allow a space before/after the =, so we need
724         a custom method
725         """
726         out = []
727         for key in lua_config:
728             value = '"' + lua_config[key] + '"'
729             if key == "__name__":
730                 continue
731             if value is not None and value != '@':
732                 key = "=".join((key, str(value).replace('\n', '\n\t')))
733                 out.append(key)
734             else:
735                 key = str(key).replace('\n', '\n\t')
736                 out.append(key)
737         return os.linesep.join(out)
738
739     @staticmethod
740     def write_prox_config(prox_config):
741         """
742         Write an .ini-format config file for PROX
743         PROX does not allow a space before/after the =, so we need
744         a custom method
745         """
746         out = []
747         for (section_name, section) in prox_config:
748             out.append("[{}]".format(section_name))
749             for item in section:
750                 key, value = item
751                 if key == "__name__":
752                     continue
753                 if value is not None and value != '@':
754                     key = "=".join((key, str(value).replace('\n', '\n\t')))
755                     out.append(key)
756                 else:
757                     key = str(key).replace('\n', '\n\t')
758                     out.append(key)
759         return os.linesep.join(out)
760
761     def put_string_to_file(self, s, remote_path):
762         file_obj = cStringIO(s)
763         self.ssh_helper.put_file_obj(file_obj, remote_path)
764         return remote_path
765
766     def generate_prox_lua_file(self):
767         p = OrderedDict()
768         all_ports = self.vnfd_helper.port_pairs.all_ports
769         for port_name in all_ports:
770             port_num = self.vnfd_helper.port_num(port_name)
771             intf = self.vnfd_helper.find_interface(name=port_name)
772             vintf = intf['virtual-interface']
773             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
774             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
775
776         return p
777
778     def upload_prox_lua(self, config_file, lua_data):
779         # prox can't handle spaces around ' = ' so use custom method
780         out = StringIO(self.write_prox_lua(lua_data))
781         out.seek(0)
782         remote_path = os.path.join("/tmp", config_file)
783         self.ssh_helper.put_file_obj(out, remote_path)
784
785         return remote_path
786
787     def upload_prox_config(self, config_file, prox_config_data):
788         # prox can't handle spaces around ' = ' so use custom method
789         out = StringIO(self.write_prox_config(prox_config_data))
790         out.seek(0)
791         remote_path = os.path.join("/tmp", config_file)
792         self.ssh_helper.put_file_obj(out, remote_path)
793
794         return remote_path
795
796     def build_config_file(self):
797         task_path = self.scenario_helper.task_path
798         options = self.scenario_helper.options
799         config_path = options['prox_config']
800         config_file = os.path.basename(config_path)
801         config_path = find_relative_file(config_path, task_path)
802         self.additional_files = {}
803
804         try:
805             if options['prox_generate_parameter']:
806                 self.lua = []
807                 self.lua = self.generate_prox_lua_file()
808                 if len(self.lua) > 0:
809                     self.upload_prox_lua("parameters.lua", self.lua)
810         except:  # pylint: disable=bare-except
811             pass
812
813         prox_files = options.get('prox_files', [])
814         if isinstance(prox_files, six.string_types):
815             prox_files = [prox_files]
816         for key_prox_file in prox_files:
817             base_prox_file = os.path.basename(key_prox_file)
818             key_prox_path = find_relative_file(key_prox_file, task_path)
819             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
820             self.additional_files[base_prox_file] = remote_prox_file
821
822         self._prox_config_data = self.generate_prox_config_file(config_path)
823         # copy config to queue so we can read it from traffic_runner process
824         self.config_queue.put(self._prox_config_data)
825         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
826
827     def build_config(self):
828         self.build_config_file()
829
830         options = self.scenario_helper.options
831         prox_args = options['prox_args']
832         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
833
834         self.pipeline_kwargs = {
835             'tool_path': tool_path,
836             'tool_dir': os.path.dirname(tool_path),
837             'cfg_file': self.remote_path,
838             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
839                              for k, v in prox_args.items())
840         }
841
842         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
843                         "{args} -f {cfg_file} '")
844         return cmd_template.format(**self.pipeline_kwargs)
845
846
847 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
848 class ProxResourceHelper(ClientResourceHelper):
849
850     RESOURCE_WORD = 'prox'
851
852     PROX_MODE = ""
853
854     WAIT_TIME = 3
855
856     @staticmethod
857     def find_pci(pci, bound_pci):
858         # we have to substring match PCI bus address from the end
859         return any(b.endswith(pci) for b in bound_pci)
860
861     def __init__(self, setup_helper):
862         super(ProxResourceHelper, self).__init__(setup_helper)
863         self.mgmt_interface = self.vnfd_helper.mgmt_interface
864         self._user = self.mgmt_interface["user"]
865         self._ip = self.mgmt_interface["ip"]
866
867         self.done = False
868         self._vpci_to_if_name_map = None
869         self.additional_file = {}
870         self.remote_prox_file_name = None
871         self.lower = None
872         self.upper = None
873         self.step_delta = 1
874         self.step_time = 0.5
875         self._test_type = None
876
877     @property
878     def sut(self):
879         if not self.client:
880             self.client = self._connect()
881         return self.client
882
883     @property
884     def test_type(self):
885         if self._test_type is None:
886             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
887         return self._test_type
888
889     def run_traffic(self, traffic_profile):
890         self._queue.cancel_join_thread()
891         self.lower = 0.0
892         self.upper = 100.0
893
894         traffic_profile.init(self._queue)
895         # this frees up the run_traffic loop
896         self.client_started.value = 1
897
898         while not self._terminated.value:
899             # move it all to traffic_profile
900             self._run_traffic_once(traffic_profile)
901
902     def _run_traffic_once(self, traffic_profile):
903         traffic_profile.execute_traffic(self)
904         if traffic_profile.done:
905             self._queue.put({'done': True})
906             LOG.debug("tg_prox done")
907             self._terminated.value = 1
908
909     # For VNF use ResourceHelper method to collect KPIs directly.
910     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
911     def collect_collectd_kpi(self):
912         return self._collect_resource_kpi()
913
914     def collect_kpi(self):
915         result = super(ProxResourceHelper, self).collect_kpi()
916         # add in collectd kpis manually
917         if result:
918             result['collect_stats'] = self._collect_resource_kpi()
919         return result
920
921     def terminate(self):
922         # should not be called, use VNF terminate
923         raise NotImplementedError()
924
925     def up_post(self):
926         return self.sut  # force connection
927
928     def execute(self, cmd, *args, **kwargs):
929         func = getattr(self.sut, cmd, None)
930         if func:
931             return func(*args, **kwargs)
932
933     def _connect(self, client=None):
934         """Run and connect to prox on the remote system """
935         # De-allocating a large amount of hugepages takes some time. If a new
936         # PROX instance is started immediately after killing the previous one,
937         # it might not be able to allocate hugepages, because they are still
938         # being freed. Hence the -w switch.
939         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
940         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
941         # -f ./handle_none-4.cfg"
942         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
943         #  "; " \
944         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
945         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
946         # sudo " \
947         #    + "./build/Prox " + prox_args
948         # log.debug("Starting PROX with command [%s]", prox_cmd)
949         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
950         # self._ip, prox_cmd))
951         if client is None:
952             client = ProxSocketHelper()
953
954         # try connecting to Prox for 60s
955         for _ in range(RETRY_SECONDS):
956             time.sleep(RETRY_INTERVAL)
957             try:
958                 client.connect(self._ip, PROX_PORT)
959             except (socket.gaierror, socket.error):
960                 continue
961             else:
962                 return client
963
964         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
965         raise Exception(msg.format(self._ip, PROX_PORT))
966
967
968 class ProxDataHelper(object):
969
970     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
971         super(ProxDataHelper, self).__init__()
972         self.vnfd_helper = vnfd_helper
973         self.sut = sut
974         self.pkt_size = pkt_size
975         self.value = value
976         self.tolerated_loss = tolerated_loss
977         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
978         self.tsc_hz = None
979         self.measured_stats = None
980         self.latency = None
981         self._totals_and_pps = None
982         self.result_tuple = None
983
984     @property
985     def totals_and_pps(self):
986         if self._totals_and_pps is None:
987             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
988             pps = self.value / 100.0 * self.line_rate_to_pps()
989             self._totals_and_pps = rx_total, tx_total, pps
990         return self._totals_and_pps
991
992     @property
993     def rx_total(self):
994         return self.totals_and_pps[0]
995
996     @property
997     def tx_total(self):
998         return self.totals_and_pps[1]
999
1000     @property
1001     def pps(self):
1002         return self.totals_and_pps[2]
1003
1004     @property
1005     def samples(self):
1006         samples = {}
1007         for port_name, port_num in self.vnfd_helper.ports_iter():
1008             port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1009             samples[port_name] = {
1010                 "in_packets": port_rx_total,
1011                 "out_packets": port_tx_total,
1012             }
1013         return samples
1014
1015     def __enter__(self):
1016         self.check_interface_count()
1017         return self
1018
1019     def __exit__(self, exc_type, exc_val, exc_tb):
1020         self.make_tuple()
1021
1022     def make_tuple(self):
1023         if self.result_tuple:
1024             return
1025
1026         self.result_tuple = ProxTestDataTuple(
1027             self.tolerated_loss,
1028             self.tsc_hz,
1029             self.measured_stats['delta'].rx,
1030             self.measured_stats['delta'].tx,
1031             self.measured_stats['delta'].tsc,
1032             self.latency,
1033             self.rx_total,
1034             self.tx_total,
1035             self.pps,
1036         )
1037         self.result_tuple.log_data()
1038
1039     @contextmanager
1040     def measure_tot_stats(self):
1041         with self.sut.measure_tot_stats() as self.measured_stats:
1042             yield
1043
1044     def check_interface_count(self):
1045         # do this assert in init?  unless we expect interface count to
1046         # change from one run to another run...
1047         assert self.port_count in {1, 2, 4}, \
1048             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1049
1050     def capture_tsc_hz(self):
1051         self.tsc_hz = float(self.sut.hz())
1052
1053     def line_rate_to_pps(self):
1054         # NOTE: to fix, don't hardcode 10Gb/s
1055         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1056
1057
1058 class ProxProfileHelper(object):
1059
1060     __prox_profile_type__ = "Generic"
1061
1062     PROX_CORE_GEN_MODE = "gen"
1063     PROX_CORE_LAT_MODE = "lat"
1064
1065     @classmethod
1066     def get_cls(cls, helper_type):
1067         """Return class of specified type."""
1068         if not helper_type:
1069             return ProxProfileHelper
1070
1071         for profile_helper_class in utils.itersubclasses(cls):
1072             if helper_type == profile_helper_class.__prox_profile_type__:
1073                 return profile_helper_class
1074
1075         return ProxProfileHelper
1076
1077     @classmethod
1078     def make_profile_helper(cls, resource_helper):
1079         return cls.get_cls(resource_helper.test_type)(resource_helper)
1080
1081     def __init__(self, resource_helper):
1082         super(ProxProfileHelper, self).__init__()
1083         self.resource_helper = resource_helper
1084         self._cpu_topology = None
1085         self._test_cores = None
1086         self._latency_cores = None
1087
1088     @property
1089     def cpu_topology(self):
1090         if not self._cpu_topology:
1091             stdout = io.BytesIO()
1092             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1093             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1094         return self._cpu_topology
1095
1096     @property
1097     def test_cores(self):
1098         if not self._test_cores:
1099             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1100         return self._test_cores
1101
1102     @property
1103     def latency_cores(self):
1104         if not self._latency_cores:
1105             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1106         return self._latency_cores
1107
1108     @contextmanager
1109     def traffic_context(self, pkt_size, value):
1110         self.sut.stop_all()
1111         self.sut.reset_stats()
1112         try:
1113             self.sut.set_pkt_size(self.test_cores, pkt_size)
1114             self.sut.set_speed(self.test_cores, value)
1115             self.sut.start_all()
1116             yield
1117         finally:
1118             self.sut.stop_all()
1119
1120     def get_cores(self, mode):
1121         cores = []
1122
1123         for section_name, section in self.setup_helper.prox_config_data:
1124             if not section_name.startswith("core"):
1125                 continue
1126
1127             for key, value in section:
1128                 if key == "mode" and value == mode:
1129                     core_tuple = CoreSocketTuple(section_name)
1130                     core = core_tuple.find_in_topology(self.cpu_topology)
1131                     cores.append(core)
1132
1133         return cores
1134
1135     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1136         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1137
1138         with data_helper, self.traffic_context(pkt_size, value):
1139             with data_helper.measure_tot_stats():
1140                 time.sleep(duration)
1141                 # Getting statistics to calculate PPS at right speed....
1142                 data_helper.capture_tsc_hz()
1143                 data_helper.latency = self.get_latency()
1144
1145         return data_helper.result_tuple, data_helper.samples
1146
1147     def get_latency(self):
1148         """
1149         :return: return lat_min, lat_max, lat_avg
1150         :rtype: list
1151         """
1152         if self._latency_cores:
1153             return self.sut.lat_stats(self._latency_cores)
1154         return []
1155
1156     def terminate(self):
1157         pass
1158
1159     def __getattr__(self, item):
1160         return getattr(self.resource_helper, item)
1161
1162
1163 class ProxMplsProfileHelper(ProxProfileHelper):
1164
1165     __prox_profile_type__ = "MPLS tag/untag"
1166
1167     def __init__(self, resource_helper):
1168         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1169         self._cores_tuple = None
1170
1171     @property
1172     def mpls_cores(self):
1173         if not self._cores_tuple:
1174             self._cores_tuple = self.get_cores_mpls()
1175         return self._cores_tuple
1176
1177     @property
1178     def tagged_cores(self):
1179         return self.mpls_cores[0]
1180
1181     @property
1182     def plain_cores(self):
1183         return self.mpls_cores[1]
1184
1185     def get_cores_mpls(self):
1186         cores_tagged = []
1187         cores_plain = []
1188         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1189             if not section_name.startswith("core"):
1190                 continue
1191
1192             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1193                 continue
1194
1195             for item_key, item_value in section:
1196                 if item_key != 'name':
1197                     continue
1198
1199                 if item_value.startswith("tag"):
1200                     core_tuple = CoreSocketTuple(section_name)
1201                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1202                     cores_tagged.append(core_tag)
1203
1204                 elif item_value.startswith("udp"):
1205                     core_tuple = CoreSocketTuple(section_name)
1206                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1207                     cores_plain.append(core_udp)
1208
1209         return cores_tagged, cores_plain
1210
1211     @contextmanager
1212     def traffic_context(self, pkt_size, value):
1213         self.sut.stop_all()
1214         self.sut.reset_stats()
1215         try:
1216             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1217             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1218             self.sut.set_speed(self.tagged_cores, value)
1219             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1220             self.sut.set_speed(self.plain_cores, value * ratio)
1221             self.sut.start_all()
1222             yield
1223         finally:
1224             self.sut.stop_all()
1225
1226
1227 class ProxBngProfileHelper(ProxProfileHelper):
1228
1229     __prox_profile_type__ = "BNG gen"
1230
1231     def __init__(self, resource_helper):
1232         super(ProxBngProfileHelper, self).__init__(resource_helper)
1233         self._cores_tuple = None
1234
1235     @property
1236     def bng_cores(self):
1237         if not self._cores_tuple:
1238             self._cores_tuple = self.get_cores_gen_bng_qos()
1239         return self._cores_tuple
1240
1241     @property
1242     def cpe_cores(self):
1243         return self.bng_cores[0]
1244
1245     @property
1246     def inet_cores(self):
1247         return self.bng_cores[1]
1248
1249     @property
1250     def arp_cores(self):
1251         return self.bng_cores[2]
1252
1253     @property
1254     def arp_task_cores(self):
1255         return self.bng_cores[3]
1256
1257     @property
1258     def all_rx_cores(self):
1259         return self.latency_cores
1260
1261     def get_cores_gen_bng_qos(self):
1262         cpe_cores = []
1263         inet_cores = []
1264         arp_cores = []
1265         arp_tasks_core = [0]
1266         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1267             if not section_name.startswith("core"):
1268                 continue
1269
1270             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1271                 continue
1272
1273             for item_key, item_value in section:
1274                 if item_key != 'name':
1275                     continue
1276
1277                 if item_value.startswith("cpe"):
1278                     core_tuple = CoreSocketTuple(section_name)
1279                     cpe_core = core_tuple.find_in_topology(self.cpu_topology)
1280                     cpe_cores.append(cpe_core)
1281
1282                 elif item_value.startswith("inet"):
1283                     core_tuple = CoreSocketTuple(section_name)
1284                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1285                     inet_cores.append(inet_core)
1286
1287                 elif item_value.startswith("arp"):
1288                     core_tuple = CoreSocketTuple(section_name)
1289                     arp_core = core_tuple.find_in_topology(self.cpu_topology)
1290                     arp_cores.append(arp_core)
1291
1292                 # We check the tasks/core separately
1293                 if item_value.startswith("arp_task"):
1294                     core_tuple = CoreSocketTuple(section_name)
1295                     arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
1296                     arp_tasks_core.append(arp_task_core)
1297
1298         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1299
1300     @contextmanager
1301     def traffic_context(self, pkt_size, value):
1302         # Tester is sending packets at the required speed already after
1303         # setup_test(). Just get the current statistics, sleep the required
1304         # amount of time and calculate packet loss.
1305         inet_pkt_size = pkt_size
1306         cpe_pkt_size = pkt_size - 24
1307         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1308
1309         curr_up_speed = curr_down_speed = 0
1310         max_up_speed = max_down_speed = value
1311         if ratio < 1:
1312             max_down_speed = value * ratio
1313         else:
1314             max_up_speed = value / ratio
1315
1316         # Initialize cores
1317         self.sut.stop_all()
1318         time.sleep(0.5)
1319
1320         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1321         # wrong.
1322         self.sut.start(self.all_rx_cores)
1323         time.sleep(0.5)
1324         self.sut.stop(self.all_rx_cores)
1325         time.sleep(0.5)
1326         self.sut.reset_stats()
1327
1328         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1329         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1330
1331         self.sut.reset_values(self.cpe_cores)
1332         self.sut.reset_values(self.inet_cores)
1333
1334         # Set correct IP and UDP lengths in packet headers
1335         # CPE
1336         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1337         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1338         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1339         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1340
1341         # INET
1342         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1343         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1344         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1345         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1346         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1347         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1348
1349         # Sending ARP to initialize tables - need a few seconds of generation
1350         # to make sure all CPEs are initialized
1351         LOG.info("Initializing SUT: sending ARP packets")
1352         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1353         self.sut.set_speed(self.inet_cores, curr_up_speed)
1354         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1355         self.sut.start(self.arp_cores)
1356         time.sleep(4)
1357
1358         # Ramp up the transmission speed. First go to the common speed, then
1359         # increase steps for the faster one.
1360         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1361
1362         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1363
1364         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1365             # The min(..., ...) takes care of 1) floating point rounding errors
1366             # that could make curr_*_speed to be slightly greater than
1367             # max_*_speed and 2) max_*_speed not being an exact multiple of
1368             # self._step_delta.
1369             if curr_up_speed < max_up_speed:
1370                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1371             if curr_down_speed < max_down_speed:
1372                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1373
1374             self.sut.set_speed(self.inet_cores, curr_up_speed)
1375             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1376             time.sleep(self.step_time)
1377
1378         LOG.info("Target speeds reached. Starting real test.")
1379
1380         yield
1381
1382         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1383         LOG.info("Test ended. Flushing NIC buffers")
1384         self.sut.start(self.all_rx_cores)
1385         time.sleep(3)
1386         self.sut.stop(self.all_rx_cores)
1387
1388     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1389         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1390
1391         with data_helper, self.traffic_context(pkt_size, value):
1392             with data_helper.measure_tot_stats():
1393                 time.sleep(duration)
1394                 # Getting statistics to calculate PPS at right speed....
1395                 data_helper.capture_tsc_hz()
1396                 data_helper.latency = self.get_latency()
1397
1398         return data_helper.result_tuple, data_helper.samples
1399
1400
1401 class ProxVpeProfileHelper(ProxProfileHelper):
1402
1403     __prox_profile_type__ = "vPE gen"
1404
1405     def __init__(self, resource_helper):
1406         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1407         self._cores_tuple = None
1408         self._ports_tuple = None
1409
1410     @property
1411     def vpe_cores(self):
1412         if not self._cores_tuple:
1413             self._cores_tuple = self.get_cores_gen_vpe()
1414         return self._cores_tuple
1415
1416     @property
1417     def cpe_cores(self):
1418         return self.vpe_cores[0]
1419
1420     @property
1421     def inet_cores(self):
1422         return self.vpe_cores[1]
1423
1424     @property
1425     def all_rx_cores(self):
1426         return self.latency_cores
1427
1428     @property
1429     def vpe_ports(self):
1430         if not self._ports_tuple:
1431             self._ports_tuple = self.get_ports_gen_vpe()
1432         return self._ports_tuple
1433
1434     @property
1435     def cpe_ports(self):
1436         return self.vpe_ports[0]
1437
1438     @property
1439     def inet_ports(self):
1440         return self.vpe_ports[1]
1441
1442     def get_cores_gen_vpe(self):
1443         cpe_cores = []
1444         inet_cores = []
1445         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1446             if not section_name.startswith("core"):
1447                 continue
1448
1449             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1450                 continue
1451
1452             for item_key, item_value in section:
1453                 if item_key != 'name':
1454                     continue
1455
1456                 if item_value.startswith("cpe"):
1457                     core_tuple = CoreSocketTuple(section_name)
1458                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1459                     cpe_cores.append(core_tag)
1460
1461                 elif item_value.startswith("inet"):
1462                     core_tuple = CoreSocketTuple(section_name)
1463                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1464                     inet_cores.append(inet_core)
1465
1466         return cpe_cores, inet_cores
1467
1468     def get_ports_gen_vpe(self):
1469         cpe_ports = []
1470         inet_ports = []
1471
1472         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1473             if not section_name.startswith("port"):
1474                 continue
1475             tx_port_iter = re.finditer(r'\d+', section_name)
1476             tx_port_no = int(next(tx_port_iter).group(0))
1477
1478             for item_key, item_value in section:
1479                 if item_key != 'name':
1480                     continue
1481
1482                 if item_value.startswith("cpe"):
1483                     cpe_ports.append(tx_port_no)
1484
1485                 elif item_value.startswith("inet"):
1486                     inet_ports.append(tx_port_no)
1487
1488         return cpe_ports, inet_ports
1489
1490     @contextmanager
1491     def traffic_context(self, pkt_size, value):
1492         # Calculate the target upload and download speed. The upload and
1493         # download packets have different packet sizes, so in order to get
1494         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1495         # of the packet sizes.
1496         cpe_pkt_size = pkt_size
1497         inet_pkt_size = pkt_size - 4
1498         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1499
1500         curr_up_speed = curr_down_speed = 0
1501         max_up_speed = max_down_speed = value
1502         if ratio < 1:
1503             max_down_speed = value * ratio
1504         else:
1505             max_up_speed = value / ratio
1506
1507         # Adjust speed when multiple cores per port are used to generate traffic
1508         if len(self.cpe_ports) != len(self.cpe_cores):
1509             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1510         if len(self.inet_ports) != len(self.inet_cores):
1511             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1512
1513         # Initialize cores
1514         self.sut.stop_all()
1515         time.sleep(2)
1516
1517         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1518         # wrong.
1519         self.sut.start(self.all_rx_cores)
1520         time.sleep(2)
1521         self.sut.stop(self.all_rx_cores)
1522         time.sleep(2)
1523         self.sut.reset_stats()
1524
1525         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1526         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1527
1528         self.sut.reset_values(self.cpe_cores)
1529         self.sut.reset_values(self.inet_cores)
1530
1531         # Set correct IP and UDP lengths in packet headers
1532         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1533         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1534         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1535         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1536
1537         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1538         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1539         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1540         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1541
1542         self.sut.set_speed(self.inet_cores, curr_up_speed)
1543         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1544
1545         # Ramp up the transmission speed. First go to the common speed, then
1546         # increase steps for the faster one.
1547         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1548
1549         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1550
1551         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1552             # The min(..., ...) takes care of 1) floating point rounding errors
1553             # that could make curr_*_speed to be slightly greater than
1554             # max_*_speed and 2) max_*_speed not being an exact multiple of
1555             # self._step_delta.
1556             if curr_up_speed < max_up_speed:
1557                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1558             if curr_down_speed < max_down_speed:
1559                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1560
1561             self.sut.set_speed(self.inet_cores, curr_up_speed)
1562             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1563             time.sleep(self.step_time)
1564
1565         LOG.info("Target speeds reached. Starting real test.")
1566
1567         yield
1568
1569         self.sut.stop(self.cpe_cores + self.inet_cores)
1570         LOG.info("Test ended. Flushing NIC buffers")
1571         self.sut.start(self.all_rx_cores)
1572         time.sleep(3)
1573         self.sut.stop(self.all_rx_cores)
1574
1575     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1576         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1577
1578         with data_helper, self.traffic_context(pkt_size, value):
1579             with data_helper.measure_tot_stats():
1580                 time.sleep(duration)
1581                 # Getting statistics to calculate PPS at right speed....
1582                 data_helper.capture_tsc_hz()
1583                 data_helper.latency = self.get_latency()
1584
1585         return data_helper.result_tuple, data_helper.samples
1586
1587
1588 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1589
1590     __prox_profile_type__ = "lwAFTR gen"
1591
1592     def __init__(self, resource_helper):
1593         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1594         self._cores_tuple = None
1595         self._ports_tuple = None
1596         self.step_delta = 5
1597         self.step_time = 0.5
1598
1599     @property
1600     def _lwaftr_cores(self):
1601         if not self._cores_tuple:
1602             self._cores_tuple = self._get_cores_gen_lwaftr()
1603         return self._cores_tuple
1604
1605     @property
1606     def tun_cores(self):
1607         return self._lwaftr_cores[0]
1608
1609     @property
1610     def inet_cores(self):
1611         return self._lwaftr_cores[1]
1612
1613     @property
1614     def _lwaftr_ports(self):
1615         if not self._ports_tuple:
1616             self._ports_tuple = self._get_ports_gen_lw_aftr()
1617         return self._ports_tuple
1618
1619     @property
1620     def tun_ports(self):
1621         return self._lwaftr_ports[0]
1622
1623     @property
1624     def inet_ports(self):
1625         return self._lwaftr_ports[1]
1626
1627     @property
1628     def all_rx_cores(self):
1629         return self.latency_cores
1630
1631     def _get_cores_gen_lwaftr(self):
1632         tun_cores = []
1633         inet_cores = []
1634         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1635             if not section_name.startswith("core"):
1636                 continue
1637
1638             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1639                 continue
1640
1641             core_tuple = CoreSocketTuple(section_name)
1642             core_tag = core_tuple.find_in_topology(self.cpu_topology)
1643             for item_value in (v for k, v in section if k == 'name'):
1644                 if item_value.startswith('tun'):
1645                     tun_cores.append(core_tag)
1646                 elif item_value.startswith('inet'):
1647                     inet_cores.append(core_tag)
1648
1649         return tun_cores, inet_cores
1650
1651     def _get_ports_gen_lw_aftr(self):
1652         tun_ports = []
1653         inet_ports = []
1654
1655         re_port = re.compile(r'port (\d+)')
1656         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1657             match = re_port.search(section_name)
1658             if not match:
1659                 continue
1660
1661             tx_port_no = int(match.group(1))
1662             for item_value in (v for k, v in section if k == 'name'):
1663                 if item_value.startswith('lwB4'):
1664                     tun_ports.append(tx_port_no)
1665                 elif item_value.startswith('inet'):
1666                     inet_ports.append(tx_port_no)
1667
1668         return tun_ports, inet_ports
1669
1670     @staticmethod
1671     def _resize(len1, len2):
1672         if len1 == len2:
1673             return 1.0
1674         return 1.0 * len1 / len2
1675
1676     @contextmanager
1677     def traffic_context(self, pkt_size, value):
1678         # Tester is sending packets at the required speed already after
1679         # setup_test(). Just get the current statistics, sleep the required
1680         # amount of time and calculate packet loss.
1681         tun_pkt_size = pkt_size
1682         inet_pkt_size = pkt_size - 40
1683         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1684
1685         curr_up_speed = curr_down_speed = 0
1686         max_up_speed = max_down_speed = value
1687
1688         max_up_speed = value / ratio
1689
1690         # Adjust speed when multiple cores per port are used to generate traffic
1691         if len(self.tun_ports) != len(self.tun_cores):
1692             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1693         if len(self.inet_ports) != len(self.inet_cores):
1694             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1695
1696         # Initialize cores
1697         self.sut.stop_all()
1698         time.sleep(0.5)
1699
1700         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1701         # wrong.
1702         self.sut.start(self.all_rx_cores)
1703         time.sleep(0.5)
1704         self.sut.stop(self.all_rx_cores)
1705         time.sleep(0.5)
1706         self.sut.reset_stats()
1707
1708         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1709         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1710
1711         self.sut.reset_values(self.tun_cores)
1712         self.sut.reset_values(self.inet_cores)
1713
1714         # Set correct IP and UDP lengths in packet headers
1715         # tun
1716         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1717         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1718         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1719         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1720         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1721         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1722
1723         # INET
1724         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1725         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1726         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1727         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1728
1729         LOG.info("Initializing SUT: sending lwAFTR packets")
1730         self.sut.set_speed(self.inet_cores, curr_up_speed)
1731         self.sut.set_speed(self.tun_cores, curr_down_speed)
1732         time.sleep(4)
1733
1734         # Ramp up the transmission speed. First go to the common speed, then
1735         # increase steps for the faster one.
1736         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1737
1738         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1739
1740         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1741             # The min(..., ...) takes care of 1) floating point rounding errors
1742             # that could make curr_*_speed to be slightly greater than
1743             # max_*_speed and 2) max_*_speed not being an exact multiple of
1744             # self._step_delta.
1745             if curr_up_speed < max_up_speed:
1746                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1747             if curr_down_speed < max_down_speed:
1748                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1749
1750             self.sut.set_speed(self.inet_cores, curr_up_speed)
1751             self.sut.set_speed(self.tun_cores, curr_down_speed)
1752             time.sleep(self.step_time)
1753
1754         LOG.info("Target speeds reached. Starting real test.")
1755
1756         yield
1757
1758         self.sut.stop(self.tun_cores + self.inet_cores)
1759         LOG.info("Test ended. Flushing NIC buffers")
1760         self.sut.start(self.all_rx_cores)
1761         time.sleep(3)
1762         self.sut.stop(self.all_rx_cores)
1763
1764     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1765         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1766
1767         with data_helper, self.traffic_context(pkt_size, value):
1768             with data_helper.measure_tot_stats():
1769                 time.sleep(duration)
1770                 # Getting statistics to calculate PPS at right speed....
1771                 data_helper.capture_tsc_hz()
1772                 data_helper.latency = self.get_latency()
1773
1774         return data_helper.result_tuple, data_helper.samples