Merge "NSB Prox LW_AFTR Test"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import io
18 import logging
19 import operator
20 import os
21 import re
22 import select
23 import socket
24 import time
25 from collections import OrderedDict, namedtuple
26 from contextlib import contextmanager
27 from itertools import repeat, chain
28 from multiprocessing import Queue
29
30 import six
31 from six.moves import cStringIO
32 from six.moves import zip, StringIO
33
34 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
35 from yardstick.common import utils
36 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
37 from yardstick.network_services.helpers.iniparser import ConfigParser
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
40
41 PROX_PORT = 8474
42
43 SECTION_NAME = 0
44 SECTION_CONTENTS = 1
45
46 LOG = logging.getLogger(__name__)
47 LOG.setLevel(logging.DEBUG)
48
49 TEN_GIGABIT = 1e10
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
85
86     def __new__(cls, *args):
87         try:
88             matches = cls.CORE_RE.search(str(args[0]))
89             if matches:
90                 args = matches.groups()
91
92             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
93                                                        'h' if args[2] else '')
94
95         except (AttributeError, TypeError, IndexError, ValueError):
96             raise ValueError('Invalid core spec {}'.format(args))
97
98     def is_hyperthread(self):
99         return self.hyperthread == 'h'
100
101     @property
102     def index(self):
103         return int(self.is_hyperthread())
104
105     def find_in_topology(self, cpu_topology):
106         try:
107             socket_core_match = cpu_topology[self.socket_id][self.core_id]
108             sorted_match = sorted(socket_core_match.values())
109             return sorted_match[self.index][0]
110         except (KeyError, IndexError):
111             template = "Core {}{} on socket {} does not exist"
112             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
113
114
115 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
116     def __new__(cls, *args):
117         try:
118             assert args[0] is not str(args[0])
119             args = tuple(args[0])
120         except (AssertionError, IndexError, TypeError):
121             pass
122
123         return super(TotStatsTuple, cls).__new__(cls, *args)
124
125
126 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
127                                                         'delta_tx,delta_tsc,'
128                                                         'latency,rx_total,tx_total,pps')):
129     @property
130     def pkt_loss(self):
131         try:
132             return 1e2 * self.drop_total / float(self.tx_total)
133         except ZeroDivisionError:
134             return 100.0
135
136     @property
137     def mpps(self):
138         # calculate the effective throughput in Mpps
139         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
140
141     @property
142     def can_be_lost(self):
143         return int(self.tx_total * self.tolerated / 1e2)
144
145     @property
146     def drop_total(self):
147         return self.tx_total - self.rx_total
148
149     @property
150     def success(self):
151         return self.drop_total <= self.can_be_lost
152
153     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
154         if pkt_loss is None:
155             pkt_loss = self.pkt_loss
156
157         if port_samples is None:
158             port_samples = {}
159
160         latency_keys = [
161             "LatencyMin",
162             "LatencyMax",
163             "LatencyAvg",
164         ]
165
166         samples = {
167             "Throughput": self.mpps,
168             "DropPackets": pkt_loss,
169             "CurrentDropPackets": pkt_loss,
170             "TxThroughput": self.pps / 1e6,
171             "RxThroughput": self.mpps,
172             "PktSize": pkt_size,
173         }
174         if port_samples:
175             samples.update(port_samples)
176
177         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
178         return samples
179
180     def log_data(self, logger=None):
181         if logger is None:
182             logger = LOG
183
184         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
185         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
186         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
187
188
189 class PacketDump(object):
190     @staticmethod
191     def assert_func(func, value1, value2, template=None):
192         assert func(value1, value2), template.format(value1, value2)
193
194     def __init__(self, port_id, data_len, payload):
195         template = "Packet dump has specified length {}, but payload is {} bytes long"
196         self.assert_func(operator.eq, data_len, len(payload), template)
197         self._port_id = port_id
198         self._data_len = data_len
199         self._payload = payload
200
201     @property
202     def port_id(self):
203         """Get the port id of the packet dump"""
204         return self._port_id
205
206     @property
207     def data_len(self):
208         """Get the length of the data received"""
209         return self._data_len
210
211     def __str__(self):
212         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
213
214     def payload(self, start=None, end=None):
215         """Get part of the payload as a list of ordinals.
216
217         Returns a list of byte values, matching the contents of the packet dump.
218         Optional start and end parameters can be specified to retrieve only a
219         part of the packet contents.
220
221         The number of elements in the list is equal to end - start + 1, so end
222         is the offset of the last character.
223
224         Args:
225             start (pos. int): the starting offset in the payload. If it is not
226                 specified or None, offset 0 is assumed.
227             end (pos. int): the ending offset of the payload. If it is not
228                 specified or None, the contents until the end of the packet are
229                 returned.
230
231         Returns:
232             [int, int, ...]. Each int represents the ordinal value of a byte in
233             the packet payload.
234         """
235         if start is None:
236             start = 0
237
238         if end is None:
239             end = self.data_len - 1
240
241         # Bounds checking on offsets
242         template = "Start offset must be non-negative"
243         self.assert_func(operator.ge, start, 0, template)
244
245         template = "End offset must be less than {1}"
246         self.assert_func(operator.lt, end, self.data_len, template)
247
248         # Adjust for splice operation: end offset must be 1 more than the offset
249         # of the last desired character.
250         end += 1
251
252         return self._payload[start:end]
253
254
255 class ProxSocketHelper(object):
256
257     def __init__(self, sock=None):
258         """ creates new prox instance """
259         super(ProxSocketHelper, self).__init__()
260
261         if sock is None:
262             sock = socket.socket()
263
264         self._sock = sock
265         self._pkt_dumps = []
266         self.master_stats = None
267
268     def connect(self, ip, port):
269         """Connect to the prox instance on the remote system"""
270         self._sock.connect((ip, port))
271
272     def get_socket(self):
273         """ get the socket connected to the remote instance """
274         return self._sock
275
276     def _parse_socket_data(self, decoded_data, pkt_dump_only):
277         def get_newline_index():
278             return decoded_data.find('\n', index)
279
280         ret_str = ''
281         index = 0
282         for newline_index in iter(get_newline_index, -1):
283             ret_str = decoded_data[index:newline_index]
284
285             try:
286                 mode, port_id, data_len = ret_str.split(',', 2)
287             except ValueError:
288                 mode, port_id, data_len = None, None, None
289
290             if mode != 'pktdump':
291                 # Regular 1-line message. Stop reading from the socket.
292                 LOG.debug("Regular response read")
293                 return ret_str
294
295             LOG.debug("Packet dump header read: [%s]", ret_str)
296
297             # The line is a packet dump header. Parse it, read the
298             # packet payload, store the dump for later retrieval.
299             # Skip over the packet dump and continue processing: a
300             # 1-line response may follow the packet dump.
301
302             data_len = int(data_len)
303             data_start = newline_index + 1  # + 1 to skip over \n
304             data_end = data_start + data_len
305             sub_data = decoded_data[data_start:data_end]
306             pkt_payload = array.array('B', (ord(v) for v in sub_data))
307             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
308             self._pkt_dumps.append(pkt_dump)
309
310             if pkt_dump_only:
311                 # Return boolean instead of string to signal
312                 # successful reception of the packet dump.
313                 LOG.debug("Packet dump stored, returning")
314                 return True
315
316             index = data_end + 1
317
318         return ret_str
319
320     def get_data(self, pkt_dump_only=False, timeout=1):
321         """ read data from the socket """
322
323         # This method behaves slightly differently depending on whether it is
324         # called to read the response to a command (pkt_dump_only = 0) or if
325         # it is called specifically to read a packet dump (pkt_dump_only = 1).
326         #
327         # Packet dumps look like:
328         #   pktdump,<port_id>,<data_len>\n
329         #   <packet contents as byte array>\n
330         # This means the total packet dump message consists of 2 lines instead
331         # of 1 line.
332         #
333         # - Response for a command (pkt_dump_only = 0):
334         #   1) Read response from the socket until \n (end of message)
335         #   2a) If the response is a packet dump header (starts with "pktdump,"):
336         #     - Read the packet payload and store the packet dump for later
337         #       retrieval.
338         #     - Reset the state and restart from 1). Eventually state 2b) will
339         #       be reached and the function will return.
340         #   2b) If the response is not a packet dump:
341         #     - Return the received message as a string
342         #
343         # - Explicit request to read a packet dump (pkt_dump_only = 1):
344         #   - Read the dump header and payload
345         #   - Store the packet dump for later retrieval
346         #   - Return True to signify a packet dump was successfully read
347
348         def is_ready():
349             # recv() is blocking, so avoid calling it when no data is waiting.
350             ready = select.select([self._sock], [], [], timeout)
351             return bool(ready[0])
352
353         status = False
354         ret_str = ""
355         for status in iter(is_ready, False):
356             decoded_data = self._sock.recv(256).decode('utf-8')
357             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
358
359         LOG.debug("Received data from socket: [%s]", ret_str)
360         return ret_str if status else ''
361
362     def put_command(self, to_send):
363         """ send data to the remote instance """
364         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
365         try:
366             # TODO: sendall will block, we need a timeout
367             self._sock.sendall(to_send.encode('utf-8'))
368         except:
369             pass
370
371     def get_packet_dump(self):
372         """ get the next packet dump """
373         if self._pkt_dumps:
374             return self._pkt_dumps.pop(0)
375         return None
376
377     def stop_all_reset(self):
378         """ stop the remote instance and reset stats """
379         LOG.debug("Stop all and reset stats")
380         self.stop_all()
381         self.reset_stats()
382
383     def stop_all(self):
384         """ stop all cores on the remote instance """
385         LOG.debug("Stop all")
386         self.put_command("stop all\n")
387         time.sleep(3)
388
389     def stop(self, cores, task=''):
390         """ stop specific cores on the remote instance """
391         LOG.debug("Stopping cores %s", cores)
392         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
393         time.sleep(3)
394
395     def start_all(self):
396         """ start all cores on the remote instance """
397         LOG.debug("Start all")
398         self.put_command("start all\n")
399
400     def start(self, cores):
401         """ start specific cores on the remote instance """
402         LOG.debug("Starting cores %s", cores)
403         self.put_command("start {}\n".format(join_non_strings(',', cores)))
404         time.sleep(3)
405
406     def reset_stats(self):
407         """ reset the statistics on the remote instance """
408         LOG.debug("Reset stats")
409         self.put_command("reset stats\n")
410         time.sleep(1)
411
412     def _run_template_over_cores(self, template, cores, *args):
413         for core in cores:
414             self.put_command(template.format(core, *args))
415
416     def set_pkt_size(self, cores, pkt_size):
417         """ set the packet size to generate on the remote instance """
418         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
419         pkt_size -= 4
420         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
421         time.sleep(1)
422
423     def set_value(self, cores, offset, value, length):
424         """ set value on the remote instance """
425         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
426         LOG.debug(msg, cores, value, length, offset)
427         template = "set value {} 0 {} {} {}\n"
428         self._run_template_over_cores(template, cores, offset, value, length)
429
430     def reset_values(self, cores):
431         """ reset values on the remote instance """
432         LOG.debug("Set value for core(s) %s", cores)
433         self._run_template_over_cores("reset values {} 0\n", cores)
434
435     def set_speed(self, cores, speed, tasks=None):
436         """ set speed on the remote instance """
437         if tasks is None:
438             tasks = [0] * len(cores)
439         elif len(tasks) != len(cores):
440             LOG.error("set_speed: cores and tasks must have the same len")
441         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
442         for (core, task) in list(zip(cores, tasks)):
443             self.put_command("speed {} {} {}\n".format(core, task, speed))
444
445     def slope_speed(self, cores_speed, duration, n_steps=0):
446         """will start to increase speed from 0 to N where N is taken from
447         a['speed'] for each a in cores_speed"""
448         # by default, each step will take 0.5 sec
449         if n_steps == 0:
450             n_steps = duration * 2
451
452         private_core_data = []
453         step_duration = float(duration) / n_steps
454         for core_data in cores_speed:
455             target = float(core_data['speed'])
456             private_core_data.append({
457                 'cores': core_data['cores'],
458                 'zero': 0,
459                 'delta': target / n_steps,
460                 'current': 0,
461                 'speed': target,
462             })
463
464         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
465         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
466             time.sleep(step_duration)
467             for core_data in private_core_data:
468                 core_data['current'] = core_data[key1] + core_data[key2]
469                 self.set_speed(core_data['cores'], core_data['current'])
470
471     def set_pps(self, cores, pps, pkt_size):
472         """ set packets per second for specific cores on the remote instance """
473         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
474         LOG.debug(msg, cores, pps, pkt_size)
475
476         # speed in percent of line-rate
477         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
478         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
479
480     def lat_stats(self, cores, task=0):
481         """Get the latency statistics from the remote system"""
482         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
483         lat_min = {}
484         lat_max = {}
485         lat_avg = {}
486         for core in cores:
487             self.put_command("lat stats {} {} \n".format(core, task))
488             ret = self.get_data()
489
490             try:
491                 lat_min[core], lat_max[core], lat_avg[core] = \
492                     tuple(int(n) for n in ret.split(",")[:3])
493
494             except (AttributeError, ValueError, TypeError):
495                 pass
496
497         return lat_min, lat_max, lat_avg
498
499     def get_all_tot_stats(self):
500         self.put_command("tot stats\n")
501         all_stats_str = self.get_data().split(",")
502         if len(all_stats_str) != 4:
503             all_stats = [0] * 4
504             return all_stats
505         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
506         self.master_stats = all_stats
507         return all_stats
508
509     def hz(self):
510         return self.get_all_tot_stats()[3]
511
512     # Deprecated
513     # TODO: remove
514     def rx_stats(self, cores, task=0):
515         return self.core_stats(cores, task)
516
517     def core_stats(self, cores, task=0):
518         """Get the receive statistics from the remote system"""
519         rx = tx = drop = tsc = 0
520         for core in cores:
521             self.put_command("core stats {} {}\n".format(core, task))
522             ret = self.get_data().split(",")
523             rx += int(ret[0])
524             tx += int(ret[1])
525             drop += int(ret[2])
526             tsc = int(ret[3])
527         return rx, tx, drop, tsc
528
529     def port_stats(self, ports):
530         """get counter values from a specific port"""
531         tot_result = [0] * 12
532         for port in ports:
533             self.put_command("port_stats {}\n".format(port))
534             ret = [try_int(s, 0) for s in self.get_data().split(",")]
535             tot_result = [sum(x) for x in zip(tot_result, ret)]
536         return tot_result
537
538     @contextmanager
539     def measure_tot_stats(self):
540         start = self.get_all_tot_stats()
541         container = {'start_tot': start}
542         try:
543             yield container
544         finally:
545             container['end_tot'] = end = self.get_all_tot_stats()
546
547         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
548
549     def tot_stats(self):
550         """Get the total statistics from the remote system"""
551         stats = self.get_all_tot_stats()
552         return stats[:3]
553
554     def tot_ierrors(self):
555         """Get the total ierrors from the remote system"""
556         self.put_command("tot ierrors tot\n")
557         recv = self.get_data().split(',')
558         tot_ierrors = int(recv[0])
559         tsc = int(recv[0])
560         return tot_ierrors, tsc
561
562     def set_count(self, count, cores):
563         """Set the number of packets to send on the specified core"""
564         self._run_template_over_cores("count {} 0 {}\n", cores, count)
565
566     def dump_rx(self, core_id, task_id=0, count=1):
567         """Activate dump on rx on the specified core"""
568         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
569         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
570         time.sleep(1.5)  # Give PROX time to set up packet dumping
571
572     def quit(self):
573         self.stop_all()
574         self._quit()
575         self.force_quit()
576
577     def _quit(self):
578         """ stop all cores on the remote instance """
579         LOG.debug("Quit prox")
580         self.put_command("quit\n")
581         time.sleep(3)
582
583     def force_quit(self):
584         """ stop all cores on the remote instance """
585         LOG.debug("Force Quit prox")
586         self.put_command("quit_force\n")
587         time.sleep(3)
588
589
590 _LOCAL_OBJECT = object()
591
592
593 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
594     # the actual app is lowercase
595     APP_NAME = 'prox'
596     # not used for Prox but added for consistency
597     VNF_TYPE = "PROX"
598
599     LUA_PARAMETER_NAME = ""
600     LUA_PARAMETER_PEER = {
601         "gen": "sut",
602         "sut": "gen",
603     }
604
605     CONFIG_QUEUE_TIMEOUT = 120
606
607     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
608         self.remote_path = None
609         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
610         self.remote_prox_file_name = None
611         self._prox_config_data = None
612         self.additional_files = {}
613         self.config_queue = Queue()
614         # allow_exit_without_flush
615         self.config_queue.cancel_join_thread()
616         self._global_section = None
617
618     @property
619     def prox_config_data(self):
620         if self._prox_config_data is None:
621             # this will block, but it needs too
622             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
623         return self._prox_config_data
624
625     @property
626     def global_section(self):
627         if self._global_section is None and self.prox_config_data:
628             self._global_section = self.find_section("global")
629         return self._global_section
630
631     def find_section(self, name, default=_LOCAL_OBJECT):
632         result = next((value for key, value in self.prox_config_data if key == name), default)
633         if result is _LOCAL_OBJECT:
634             raise KeyError('{} not found in Prox config'.format(name))
635         return result
636
637     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
638         section = self.find_section(section_name, [])
639         result = next((value for key, value in section if key == section_key), default)
640         if result is _LOCAL_OBJECT:
641             template = '{} not found in {} section of Prox config'
642             raise KeyError(template.format(section_key, section_name))
643         return result
644
645     def _build_pipeline_kwargs(self):
646         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
647         self.pipeline_kwargs = {
648             'tool_path': tool_path,
649             'tool_dir': os.path.dirname(tool_path),
650         }
651
652     def copy_to_target(self, config_file_path, prox_file):
653         remote_path = os.path.join("/tmp", prox_file)
654         self.ssh_helper.put(config_file_path, remote_path)
655         return remote_path
656
657     @staticmethod
658     def _get_tx_port(section, sections):
659         iface_port = [-1]
660         for item in sections[section]:
661             if item[0] == "tx port":
662                 iface_port = re.findall(r'\d+', item[1])
663                 # do we want the last one?
664                 #   if yes, then can we reverse?
665         return int(iface_port[0])
666
667     @staticmethod
668     def _replace_quoted_with_value(quoted, value, count=1):
669         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
670         return new_string
671
672     def _insert_additional_file(self, value):
673         file_str = value.split('"')
674         base_name = os.path.basename(file_str[1])
675         file_str[1] = self.additional_files[base_name]
676         return '"'.join(file_str)
677
678     def generate_prox_config_file(self, config_path):
679         sections = []
680         prox_config = ConfigParser(config_path, sections)
681         prox_config.parse()
682
683         # Ensure MAC is set "hardware"
684         all_ports = self.vnfd_helper.port_pairs.all_ports
685         # use dpdk port number
686         for port_name in all_ports:
687             port_num = self.vnfd_helper.port_num(port_name)
688             port_section_name = "port {}".format(port_num)
689             for section_name, section in sections:
690                 if port_section_name != section_name:
691                     continue
692
693                 for index, section_data in enumerate(section):
694                     if section_data[0] == "mac":
695                         section_data[1] = "hardware"
696
697         # search for dst mac
698         for _, section in sections:
699             # for index, (item_key, item_val) in enumerate(section):
700             for index, section_data in enumerate(section):
701                 item_key, item_val = section_data
702                 if item_val.startswith("@@dst_mac"):
703                     tx_port_iter = re.finditer(r'\d+', item_val)
704                     tx_port_no = int(next(tx_port_iter).group(0))
705                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
706                     mac = intf["virtual-interface"]["dst_mac"]
707                     section_data[1] = mac.replace(":", " ", 6)
708
709                 if item_key == "dst mac" and item_val.startswith("@@"):
710                     tx_port_iter = re.finditer(r'\d+', item_val)
711                     tx_port_no = int(next(tx_port_iter).group(0))
712                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
713                     mac = intf["virtual-interface"]["dst_mac"]
714                     section_data[1] = mac
715
716         # if addition file specified in prox config
717         if not self.additional_files:
718             return sections
719
720         for section_name, section in sections:
721             for index, section_data in enumerate(section):
722                 try:
723                     if section_data[0].startswith("dofile"):
724                         section_data[0] = self._insert_additional_file(section_data[0])
725
726                     if section_data[1].startswith("dofile"):
727                         section_data[1] = self._insert_additional_file(section_data[1])
728                 except:
729                     pass
730
731         return sections
732
733     @staticmethod
734     def write_prox_lua(lua_config):
735         """
736         Write an .ini-format config file for PROX (parameters.lua)
737         PROX does not allow a space before/after the =, so we need
738         a custom method
739         """
740         out = []
741         for key in lua_config:
742             value = '"' + lua_config[key] + '"'
743             if key == "__name__":
744                 continue
745             if value is not None and value != '@':
746                 key = "=".join((key, str(value).replace('\n', '\n\t')))
747                 out.append(key)
748             else:
749                 key = str(key).replace('\n', '\n\t')
750                 out.append(key)
751         return os.linesep.join(out)
752
753     @staticmethod
754     def write_prox_config(prox_config):
755         """
756         Write an .ini-format config file for PROX
757         PROX does not allow a space before/after the =, so we need
758         a custom method
759         """
760         out = []
761         for i, (section_name, section) in enumerate(prox_config):
762             out.append("[{}]".format(section_name))
763             for index, item in enumerate(section):
764                 key, value = item
765                 if key == "__name__":
766                     continue
767                 if value is not None and value != '@':
768                     key = "=".join((key, str(value).replace('\n', '\n\t')))
769                     out.append(key)
770                 else:
771                     key = str(key).replace('\n', '\n\t')
772                     out.append(key)
773         return os.linesep.join(out)
774
775     def put_string_to_file(self, s, remote_path):
776         file_obj = cStringIO(s)
777         self.ssh_helper.put_file_obj(file_obj, remote_path)
778         return remote_path
779
780     def generate_prox_lua_file(self):
781         p = OrderedDict()
782         all_ports = self.vnfd_helper.port_pairs.all_ports
783         for port_name in all_ports:
784             port_num = self.vnfd_helper.port_num(port_name)
785             intf = self.vnfd_helper.find_interface(name=port_name)
786             vintf = intf['virtual-interface']
787             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
788             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
789
790         return p
791
792     def upload_prox_lua(self, config_file, lua_data):
793         # prox can't handle spaces around ' = ' so use custom method
794         out = StringIO(self.write_prox_lua(lua_data))
795         out.seek(0)
796         remote_path = os.path.join("/tmp", config_file)
797         self.ssh_helper.put_file_obj(out, remote_path)
798
799         return remote_path
800
801     def upload_prox_config(self, config_file, prox_config_data):
802         # prox can't handle spaces around ' = ' so use custom method
803         out = StringIO(self.write_prox_config(prox_config_data))
804         out.seek(0)
805         remote_path = os.path.join("/tmp", config_file)
806         self.ssh_helper.put_file_obj(out, remote_path)
807
808         return remote_path
809
810     def build_config_file(self):
811         task_path = self.scenario_helper.task_path
812         options = self.scenario_helper.options
813         config_path = options['prox_config']
814         config_file = os.path.basename(config_path)
815         config_path = find_relative_file(config_path, task_path)
816         self.additional_files = {}
817
818         try:
819             if options['prox_generate_parameter']:
820                 self.lua = []
821                 self.lua = self.generate_prox_lua_file()
822                 if len(self.lua) > 0:
823                     self.upload_prox_lua("parameters.lua", self.lua)
824         except:
825             pass
826
827         prox_files = options.get('prox_files', [])
828         if isinstance(prox_files, six.string_types):
829             prox_files = [prox_files]
830         for key_prox_file in prox_files:
831             base_prox_file = os.path.basename(key_prox_file)
832             key_prox_path = find_relative_file(key_prox_file, task_path)
833             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
834             self.additional_files[base_prox_file] = remote_prox_file
835
836         self._prox_config_data = self.generate_prox_config_file(config_path)
837         # copy config to queue so we can read it from traffic_runner process
838         self.config_queue.put(self._prox_config_data)
839         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
840
841     def build_config(self):
842         self.build_config_file()
843
844         options = self.scenario_helper.options
845
846         prox_args = options['prox_args']
847         LOG.info("Provision and start the %s", self.APP_NAME)
848         self._build_pipeline_kwargs()
849         self.pipeline_kwargs["args"] = " ".join(
850             " ".join([k, v if v else ""]) for k, v in prox_args.items())
851         self.pipeline_kwargs["cfg_file"] = self.remote_path
852
853         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
854         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
855         return prox_cmd
856
857
858 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
859 class ProxResourceHelper(ClientResourceHelper):
860
861     RESOURCE_WORD = 'prox'
862
863     PROX_MODE = ""
864
865     WAIT_TIME = 3
866
867     @staticmethod
868     def find_pci(pci, bound_pci):
869         # we have to substring match PCI bus address from the end
870         return any(b.endswith(pci) for b in bound_pci)
871
872     def __init__(self, setup_helper):
873         super(ProxResourceHelper, self).__init__(setup_helper)
874         self.mgmt_interface = self.vnfd_helper.mgmt_interface
875         self._user = self.mgmt_interface["user"]
876         self._ip = self.mgmt_interface["ip"]
877
878         self.done = False
879         self._vpci_to_if_name_map = None
880         self.additional_file = {}
881         self.remote_prox_file_name = None
882         self.lower = None
883         self.upper = None
884         self.step_delta = 1
885         self.step_time = 0.5
886         self._test_type = None
887
888     @property
889     def sut(self):
890         if not self.client:
891             self.client = self._connect()
892         return self.client
893
894     @property
895     def test_type(self):
896         if self._test_type is None:
897             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
898         return self._test_type
899
900     def run_traffic(self, traffic_profile):
901         self._queue.cancel_join_thread()
902         self.lower = 0.0
903         self.upper = 100.0
904
905         traffic_profile.init(self._queue)
906         # this frees up the run_traffic loop
907         self.client_started.value = 1
908
909         while not self._terminated.value:
910             # move it all to traffic_profile
911             self._run_traffic_once(traffic_profile)
912
913     def _run_traffic_once(self, traffic_profile):
914         traffic_profile.execute_traffic(self)
915         if traffic_profile.done:
916             self._queue.put({'done': True})
917             LOG.debug("tg_prox done")
918             self._terminated.value = 1
919
920     # For VNF use ResourceHelper method to collect KPIs directly.
921     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
922     def collect_collectd_kpi(self):
923         return self._collect_resource_kpi()
924
925     def collect_kpi(self):
926         result = super(ProxResourceHelper, self).collect_kpi()
927         # add in collectd kpis manually
928         if result:
929             result['collect_stats'] = self._collect_resource_kpi()
930         return result
931
932     def terminate(self):
933         # should not be called, use VNF terminate
934         raise NotImplementedError()
935
936     def up_post(self):
937         return self.sut  # force connection
938
939     def execute(self, cmd, *args, **kwargs):
940         func = getattr(self.sut, cmd, None)
941         if func:
942             return func(*args, **kwargs)
943
944     def _connect(self, client=None):
945         """Run and connect to prox on the remote system """
946         # De-allocating a large amount of hugepages takes some time. If a new
947         # PROX instance is started immediately after killing the previous one,
948         # it might not be able to allocate hugepages, because they are still
949         # being freed. Hence the -w switch.
950         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
951         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
952         # -f ./handle_none-4.cfg"
953         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
954         #  "; " \
955         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
956         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
957         # sudo " \
958         #    + "./build/Prox " + prox_args
959         # log.debug("Starting PROX with command [%s]", prox_cmd)
960         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
961         # self._ip, prox_cmd))
962         if client is None:
963             client = ProxSocketHelper()
964
965         # try connecting to Prox for 60s
966         for _ in range(RETRY_SECONDS):
967             time.sleep(RETRY_INTERVAL)
968             try:
969                 client.connect(self._ip, PROX_PORT)
970             except (socket.gaierror, socket.error):
971                 continue
972             else:
973                 return client
974
975         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
976         raise Exception(msg.format(self._ip, PROX_PORT))
977
978
979 class ProxDataHelper(object):
980
981     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
982         super(ProxDataHelper, self).__init__()
983         self.vnfd_helper = vnfd_helper
984         self.sut = sut
985         self.pkt_size = pkt_size
986         self.value = value
987         self.tolerated_loss = tolerated_loss
988         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
989         self.tsc_hz = None
990         self.measured_stats = None
991         self.latency = None
992         self._totals_and_pps = None
993         self.result_tuple = None
994
995     @property
996     def totals_and_pps(self):
997         if self._totals_and_pps is None:
998             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
999             pps = self.value / 100.0 * self.line_rate_to_pps()
1000             self._totals_and_pps = rx_total, tx_total, pps
1001         return self._totals_and_pps
1002
1003     @property
1004     def rx_total(self):
1005         return self.totals_and_pps[0]
1006
1007     @property
1008     def tx_total(self):
1009         return self.totals_and_pps[1]
1010
1011     @property
1012     def pps(self):
1013         return self.totals_and_pps[2]
1014
1015     @property
1016     def samples(self):
1017         samples = {}
1018         for port_name, port_num in self.vnfd_helper.ports_iter():
1019             port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1020             samples[port_name] = {
1021                 "in_packets": port_rx_total,
1022                 "out_packets": port_tx_total,
1023             }
1024         return samples
1025
1026     def __enter__(self):
1027         self.check_interface_count()
1028         return self
1029
1030     def __exit__(self, exc_type, exc_val, exc_tb):
1031         self.make_tuple()
1032
1033     def make_tuple(self):
1034         if self.result_tuple:
1035             return
1036
1037         self.result_tuple = ProxTestDataTuple(
1038             self.tolerated_loss,
1039             self.tsc_hz,
1040             self.measured_stats['delta'].rx,
1041             self.measured_stats['delta'].tx,
1042             self.measured_stats['delta'].tsc,
1043             self.latency,
1044             self.rx_total,
1045             self.tx_total,
1046             self.pps,
1047         )
1048         self.result_tuple.log_data()
1049
1050     @contextmanager
1051     def measure_tot_stats(self):
1052         with self.sut.measure_tot_stats() as self.measured_stats:
1053             yield
1054
1055     def check_interface_count(self):
1056         # do this assert in init?  unless we expect interface count to
1057         # change from one run to another run...
1058         assert self.port_count in {1, 2, 4}, \
1059             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1060
1061     def capture_tsc_hz(self):
1062         self.tsc_hz = float(self.sut.hz())
1063
1064     def line_rate_to_pps(self):
1065         # FIXME Don't hardcode 10Gb/s
1066         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1067
1068
1069 class ProxProfileHelper(object):
1070
1071     __prox_profile_type__ = "Generic"
1072
1073     PROX_CORE_GEN_MODE = "gen"
1074     PROX_CORE_LAT_MODE = "lat"
1075
1076     @classmethod
1077     def get_cls(cls, helper_type):
1078         """Return class of specified type."""
1079         if not helper_type:
1080             return ProxProfileHelper
1081
1082         for profile_helper_class in utils.itersubclasses(cls):
1083             if helper_type == profile_helper_class.__prox_profile_type__:
1084                 return profile_helper_class
1085
1086         return ProxProfileHelper
1087
1088     @classmethod
1089     def make_profile_helper(cls, resource_helper):
1090         return cls.get_cls(resource_helper.test_type)(resource_helper)
1091
1092     def __init__(self, resource_helper):
1093         super(ProxProfileHelper, self).__init__()
1094         self.resource_helper = resource_helper
1095         self._cpu_topology = None
1096         self._test_cores = None
1097         self._latency_cores = None
1098
1099     @property
1100     def cpu_topology(self):
1101         if not self._cpu_topology:
1102             stdout = io.BytesIO()
1103             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1104             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1105         return self._cpu_topology
1106
1107     @property
1108     def test_cores(self):
1109         if not self._test_cores:
1110             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1111         return self._test_cores
1112
1113     @property
1114     def latency_cores(self):
1115         if not self._latency_cores:
1116             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1117         return self._latency_cores
1118
1119     @contextmanager
1120     def traffic_context(self, pkt_size, value):
1121         self.sut.stop_all()
1122         self.sut.reset_stats()
1123         try:
1124             self.sut.set_pkt_size(self.test_cores, pkt_size)
1125             self.sut.set_speed(self.test_cores, value)
1126             self.sut.start_all()
1127             yield
1128         finally:
1129             self.sut.stop_all()
1130
1131     def get_cores(self, mode):
1132         cores = []
1133
1134         for section_name, section in self.setup_helper.prox_config_data:
1135             if not section_name.startswith("core"):
1136                 continue
1137
1138             for key, value in section:
1139                 if key == "mode" and value == mode:
1140                     core_tuple = CoreSocketTuple(section_name)
1141                     core = core_tuple.find_in_topology(self.cpu_topology)
1142                     cores.append(core)
1143
1144         return cores
1145
1146     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1147         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1148
1149         with data_helper, self.traffic_context(pkt_size, value):
1150             with data_helper.measure_tot_stats():
1151                 time.sleep(duration)
1152                 # Getting statistics to calculate PPS at right speed....
1153                 data_helper.capture_tsc_hz()
1154                 data_helper.latency = self.get_latency()
1155
1156         return data_helper.result_tuple, data_helper.samples
1157
1158     def get_latency(self):
1159         """
1160         :return: return lat_min, lat_max, lat_avg
1161         :rtype: list
1162         """
1163         if self._latency_cores:
1164             return self.sut.lat_stats(self._latency_cores)
1165         return []
1166
1167     def terminate(self):
1168         pass
1169
1170     def __getattr__(self, item):
1171         return getattr(self.resource_helper, item)
1172
1173
1174 class ProxMplsProfileHelper(ProxProfileHelper):
1175
1176     __prox_profile_type__ = "MPLS tag/untag"
1177
1178     def __init__(self, resource_helper):
1179         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1180         self._cores_tuple = None
1181
1182     @property
1183     def mpls_cores(self):
1184         if not self._cores_tuple:
1185             self._cores_tuple = self.get_cores_mpls()
1186         return self._cores_tuple
1187
1188     @property
1189     def tagged_cores(self):
1190         return self.mpls_cores[0]
1191
1192     @property
1193     def plain_cores(self):
1194         return self.mpls_cores[1]
1195
1196     def get_cores_mpls(self):
1197         cores_tagged = []
1198         cores_plain = []
1199         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1200             if not section_name.startswith("core"):
1201                 continue
1202
1203             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1204                 continue
1205
1206             for item_key, item_value in section:
1207                 if item_key != 'name':
1208                     continue
1209
1210                 if item_value.startswith("tag"):
1211                     core_tuple = CoreSocketTuple(section_name)
1212                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1213                     cores_tagged.append(core_tag)
1214
1215                 elif item_value.startswith("udp"):
1216                     core_tuple = CoreSocketTuple(section_name)
1217                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1218                     cores_plain.append(core_udp)
1219
1220         return cores_tagged, cores_plain
1221
1222     @contextmanager
1223     def traffic_context(self, pkt_size, value):
1224         self.sut.stop_all()
1225         self.sut.reset_stats()
1226         try:
1227             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1228             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1229             self.sut.set_speed(self.tagged_cores, value)
1230             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1231             self.sut.set_speed(self.plain_cores, value * ratio)
1232             self.sut.start_all()
1233             yield
1234         finally:
1235             self.sut.stop_all()
1236
1237
1238 class ProxBngProfileHelper(ProxProfileHelper):
1239
1240     __prox_profile_type__ = "BNG gen"
1241
1242     def __init__(self, resource_helper):
1243         super(ProxBngProfileHelper, self).__init__(resource_helper)
1244         self._cores_tuple = None
1245
1246     @property
1247     def bng_cores(self):
1248         if not self._cores_tuple:
1249             self._cores_tuple = self.get_cores_gen_bng_qos()
1250         return self._cores_tuple
1251
1252     @property
1253     def cpe_cores(self):
1254         return self.bng_cores[0]
1255
1256     @property
1257     def inet_cores(self):
1258         return self.bng_cores[1]
1259
1260     @property
1261     def arp_cores(self):
1262         return self.bng_cores[2]
1263
1264     @property
1265     def arp_task_cores(self):
1266         return self.bng_cores[3]
1267
1268     @property
1269     def all_rx_cores(self):
1270         return self.latency_cores
1271
1272     def get_cores_gen_bng_qos(self):
1273         cpe_cores = []
1274         inet_cores = []
1275         arp_cores = []
1276         arp_tasks_core = [0]
1277         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1278             if not section_name.startswith("core"):
1279                 continue
1280
1281             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1282                 continue
1283
1284             for item_key, item_value in section:
1285                 if item_key != 'name':
1286                     continue
1287
1288                 if item_value.startswith("cpe"):
1289                     core_tuple = CoreSocketTuple(section_name)
1290                     cpe_core = core_tuple.find_in_topology(self.cpu_topology)
1291                     cpe_cores.append(cpe_core)
1292
1293                 elif item_value.startswith("inet"):
1294                     core_tuple = CoreSocketTuple(section_name)
1295                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1296                     inet_cores.append(inet_core)
1297
1298                 elif item_value.startswith("arp"):
1299                     core_tuple = CoreSocketTuple(section_name)
1300                     arp_core = core_tuple.find_in_topology(self.cpu_topology)
1301                     arp_cores.append(arp_core)
1302
1303                 # We check the tasks/core separately
1304                 if item_value.startswith("arp_task"):
1305                     core_tuple = CoreSocketTuple(section_name)
1306                     arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
1307                     arp_tasks_core.append(arp_task_core)
1308
1309         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1310
1311     @contextmanager
1312     def traffic_context(self, pkt_size, value):
1313         # Tester is sending packets at the required speed already after
1314         # setup_test(). Just get the current statistics, sleep the required
1315         # amount of time and calculate packet loss.
1316         inet_pkt_size = pkt_size
1317         cpe_pkt_size = pkt_size - 24
1318         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1319
1320         curr_up_speed = curr_down_speed = 0
1321         max_up_speed = max_down_speed = value
1322         if ratio < 1:
1323             max_down_speed = value * ratio
1324         else:
1325             max_up_speed = value / ratio
1326
1327         # Initialize cores
1328         self.sut.stop_all()
1329         time.sleep(0.5)
1330
1331         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1332         # wrong.
1333         self.sut.start(self.all_rx_cores)
1334         time.sleep(0.5)
1335         self.sut.stop(self.all_rx_cores)
1336         time.sleep(0.5)
1337         self.sut.reset_stats()
1338
1339         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1340         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1341
1342         self.sut.reset_values(self.cpe_cores)
1343         self.sut.reset_values(self.inet_cores)
1344
1345         # Set correct IP and UDP lengths in packet headers
1346         # CPE
1347         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1348         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1349         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1350         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1351
1352         # INET
1353         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1354         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1355         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1356         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1357         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1358         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1359
1360         # Sending ARP to initialize tables - need a few seconds of generation
1361         # to make sure all CPEs are initialized
1362         LOG.info("Initializing SUT: sending ARP packets")
1363         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1364         self.sut.set_speed(self.inet_cores, curr_up_speed)
1365         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1366         self.sut.start(self.arp_cores)
1367         time.sleep(4)
1368
1369         # Ramp up the transmission speed. First go to the common speed, then
1370         # increase steps for the faster one.
1371         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1372
1373         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1374
1375         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1376             # The min(..., ...) takes care of 1) floating point rounding errors
1377             # that could make curr_*_speed to be slightly greater than
1378             # max_*_speed and 2) max_*_speed not being an exact multiple of
1379             # self._step_delta.
1380             if curr_up_speed < max_up_speed:
1381                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1382             if curr_down_speed < max_down_speed:
1383                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1384
1385             self.sut.set_speed(self.inet_cores, curr_up_speed)
1386             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1387             time.sleep(self.step_time)
1388
1389         LOG.info("Target speeds reached. Starting real test.")
1390
1391         yield
1392
1393         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1394         LOG.info("Test ended. Flushing NIC buffers")
1395         self.sut.start(self.all_rx_cores)
1396         time.sleep(3)
1397         self.sut.stop(self.all_rx_cores)
1398
1399     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1400         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1401
1402         with data_helper, self.traffic_context(pkt_size, value):
1403             with data_helper.measure_tot_stats():
1404                 time.sleep(duration)
1405                 # Getting statistics to calculate PPS at right speed....
1406                 data_helper.capture_tsc_hz()
1407                 data_helper.latency = self.get_latency()
1408
1409         return data_helper.result_tuple, data_helper.samples
1410
1411
1412 class ProxVpeProfileHelper(ProxProfileHelper):
1413
1414     __prox_profile_type__ = "vPE gen"
1415
1416     def __init__(self, resource_helper):
1417         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1418         self._cores_tuple = None
1419         self._ports_tuple = None
1420
1421     @property
1422     def vpe_cores(self):
1423         if not self._cores_tuple:
1424             self._cores_tuple = self.get_cores_gen_vpe()
1425         return self._cores_tuple
1426
1427     @property
1428     def cpe_cores(self):
1429         return self.vpe_cores[0]
1430
1431     @property
1432     def inet_cores(self):
1433         return self.vpe_cores[1]
1434
1435     @property
1436     def all_rx_cores(self):
1437         return self.latency_cores
1438
1439     @property
1440     def vpe_ports(self):
1441         if not self._ports_tuple:
1442             self._ports_tuple = self.get_ports_gen_vpe()
1443         return self._ports_tuple
1444
1445     @property
1446     def cpe_ports(self):
1447         return self.vpe_ports[0]
1448
1449     @property
1450     def inet_ports(self):
1451         return self.vpe_ports[1]
1452
1453     def get_cores_gen_vpe(self):
1454         cpe_cores = []
1455         inet_cores = []
1456         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1457             if not section_name.startswith("core"):
1458                 continue
1459
1460             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1461                 continue
1462
1463             for item_key, item_value in section:
1464                 if item_key != 'name':
1465                     continue
1466
1467                 if item_value.startswith("cpe"):
1468                     core_tuple = CoreSocketTuple(section_name)
1469                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1470                     cpe_cores.append(core_tag)
1471
1472                 elif item_value.startswith("inet"):
1473                     core_tuple = CoreSocketTuple(section_name)
1474                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1475                     inet_cores.append(inet_core)
1476
1477         return cpe_cores, inet_cores
1478
1479     def get_ports_gen_vpe(self):
1480         cpe_ports = []
1481         inet_ports = []
1482
1483         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1484             if not section_name.startswith("port"):
1485                 continue
1486             tx_port_iter = re.finditer(r'\d+', section_name)
1487             tx_port_no = int(next(tx_port_iter).group(0))
1488
1489             for item_key, item_value in section:
1490                 if item_key != 'name':
1491                     continue
1492
1493                 if item_value.startswith("cpe"):
1494                     cpe_ports.append(tx_port_no)
1495
1496                 elif item_value.startswith("inet"):
1497                     inet_ports.append(tx_port_no)
1498
1499         return cpe_ports, inet_ports
1500
1501     @contextmanager
1502     def traffic_context(self, pkt_size, value):
1503         # Calculate the target upload and download speed. The upload and
1504         # download packets have different packet sizes, so in order to get
1505         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1506         # of the packet sizes.
1507         cpe_pkt_size = pkt_size
1508         inet_pkt_size = pkt_size - 4
1509         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1510
1511         curr_up_speed = curr_down_speed = 0
1512         max_up_speed = max_down_speed = value
1513         if ratio < 1:
1514             max_down_speed = value * ratio
1515         else:
1516             max_up_speed = value / ratio
1517
1518         # Adjust speed when multiple cores per port are used to generate traffic
1519         if len(self.cpe_ports) != len(self.cpe_cores):
1520             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1521         if len(self.inet_ports) != len(self.inet_cores):
1522             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1523
1524         # Initialize cores
1525         self.sut.stop_all()
1526         time.sleep(2)
1527
1528         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1529         # wrong.
1530         self.sut.start(self.all_rx_cores)
1531         time.sleep(2)
1532         self.sut.stop(self.all_rx_cores)
1533         time.sleep(2)
1534         self.sut.reset_stats()
1535
1536         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1537         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1538
1539         self.sut.reset_values(self.cpe_cores)
1540         self.sut.reset_values(self.inet_cores)
1541
1542         # Set correct IP and UDP lengths in packet headers
1543         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1544         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1545         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1546         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1547
1548         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1549         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1550         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1551         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1552
1553         self.sut.set_speed(self.inet_cores, curr_up_speed)
1554         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1555
1556         # Ramp up the transmission speed. First go to the common speed, then
1557         # increase steps for the faster one.
1558         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1559
1560         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1561
1562         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1563             # The min(..., ...) takes care of 1) floating point rounding errors
1564             # that could make curr_*_speed to be slightly greater than
1565             # max_*_speed and 2) max_*_speed not being an exact multiple of
1566             # self._step_delta.
1567             if curr_up_speed < max_up_speed:
1568                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1569             if curr_down_speed < max_down_speed:
1570                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1571
1572             self.sut.set_speed(self.inet_cores, curr_up_speed)
1573             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1574             time.sleep(self.step_time)
1575
1576         LOG.info("Target speeds reached. Starting real test.")
1577
1578         yield
1579
1580         self.sut.stop(self.cpe_cores + self.inet_cores)
1581         LOG.info("Test ended. Flushing NIC buffers")
1582         self.sut.start(self.all_rx_cores)
1583         time.sleep(3)
1584         self.sut.stop(self.all_rx_cores)
1585
1586     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1587         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1588
1589         with data_helper, self.traffic_context(pkt_size, value):
1590             with data_helper.measure_tot_stats():
1591                 time.sleep(duration)
1592                 # Getting statistics to calculate PPS at right speed....
1593                 data_helper.capture_tsc_hz()
1594                 data_helper.latency = self.get_latency()
1595
1596         return data_helper.result_tuple, data_helper.samples
1597
1598
1599 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1600
1601     __prox_profile_type__ = "lwAFTR gen"
1602
1603     def __init__(self, resource_helper):
1604         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1605         self._cores_tuple = None
1606         self._ports_tuple = None
1607         self.step_delta = 5
1608         self.step_time = 0.5
1609
1610     @property
1611     def _lwaftr_cores(self):
1612         if not self._cores_tuple:
1613             self._cores_tuple = self._get_cores_gen_lwaftr()
1614         return self._cores_tuple
1615
1616     @property
1617     def tun_cores(self):
1618         return self._lwaftr_cores[0]
1619
1620     @property
1621     def inet_cores(self):
1622         return self._lwaftr_cores[1]
1623
1624     @property
1625     def _lwaftr_ports(self):
1626         if not self._ports_tuple:
1627             self._ports_tuple = self._get_ports_gen_lw_aftr()
1628         return self._ports_tuple
1629
1630     @property
1631     def tun_ports(self):
1632         return self._lwaftr_ports[0]
1633
1634     @property
1635     def inet_ports(self):
1636         return self._lwaftr_ports[1]
1637
1638     @property
1639     def all_rx_cores(self):
1640         return self.latency_cores
1641
1642     def _get_cores_gen_lwaftr(self):
1643         tun_cores = []
1644         inet_cores = []
1645         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1646             if not section_name.startswith("core"):
1647                 continue
1648
1649             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1650                 continue
1651
1652             core_tuple = CoreSocketTuple(section_name)
1653             core_tag = core_tuple.find_in_topology(self.cpu_topology)
1654             for item_value in (v for k, v in section if k == 'name'):
1655                 if item_value.startswith('tun'):
1656                     tun_cores.append(core_tag)
1657                 elif item_value.startswith('inet'):
1658                     inet_cores.append(core_tag)
1659
1660         return tun_cores, inet_cores
1661
1662     def _get_ports_gen_lw_aftr(self):
1663         tun_ports = []
1664         inet_ports = []
1665
1666         re_port = re.compile('port (\d+)')
1667         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1668             match = re_port.search(section_name)
1669             if not match:
1670                 continue
1671
1672             tx_port_no = int(match.group(1))
1673             for item_value in (v for k, v in section if k == 'name'):
1674                 if item_value.startswith('lwB4'):
1675                     tun_ports.append(tx_port_no)
1676                 elif item_value.startswith('inet'):
1677                     inet_ports.append(tx_port_no)
1678
1679         return tun_ports, inet_ports
1680
1681     @staticmethod
1682     def _resize(len1, len2):
1683         if len1 == len2:
1684             return 1.0
1685         return 1.0 * len1 / len2
1686
1687     @contextmanager
1688     def traffic_context(self, pkt_size, value):
1689         # Tester is sending packets at the required speed already after
1690         # setup_test(). Just get the current statistics, sleep the required
1691         # amount of time and calculate packet loss.
1692         tun_pkt_size = pkt_size
1693         inet_pkt_size = pkt_size - 40
1694         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1695
1696         curr_up_speed = curr_down_speed = 0
1697         max_up_speed = max_down_speed = value
1698
1699         max_up_speed = value / ratio
1700
1701         # Adjust speed when multiple cores per port are used to generate traffic
1702         if len(self.tun_ports) != len(self.tun_cores):
1703             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1704         if len(self.inet_ports) != len(self.inet_cores):
1705             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1706
1707         # Initialize cores
1708         self.sut.stop_all()
1709         time.sleep(0.5)
1710
1711         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1712         # wrong.
1713         self.sut.start(self.all_rx_cores)
1714         time.sleep(0.5)
1715         self.sut.stop(self.all_rx_cores)
1716         time.sleep(0.5)
1717         self.sut.reset_stats()
1718
1719         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1720         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1721
1722         self.sut.reset_values(self.tun_cores)
1723         self.sut.reset_values(self.inet_cores)
1724
1725         # Set correct IP and UDP lengths in packet headers
1726         # tun
1727         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1728         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1729         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1730         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1731         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1732         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1733
1734         # INET
1735         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1736         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1737         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1738         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1739
1740         LOG.info("Initializing SUT: sending lwAFTR packets")
1741         self.sut.set_speed(self.inet_cores, curr_up_speed)
1742         self.sut.set_speed(self.tun_cores, curr_down_speed)
1743         time.sleep(4)
1744
1745         # Ramp up the transmission speed. First go to the common speed, then
1746         # increase steps for the faster one.
1747         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1748
1749         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1750
1751         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1752             # The min(..., ...) takes care of 1) floating point rounding errors
1753             # that could make curr_*_speed to be slightly greater than
1754             # max_*_speed and 2) max_*_speed not being an exact multiple of
1755             # self._step_delta.
1756             if curr_up_speed < max_up_speed:
1757                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1758             if curr_down_speed < max_down_speed:
1759                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1760
1761             self.sut.set_speed(self.inet_cores, curr_up_speed)
1762             self.sut.set_speed(self.tun_cores, curr_down_speed)
1763             time.sleep(self.step_time)
1764
1765         LOG.info("Target speeds reached. Starting real test.")
1766
1767         yield
1768
1769         self.sut.stop(self.tun_cores + self.inet_cores)
1770         LOG.info("Test ended. Flushing NIC buffers")
1771         self.sut.start(self.all_rx_cores)
1772         time.sleep(3)
1773         self.sut.stop(self.all_rx_cores)
1774
1775     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1776         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1777
1778         with data_helper, self.traffic_context(pkt_size, value):
1779             with data_helper.measure_tot_stats():
1780                 time.sleep(duration)
1781                 # Getting statistics to calculate PPS at right speed....
1782                 data_helper.capture_tsc_hz()
1783                 data_helper.latency = self.get_latency()
1784
1785         return data_helper.result_tuple, data_helper.samples