Merge "Fix NSB NFVi metrics accuracy"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47 LOG_RESULT = logging.getLogger('yardstick')
48 LOG_RESULT.setLevel(logging.DEBUG)
49
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
85
86     def __new__(cls, *args):
87         try:
88             matches = cls.CORE_RE.search(str(args[0]))
89             if matches:
90                 args = matches.groups()
91
92             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
93                                                        'h' if args[2] else '')
94
95         except (AttributeError, TypeError, IndexError, ValueError):
96             raise ValueError('Invalid core spec {}'.format(args))
97
98     def is_hyperthread(self):
99         return self.hyperthread == 'h'
100
101     @property
102     def index(self):
103         return int(self.is_hyperthread())
104
105     def find_in_topology(self, cpu_topology):
106         try:
107             socket_core_match = cpu_topology[self.socket_id][self.core_id]
108             sorted_match = sorted(socket_core_match.values())
109             return sorted_match[self.index][0]
110         except (KeyError, IndexError):
111             template = "Core {}{} on socket {} does not exist"
112             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
113
114
115 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
116     def __new__(cls, *args):
117         try:
118             assert args[0] is not str(args[0])
119             args = tuple(args[0])
120         except (AssertionError, IndexError, TypeError):
121             pass
122
123         return super(TotStatsTuple, cls).__new__(cls, *args)
124
125
126 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
127                                                         'delta_tx,delta_tsc,'
128                                                         'latency,rx_total,tx_total,'
129                                                         'requested_pps')):
130     @property
131     def pkt_loss(self):
132         try:
133             return 1e2 * self.drop_total / float(self.tx_total)
134         except ZeroDivisionError:
135             return 100.0
136
137     @property
138     def tx_mpps(self):
139         # calculate the effective throughput in Mpps
140         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141
142     @property
143     def rx_mpps(self):
144         # calculate the effective throughput in Mpps
145         return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6
146
147     @property
148     def can_be_lost(self):
149         return int(self.tx_total * self.tolerated / 1e2)
150
151     @property
152     def drop_total(self):
153         return self.tx_total - self.rx_total
154
155     @property
156     def success(self):
157         return self.drop_total <= self.can_be_lost
158
159     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
160         if pkt_loss is None:
161             pkt_loss = self.pkt_loss
162
163         if port_samples is None:
164             port_samples = {}
165
166         latency_keys = [
167             "LatencyMin",
168             "LatencyMax",
169             "LatencyAvg",
170         ]
171
172         samples = {
173             "Throughput": self.rx_mpps,
174             "RxThroughput": self.rx_mpps,
175             "DropPackets": pkt_loss,
176             "CurrentDropPackets": pkt_loss,
177             "RequestedTxThroughput": self.requested_pps / 1e6,
178             "TxThroughput": self.tx_mpps,
179             "PktSize": pkt_size,
180         }
181         if port_samples:
182             samples.update(port_samples)
183
184         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
185         return samples
186
187     def log_data(self, logger=None):
188         if logger is None:
189             logger = LOG_RESULT
190
191         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
192         logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
193         logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
194                     self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)
195
196
197 class PacketDump(object):
198     @staticmethod
199     def assert_func(func, value1, value2, template=None):
200         assert func(value1, value2), template.format(value1, value2)
201
202     def __init__(self, port_id, data_len, payload):
203         template = "Packet dump has specified length {}, but payload is {} bytes long"
204         self.assert_func(operator.eq, data_len, len(payload), template)
205         self._port_id = port_id
206         self._data_len = data_len
207         self._payload = payload
208
209     @property
210     def port_id(self):
211         """Get the port id of the packet dump"""
212         return self._port_id
213
214     @property
215     def data_len(self):
216         """Get the length of the data received"""
217         return self._data_len
218
219     def __str__(self):
220         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
221
222     def payload(self, start=None, end=None):
223         """Get part of the payload as a list of ordinals.
224
225         Returns a list of byte values, matching the contents of the packet dump.
226         Optional start and end parameters can be specified to retrieve only a
227         part of the packet contents.
228
229         The number of elements in the list is equal to end - start + 1, so end
230         is the offset of the last character.
231
232         Args:
233             start (pos. int): the starting offset in the payload. If it is not
234                 specified or None, offset 0 is assumed.
235             end (pos. int): the ending offset of the payload. If it is not
236                 specified or None, the contents until the end of the packet are
237                 returned.
238
239         Returns:
240             [int, int, ...]. Each int represents the ordinal value of a byte in
241             the packet payload.
242         """
243         if start is None:
244             start = 0
245
246         if end is None:
247             end = self.data_len - 1
248
249         # Bounds checking on offsets
250         template = "Start offset must be non-negative"
251         self.assert_func(operator.ge, start, 0, template)
252
253         template = "End offset must be less than {1}"
254         self.assert_func(operator.lt, end, self.data_len, template)
255
256         # Adjust for splice operation: end offset must be 1 more than the offset
257         # of the last desired character.
258         end += 1
259
260         return self._payload[start:end]
261
262
263 class ProxSocketHelper(object):
264
265     def __init__(self, sock=None):
266         """ creates new prox instance """
267         super(ProxSocketHelper, self).__init__()
268
269         if sock is None:
270             sock = socket.socket()
271
272         self._sock = sock
273         self._pkt_dumps = []
274         self.master_stats = None
275
276     def connect(self, ip, port):
277         """Connect to the prox instance on the remote system"""
278         self._sock.connect((ip, port))
279
280     def get_socket(self):
281         """ get the socket connected to the remote instance """
282         return self._sock
283
284     def _parse_socket_data(self, decoded_data, pkt_dump_only):
285         def get_newline_index():
286             return decoded_data.find('\n', index)
287
288         ret_str = ''
289         index = 0
290         for newline_index in iter(get_newline_index, -1):
291             ret_str = decoded_data[index:newline_index]
292
293             try:
294                 mode, port_id, data_len = ret_str.split(',', 2)
295             except ValueError:
296                 mode, port_id, data_len = None, None, None
297
298             if mode != 'pktdump':
299                 # Regular 1-line message. Stop reading from the socket.
300                 LOG.debug("Regular response read")
301                 return ret_str, True
302
303             LOG.debug("Packet dump header read: [%s]", ret_str)
304
305             # The line is a packet dump header. Parse it, read the
306             # packet payload, store the dump for later retrieval.
307             # Skip over the packet dump and continue processing: a
308             # 1-line response may follow the packet dump.
309
310             data_len = int(data_len)
311             data_start = newline_index + 1  # + 1 to skip over \n
312             data_end = data_start + data_len
313             sub_data = decoded_data[data_start:data_end]
314             pkt_payload = array.array('B', (ord(v) for v in sub_data))
315             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
316             self._pkt_dumps.append(pkt_dump)
317
318             if pkt_dump_only:
319                 # Return boolean instead of string to signal
320                 # successful reception of the packet dump.
321                 LOG.debug("Packet dump stored, returning")
322                 return True, False
323
324             index = data_end + 1
325
326         return ret_str, False
327
328     def get_data(self, pkt_dump_only=False, timeout=1):
329         """ read data from the socket """
330
331         # This method behaves slightly differently depending on whether it is
332         # called to read the response to a command (pkt_dump_only = 0) or if
333         # it is called specifically to read a packet dump (pkt_dump_only = 1).
334         #
335         # Packet dumps look like:
336         #   pktdump,<port_id>,<data_len>\n
337         #   <packet contents as byte array>\n
338         # This means the total packet dump message consists of 2 lines instead
339         # of 1 line.
340         #
341         # - Response for a command (pkt_dump_only = 0):
342         #   1) Read response from the socket until \n (end of message)
343         #   2a) If the response is a packet dump header (starts with "pktdump,"):
344         #     - Read the packet payload and store the packet dump for later
345         #       retrieval.
346         #     - Reset the state and restart from 1). Eventually state 2b) will
347         #       be reached and the function will return.
348         #   2b) If the response is not a packet dump:
349         #     - Return the received message as a string
350         #
351         # - Explicit request to read a packet dump (pkt_dump_only = 1):
352         #   - Read the dump header and payload
353         #   - Store the packet dump for later retrieval
354         #   - Return True to signify a packet dump was successfully read
355
356         def is_ready():
357             # recv() is blocking, so avoid calling it when no data is waiting.
358             ready = select.select([self._sock], [], [], timeout)
359             return bool(ready[0])
360
361         status = False
362         ret_str = ""
363         for status in iter(is_ready, False):
364             decoded_data = self._sock.recv(256).decode('utf-8')
365             ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
366             if (done):
367                 break
368
369         LOG.debug("Received data from socket: [%s]", ret_str)
370         return ret_str if status else ''
371
372     def put_command(self, to_send):
373         """ send data to the remote instance """
374         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
375         try:
376             # NOTE: sendall will block, we need a timeout
377             self._sock.sendall(to_send.encode('utf-8'))
378         except:  # pylint: disable=bare-except
379             pass
380
381     def get_packet_dump(self):
382         """ get the next packet dump """
383         if self._pkt_dumps:
384             return self._pkt_dumps.pop(0)
385         return None
386
387     def stop_all_reset(self):
388         """ stop the remote instance and reset stats """
389         LOG.debug("Stop all and reset stats")
390         self.stop_all()
391         self.reset_stats()
392
393     def stop_all(self):
394         """ stop all cores on the remote instance """
395         LOG.debug("Stop all")
396         self.put_command("stop all\n")
397         time.sleep(3)
398
399     def stop(self, cores, task=''):
400         """ stop specific cores on the remote instance """
401         LOG.debug("Stopping cores %s", cores)
402         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
403         time.sleep(3)
404
405     def start_all(self):
406         """ start all cores on the remote instance """
407         LOG.debug("Start all")
408         self.put_command("start all\n")
409
410     def start(self, cores):
411         """ start specific cores on the remote instance """
412         LOG.debug("Starting cores %s", cores)
413         self.put_command("start {}\n".format(join_non_strings(',', cores)))
414         time.sleep(3)
415
416     def reset_stats(self):
417         """ reset the statistics on the remote instance """
418         LOG.debug("Reset stats")
419         self.put_command("reset stats\n")
420         time.sleep(1)
421
422     def _run_template_over_cores(self, template, cores, *args):
423         for core in cores:
424             self.put_command(template.format(core, *args))
425
426     def set_pkt_size(self, cores, pkt_size):
427         """ set the packet size to generate on the remote instance """
428         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
429         pkt_size -= 4
430         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
431         time.sleep(1)
432
433     def set_value(self, cores, offset, value, length):
434         """ set value on the remote instance """
435         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
436         LOG.debug(msg, cores, value, length, offset)
437         template = "set value {} 0 {} {} {}\n"
438         self._run_template_over_cores(template, cores, offset, value, length)
439
440     def reset_values(self, cores):
441         """ reset values on the remote instance """
442         LOG.debug("Set value for core(s) %s", cores)
443         self._run_template_over_cores("reset values {} 0\n", cores)
444
445     def set_speed(self, cores, speed, tasks=None):
446         """ set speed on the remote instance """
447         if tasks is None:
448             tasks = [0] * len(cores)
449         elif len(tasks) != len(cores):
450             LOG.error("set_speed: cores and tasks must have the same len")
451         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
452         for (core, task) in list(zip(cores, tasks)):
453             self.put_command("speed {} {} {}\n".format(core, task, speed))
454
455     def slope_speed(self, cores_speed, duration, n_steps=0):
456         """will start to increase speed from 0 to N where N is taken from
457         a['speed'] for each a in cores_speed"""
458         # by default, each step will take 0.5 sec
459         if n_steps == 0:
460             n_steps = duration * 2
461
462         private_core_data = []
463         step_duration = float(duration) / n_steps
464         for core_data in cores_speed:
465             target = float(core_data['speed'])
466             private_core_data.append({
467                 'cores': core_data['cores'],
468                 'zero': 0,
469                 'delta': target / n_steps,
470                 'current': 0,
471                 'speed': target,
472             })
473
474         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
475         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
476             time.sleep(step_duration)
477             for core_data in private_core_data:
478                 core_data['current'] = core_data[key1] + core_data[key2]
479                 self.set_speed(core_data['cores'], core_data['current'])
480
481     def set_pps(self, cores, pps, pkt_size,
482                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
483         """ set packets per second for specific cores on the remote instance """
484         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
485         LOG.debug(msg, cores, pps, pkt_size)
486
487         # speed in percent of line-rate
488         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
489         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
490
491     def lat_stats(self, cores, task=0):
492         """Get the latency statistics from the remote system"""
493         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
494         lat_min = {}
495         lat_max = {}
496         lat_avg = {}
497         for core in cores:
498             self.put_command("lat stats {} {} \n".format(core, task))
499             ret = self.get_data()
500
501             try:
502                 lat_min[core], lat_max[core], lat_avg[core] = \
503                     tuple(int(n) for n in ret.split(",")[:3])
504
505             except (AttributeError, ValueError, TypeError):
506                 pass
507
508         return lat_min, lat_max, lat_avg
509
510     def get_all_tot_stats(self):
511         self.put_command("tot stats\n")
512         all_stats_str = self.get_data().split(",")
513         if len(all_stats_str) != 4:
514             all_stats = [0] * 4
515             return all_stats
516         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
517         self.master_stats = all_stats
518         return all_stats
519
520     def hz(self):
521         return self.get_all_tot_stats()[3]
522
523     def core_stats(self, cores, task=0):
524         """Get the receive statistics from the remote system"""
525         rx = tx = drop = tsc = 0
526         for core in cores:
527             self.put_command("core stats {} {}\n".format(core, task))
528             ret = self.get_data().split(",")
529             rx += int(ret[0])
530             tx += int(ret[1])
531             drop += int(ret[2])
532             tsc = int(ret[3])
533         return rx, tx, drop, tsc
534
535     def port_stats(self, ports):
536         """get counter values from a specific port"""
537         tot_result = [0] * 12
538         for port in ports:
539             self.put_command("port_stats {}\n".format(port))
540             ret = [try_int(s, 0) for s in self.get_data().split(",")]
541             tot_result = [sum(x) for x in zip(tot_result, ret)]
542         return tot_result
543
544     @contextmanager
545     def measure_tot_stats(self):
546         start = self.get_all_tot_stats()
547         container = {'start_tot': start}
548         try:
549             yield container
550         finally:
551             container['end_tot'] = end = self.get_all_tot_stats()
552
553         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
554
555     def tot_stats(self):
556         """Get the total statistics from the remote system"""
557         stats = self.get_all_tot_stats()
558         return stats[:3]
559
560     def tot_ierrors(self):
561         """Get the total ierrors from the remote system"""
562         self.put_command("tot ierrors tot\n")
563         recv = self.get_data().split(',')
564         tot_ierrors = int(recv[0])
565         tsc = int(recv[0])
566         return tot_ierrors, tsc
567
568     def set_count(self, count, cores):
569         """Set the number of packets to send on the specified core"""
570         self._run_template_over_cores("count {} 0 {}\n", cores, count)
571
572     def dump_rx(self, core_id, task_id=0, count=1):
573         """Activate dump on rx on the specified core"""
574         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
575         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
576         time.sleep(1.5)  # Give PROX time to set up packet dumping
577
578     def quit(self):
579         self.stop_all()
580         self._quit()
581         self.force_quit()
582
583     def _quit(self):
584         """ stop all cores on the remote instance """
585         LOG.debug("Quit prox")
586         self.put_command("quit\n")
587         time.sleep(3)
588
589     def force_quit(self):
590         """ stop all cores on the remote instance """
591         LOG.debug("Force Quit prox")
592         self.put_command("quit_force\n")
593         time.sleep(3)
594
595
596 _LOCAL_OBJECT = object()
597
598
599 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
600     # the actual app is lowercase
601     APP_NAME = 'prox'
602     # not used for Prox but added for consistency
603     VNF_TYPE = "PROX"
604
605     LUA_PARAMETER_NAME = ""
606     LUA_PARAMETER_PEER = {
607         "gen": "sut",
608         "sut": "gen",
609     }
610
611     CONFIG_QUEUE_TIMEOUT = 120
612
613     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
614         self.remote_path = None
615         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
616         self.remote_prox_file_name = None
617         self._prox_config_data = None
618         self.additional_files = {}
619         self.config_queue = Queue()
620         # allow_exit_without_flush
621         self.config_queue.cancel_join_thread()
622         self._global_section = None
623
624     @property
625     def prox_config_data(self):
626         if self._prox_config_data is None:
627             # this will block, but it needs too
628             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
629         return self._prox_config_data
630
631     @property
632     def global_section(self):
633         if self._global_section is None and self.prox_config_data:
634             self._global_section = self.find_section("global")
635         return self._global_section
636
637     def find_section(self, name, default=_LOCAL_OBJECT):
638         result = next((value for key, value in self.prox_config_data if key == name), default)
639         if result is _LOCAL_OBJECT:
640             raise KeyError('{} not found in Prox config'.format(name))
641         return result
642
643     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
644         section = self.find_section(section_name, [])
645         result = next((value for key, value in section if key == section_key), default)
646         if result is _LOCAL_OBJECT:
647             template = '{} not found in {} section of Prox config'
648             raise KeyError(template.format(section_key, section_name))
649         return result
650
651     def copy_to_target(self, config_file_path, prox_file):
652         remote_path = os.path.join("/tmp", prox_file)
653         self.ssh_helper.put(config_file_path, remote_path)
654         return remote_path
655
656     @staticmethod
657     def _get_tx_port(section, sections):
658         iface_port = [-1]
659         for item in sections[section]:
660             if item[0] == "tx port":
661                 iface_port = re.findall(r'\d+', item[1])
662                 # do we want the last one?
663                 #   if yes, then can we reverse?
664         return int(iface_port[0])
665
666     @staticmethod
667     def _replace_quoted_with_value(quoted, value, count=1):
668         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
669         return new_string
670
671     def _insert_additional_file(self, value):
672         file_str = value.split('"')
673         base_name = os.path.basename(file_str[1])
674         file_str[1] = self.additional_files[base_name]
675         return '"'.join(file_str)
676
677     def generate_prox_config_file(self, config_path):
678         sections = []
679         prox_config = ConfigParser(config_path, sections)
680         prox_config.parse()
681
682         # Ensure MAC is set "hardware"
683         all_ports = self.vnfd_helper.port_pairs.all_ports
684         # use dpdk port number
685         for port_name in all_ports:
686             port_num = self.vnfd_helper.port_num(port_name)
687             port_section_name = "port {}".format(port_num)
688             for section_name, section in sections:
689                 if port_section_name != section_name:
690                     continue
691
692                 for section_data in section:
693                     if section_data[0] == "mac":
694                         section_data[1] = "hardware"
695
696         # search for dst mac
697         for _, section in sections:
698             for section_data in section:
699                 item_key, item_val = section_data
700                 if item_val.startswith("@@dst_mac"):
701                     tx_port_iter = re.finditer(r'\d+', item_val)
702                     tx_port_no = int(next(tx_port_iter).group(0))
703                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
704                     mac = intf["virtual-interface"]["dst_mac"]
705                     section_data[1] = mac.replace(":", " ", 6)
706
707                 if item_key == "dst mac" and item_val.startswith("@@"):
708                     tx_port_iter = re.finditer(r'\d+', item_val)
709                     tx_port_no = int(next(tx_port_iter).group(0))
710                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
711                     mac = intf["virtual-interface"]["dst_mac"]
712                     section_data[1] = mac
713
714                 if item_val.startswith("@@src_mac"):
715                     tx_port_iter = re.finditer(r'\d+', item_val)
716                     tx_port_no = int(next(tx_port_iter).group(0))
717                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
718                     mac = intf["virtual-interface"]["local_mac"]
719                     section_data[1] = mac.replace(":", " ", 6)
720
721                 if item_key == "src mac" and item_val.startswith("@@"):
722                     tx_port_iter = re.finditer(r'\d+', item_val)
723                     tx_port_no = int(next(tx_port_iter).group(0))
724                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
725                     mac = intf["virtual-interface"]["local_mac"]
726                     section_data[1] = mac
727
728         # if addition file specified in prox config
729         if not self.additional_files:
730             return sections
731
732         for section_name, section in sections:
733             for section_data in section:
734                 try:
735                     if section_data[0].startswith("dofile"):
736                         section_data[0] = self._insert_additional_file(section_data[0])
737
738                     if section_data[1].startswith("dofile"):
739                         section_data[1] = self._insert_additional_file(section_data[1])
740                 except:  # pylint: disable=bare-except
741                     pass
742
743         return sections
744
745     @staticmethod
746     def write_prox_lua(lua_config):
747         """
748         Write an .ini-format config file for PROX (parameters.lua)
749         PROX does not allow a space before/after the =, so we need
750         a custom method
751         """
752         out = []
753         for key in lua_config:
754             value = '"' + lua_config[key] + '"'
755             if key == "__name__":
756                 continue
757             if value is not None and value != '@':
758                 key = "=".join((key, str(value).replace('\n', '\n\t')))
759                 out.append(key)
760             else:
761                 key = str(key).replace('\n', '\n\t')
762                 out.append(key)
763         return os.linesep.join(out)
764
765     @staticmethod
766     def write_prox_config(prox_config):
767         """
768         Write an .ini-format config file for PROX
769         PROX does not allow a space before/after the =, so we need
770         a custom method
771         """
772         out = []
773         for (section_name, section) in prox_config:
774             out.append("[{}]".format(section_name))
775             for item in section:
776                 key, value = item
777                 if key == "__name__":
778                     continue
779                 if value is not None and value != '@':
780                     key = "=".join((key, str(value).replace('\n', '\n\t')))
781                     out.append(key)
782                 else:
783                     key = str(key).replace('\n', '\n\t')
784                     out.append(key)
785         return os.linesep.join(out)
786
787     def put_string_to_file(self, s, remote_path):
788         file_obj = cStringIO(s)
789         self.ssh_helper.put_file_obj(file_obj, remote_path)
790         return remote_path
791
792     def generate_prox_lua_file(self):
793         p = OrderedDict()
794         all_ports = self.vnfd_helper.port_pairs.all_ports
795         for port_name in all_ports:
796             port_num = self.vnfd_helper.port_num(port_name)
797             intf = self.vnfd_helper.find_interface(name=port_name)
798             vintf = intf['virtual-interface']
799             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
800             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
801
802         return p
803
804     def upload_prox_lua(self, config_file, lua_data):
805         # prox can't handle spaces around ' = ' so use custom method
806         out = StringIO(self.write_prox_lua(lua_data))
807         out.seek(0)
808         remote_path = os.path.join("/tmp", config_file)
809         self.ssh_helper.put_file_obj(out, remote_path)
810
811         return remote_path
812
813     def upload_prox_config(self, config_file, prox_config_data):
814         # prox can't handle spaces around ' = ' so use custom method
815         out = StringIO(self.write_prox_config(prox_config_data))
816         out.seek(0)
817         remote_path = os.path.join("/tmp", config_file)
818         self.ssh_helper.put_file_obj(out, remote_path)
819
820         return remote_path
821
822     def build_config_file(self):
823         task_path = self.scenario_helper.task_path
824         options = self.scenario_helper.options
825         config_path = options['prox_config']
826         config_file = os.path.basename(config_path)
827         config_path = utils.find_relative_file(config_path, task_path)
828         self.additional_files = {}
829
830         try:
831             if options['prox_generate_parameter']:
832                 self.lua = []
833                 self.lua = self.generate_prox_lua_file()
834                 if len(self.lua) > 0:
835                     self.upload_prox_lua("parameters.lua", self.lua)
836         except:  # pylint: disable=bare-except
837             pass
838
839         prox_files = options.get('prox_files', [])
840         if isinstance(prox_files, six.string_types):
841             prox_files = [prox_files]
842         for key_prox_file in prox_files:
843             base_prox_file = os.path.basename(key_prox_file)
844             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
845             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
846             self.additional_files[base_prox_file] = remote_prox_file
847
848         self._prox_config_data = self.generate_prox_config_file(config_path)
849         # copy config to queue so we can read it from traffic_runner process
850         self.config_queue.put(self._prox_config_data)
851         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
852
853     def build_config(self):
854         self.build_config_file()
855
856         options = self.scenario_helper.options
857         prox_args = options['prox_args']
858         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
859
860         self.pipeline_kwargs = {
861             'tool_path': tool_path,
862             'tool_dir': os.path.dirname(tool_path),
863             'cfg_file': self.remote_path,
864             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
865                              for k, v in prox_args.items())
866         }
867
868         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
869                         "{args} -f {cfg_file} '")
870         return cmd_template.format(**self.pipeline_kwargs)
871
872
873 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
874 class ProxResourceHelper(ClientResourceHelper):
875
876     RESOURCE_WORD = 'prox'
877
878     PROX_MODE = ""
879
880     WAIT_TIME = 3
881
882     @staticmethod
883     def find_pci(pci, bound_pci):
884         # we have to substring match PCI bus address from the end
885         return any(b.endswith(pci) for b in bound_pci)
886
887     def __init__(self, setup_helper):
888         super(ProxResourceHelper, self).__init__(setup_helper)
889         self.mgmt_interface = self.vnfd_helper.mgmt_interface
890         self._user = self.mgmt_interface["user"]
891         self._ip = self.mgmt_interface["ip"]
892
893         self.done = False
894         self._vpci_to_if_name_map = None
895         self.additional_file = {}
896         self.remote_prox_file_name = None
897         self.lower = None
898         self.upper = None
899         self.step_delta = 1
900         self.step_time = 0.5
901         self._test_type = None
902
903     @property
904     def sut(self):
905         if not self.client:
906             self.client = self._connect()
907         return self.client
908
909     @property
910     def test_type(self):
911         if self._test_type is None:
912             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
913         return self._test_type
914
915     def run_traffic(self, traffic_profile):
916         self._queue.cancel_join_thread()
917         self.lower = 0.0
918         self.upper = 100.0
919
920         traffic_profile.init(self._queue)
921         # this frees up the run_traffic loop
922         self.client_started.value = 1
923
924         while not self._terminated.value:
925             # move it all to traffic_profile
926             self._run_traffic_once(traffic_profile)
927
928     def _run_traffic_once(self, traffic_profile):
929         traffic_profile.execute_traffic(self)
930         if traffic_profile.done:
931             self._queue.put({'done': True})
932             LOG.debug("tg_prox done")
933             self._terminated.value = 1
934
935     # For VNF use ResourceHelper method to collect KPIs directly.
936     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
937     def collect_collectd_kpi(self):
938         return self._collect_resource_kpi()
939
940     def collect_kpi(self):
941         result = super(ProxResourceHelper, self).collect_kpi()
942         # add in collectd kpis manually
943         if result:
944             result['collect_stats'] = self._collect_resource_kpi()
945         return result
946
947     def terminate(self):
948         # should not be called, use VNF terminate
949         raise NotImplementedError()
950
951     def up_post(self):
952         return self.sut  # force connection
953
954     def execute(self, cmd, *args, **kwargs):
955         func = getattr(self.sut, cmd, None)
956         if func:
957             return func(*args, **kwargs)
958         return None
959
960     def _connect(self, client=None):
961         """Run and connect to prox on the remote system """
962         # De-allocating a large amount of hugepages takes some time. If a new
963         # PROX instance is started immediately after killing the previous one,
964         # it might not be able to allocate hugepages, because they are still
965         # being freed. Hence the -w switch.
966         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
967         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
968         # -f ./handle_none-4.cfg"
969         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
970         #  "; " \
971         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
972         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
973         # sudo " \
974         #    + "./build/Prox " + prox_args
975         # log.debug("Starting PROX with command [%s]", prox_cmd)
976         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
977         # self._ip, prox_cmd))
978         if client is None:
979             client = ProxSocketHelper()
980
981         # try connecting to Prox for 60s
982         for _ in range(RETRY_SECONDS):
983             time.sleep(RETRY_INTERVAL)
984             try:
985                 client.connect(self._ip, PROX_PORT)
986             except (socket.gaierror, socket.error):
987                 continue
988             else:
989                 return client
990
991         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
992         raise Exception(msg.format(self._ip, PROX_PORT))
993
994
995 class ProxDataHelper(object):
996
997     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
998         super(ProxDataHelper, self).__init__()
999         self.vnfd_helper = vnfd_helper
1000         self.sut = sut
1001         self.pkt_size = pkt_size
1002         self.value = value
1003         self.line_speed = line_speed
1004         self.tolerated_loss = tolerated_loss
1005         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
1006         self.tsc_hz = None
1007         self.measured_stats = None
1008         self.latency = None
1009         self._totals_and_pps = None
1010         self.result_tuple = None
1011
1012     @property
1013     def totals_and_pps(self):
1014         if self._totals_and_pps is None:
1015             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
1016             requested_pps = self.value / 100.0 * self.line_rate_to_pps()
1017             self._totals_and_pps = rx_total, tx_total, requested_pps
1018         return self._totals_and_pps
1019
1020     @property
1021     def rx_total(self):
1022         return self.totals_and_pps[0]
1023
1024     @property
1025     def tx_total(self):
1026         return self.totals_and_pps[1]
1027
1028     @property
1029     def requested_pps(self):
1030         return self.totals_and_pps[2]
1031
1032     @property
1033     def samples(self):
1034         samples = {}
1035         for port_name, port_num in self.vnfd_helper.ports_iter():
1036             try:
1037                 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1038                 samples[port_name] = {
1039                     "in_packets": port_rx_total,
1040                     "out_packets": port_tx_total,
1041                 }
1042             except (KeyError, TypeError, NameError, MemoryError, ValueError,
1043                     SystemError, BufferError):
1044                 samples[port_name] = {
1045                     "in_packets": 0,
1046                     "out_packets": 0,
1047                 }
1048         return samples
1049
1050     def __enter__(self):
1051         self.check_interface_count()
1052         return self
1053
1054     def __exit__(self, exc_type, exc_val, exc_tb):
1055         self.make_tuple()
1056
1057     def make_tuple(self):
1058         if self.result_tuple:
1059             return
1060
1061         self.result_tuple = ProxTestDataTuple(
1062             self.tolerated_loss,
1063             self.tsc_hz,
1064             self.measured_stats['delta'].rx,
1065             self.measured_stats['delta'].tx,
1066             self.measured_stats['delta'].tsc,
1067             self.latency,
1068             self.rx_total,
1069             self.tx_total,
1070             self.requested_pps,
1071         )
1072         self.result_tuple.log_data()
1073
1074     @contextmanager
1075     def measure_tot_stats(self):
1076         with self.sut.measure_tot_stats() as self.measured_stats:
1077             yield
1078
1079     def check_interface_count(self):
1080         # do this assert in init?  unless we expect interface count to
1081         # change from one run to another run...
1082         assert self.port_count in {1, 2, 4}, \
1083             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1084
1085     def capture_tsc_hz(self):
1086         self.tsc_hz = float(self.sut.hz())
1087
1088     def line_rate_to_pps(self):
1089       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1090
1091 class ProxProfileHelper(object):
1092
1093     __prox_profile_type__ = "Generic"
1094
1095     PROX_CORE_GEN_MODE = "gen"
1096     PROX_CORE_LAT_MODE = "lat"
1097
1098     @classmethod
1099     def get_cls(cls, helper_type):
1100         """Return class of specified type."""
1101         if not helper_type:
1102             return ProxProfileHelper
1103
1104         for profile_helper_class in utils.itersubclasses(cls):
1105             if helper_type == profile_helper_class.__prox_profile_type__:
1106                 return profile_helper_class
1107
1108         return ProxProfileHelper
1109
1110     @classmethod
1111     def make_profile_helper(cls, resource_helper):
1112         return cls.get_cls(resource_helper.test_type)(resource_helper)
1113
1114     def __init__(self, resource_helper):
1115         super(ProxProfileHelper, self).__init__()
1116         self.resource_helper = resource_helper
1117         self._cpu_topology = None
1118         self._test_cores = None
1119         self._latency_cores = None
1120
1121     @property
1122     def cpu_topology(self):
1123         if not self._cpu_topology:
1124             stdout = io.BytesIO()
1125             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1126             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1127         return self._cpu_topology
1128
1129     @property
1130     def test_cores(self):
1131         if not self._test_cores:
1132             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1133         return self._test_cores
1134
1135     @property
1136     def latency_cores(self):
1137         if not self._latency_cores:
1138             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1139         return self._latency_cores
1140
1141     @contextmanager
1142     def traffic_context(self, pkt_size, value):
1143         self.sut.stop_all()
1144         self.sut.reset_stats()
1145         try:
1146             self.sut.set_pkt_size(self.test_cores, pkt_size)
1147             self.sut.set_speed(self.test_cores, value)
1148             self.sut.start_all()
1149             time.sleep(1)
1150             yield
1151         finally:
1152             self.sut.stop_all()
1153
1154     def get_cores(self, mode):
1155         cores = []
1156
1157         for section_name, section in self.setup_helper.prox_config_data:
1158             if not section_name.startswith("core"):
1159                 continue
1160
1161             for key, value in section:
1162                 if key == "mode" and value == mode:
1163                     core_tuple = CoreSocketTuple(section_name)
1164                     core = core_tuple.core_id
1165                     cores.append(core)
1166
1167         return cores
1168
1169     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1170                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1171         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1172                                      value, tolerated_loss, line_speed)
1173
1174         with data_helper, self.traffic_context(pkt_size, value):
1175             with data_helper.measure_tot_stats():
1176                 time.sleep(duration)
1177                 # Getting statistics to calculate PPS at right speed....
1178                 data_helper.capture_tsc_hz()
1179                 data_helper.latency = self.get_latency()
1180
1181         return data_helper.result_tuple, data_helper.samples
1182
1183     def get_latency(self):
1184         """
1185         :return: return lat_min, lat_max, lat_avg
1186         :rtype: list
1187         """
1188
1189         if not self._latency_cores:
1190             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1191
1192         if self._latency_cores:
1193             return self.sut.lat_stats(self._latency_cores)
1194         return []
1195
1196     def terminate(self):
1197         pass
1198
1199     def __getattr__(self, item):
1200         return getattr(self.resource_helper, item)
1201
1202
1203 class ProxMplsProfileHelper(ProxProfileHelper):
1204
1205     __prox_profile_type__ = "MPLS tag/untag"
1206
1207     def __init__(self, resource_helper):
1208         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1209         self._cores_tuple = None
1210
1211     @property
1212     def mpls_cores(self):
1213         if not self._cores_tuple:
1214             self._cores_tuple = self.get_cores_mpls()
1215         return self._cores_tuple
1216
1217     @property
1218     def tagged_cores(self):
1219         return self.mpls_cores[0]
1220
1221     @property
1222     def plain_cores(self):
1223         return self.mpls_cores[1]
1224
1225     def get_cores_mpls(self):
1226         cores_tagged = []
1227         cores_plain = []
1228         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1229             if not section_name.startswith("core"):
1230                 continue
1231
1232             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1233                 continue
1234
1235             for item_key, item_value in section:
1236                 if item_key != 'name':
1237                     continue
1238
1239                 if item_value.startswith("tag"):
1240                     core_tuple = CoreSocketTuple(section_name)
1241                     core_tag = core_tuple.core_id
1242                     cores_tagged.append(core_tag)
1243
1244                 elif item_value.startswith("udp"):
1245                     core_tuple = CoreSocketTuple(section_name)
1246                     core_udp = core_tuple.core_id
1247                     cores_plain.append(core_udp)
1248
1249         return cores_tagged, cores_plain
1250
1251     @contextmanager
1252     def traffic_context(self, pkt_size, value):
1253         self.sut.stop_all()
1254         self.sut.reset_stats()
1255         try:
1256             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1257             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1258             self.sut.set_speed(self.tagged_cores, value)
1259             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1260             self.sut.set_speed(self.plain_cores, value * ratio)
1261             self.sut.start_all()
1262             time.sleep(1)
1263             yield
1264         finally:
1265             self.sut.stop_all()
1266
1267
1268 class ProxBngProfileHelper(ProxProfileHelper):
1269
1270     __prox_profile_type__ = "BNG gen"
1271
1272     def __init__(self, resource_helper):
1273         super(ProxBngProfileHelper, self).__init__(resource_helper)
1274         self._cores_tuple = None
1275
1276     @property
1277     def bng_cores(self):
1278         if not self._cores_tuple:
1279             self._cores_tuple = self.get_cores_gen_bng_qos()
1280         return self._cores_tuple
1281
1282     @property
1283     def cpe_cores(self):
1284         return self.bng_cores[0]
1285
1286     @property
1287     def inet_cores(self):
1288         return self.bng_cores[1]
1289
1290     @property
1291     def arp_cores(self):
1292         return self.bng_cores[2]
1293
1294     @property
1295     def arp_task_cores(self):
1296         return self.bng_cores[3]
1297
1298     @property
1299     def all_rx_cores(self):
1300         return self.latency_cores
1301
1302     def get_cores_gen_bng_qos(self):
1303         cpe_cores = []
1304         inet_cores = []
1305         arp_cores = []
1306         arp_tasks_core = [0]
1307         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1308             if not section_name.startswith("core"):
1309                 continue
1310
1311             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1312                 continue
1313
1314             for item_key, item_value in section:
1315                 if item_key != 'name':
1316                     continue
1317
1318                 if item_value.startswith("cpe"):
1319                     core_tuple = CoreSocketTuple(section_name)
1320                     cpe_core = core_tuple.core_id
1321                     cpe_cores.append(cpe_core)
1322
1323                 elif item_value.startswith("inet"):
1324                     core_tuple = CoreSocketTuple(section_name)
1325                     inet_core = core_tuple.core_id
1326                     inet_cores.append(inet_core)
1327
1328                 elif item_value.startswith("arp"):
1329                     core_tuple = CoreSocketTuple(section_name)
1330                     arp_core = core_tuple.core_id
1331                     arp_cores.append(arp_core)
1332
1333                 # We check the tasks/core separately
1334                 if item_value.startswith("arp_task"):
1335                     core_tuple = CoreSocketTuple(section_name)
1336                     arp_task_core = core_tuple.core_id
1337                     arp_tasks_core.append(arp_task_core)
1338
1339         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1340
1341     @contextmanager
1342     def traffic_context(self, pkt_size, value):
1343         # Tester is sending packets at the required speed already after
1344         # setup_test(). Just get the current statistics, sleep the required
1345         # amount of time and calculate packet loss.
1346         inet_pkt_size = pkt_size
1347         cpe_pkt_size = pkt_size - 24
1348         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1349
1350         curr_up_speed = curr_down_speed = 0
1351         max_up_speed = max_down_speed = value
1352         if ratio < 1:
1353             max_down_speed = value * ratio
1354         else:
1355             max_up_speed = value / ratio
1356
1357         # Initialize cores
1358         self.sut.stop_all()
1359         time.sleep(0.5)
1360
1361         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1362         # wrong.
1363         self.sut.start(self.all_rx_cores)
1364         time.sleep(0.5)
1365         self.sut.stop(self.all_rx_cores)
1366         time.sleep(0.5)
1367         self.sut.reset_stats()
1368
1369         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1370         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1371
1372         self.sut.reset_values(self.cpe_cores)
1373         self.sut.reset_values(self.inet_cores)
1374
1375         # Set correct IP and UDP lengths in packet headers
1376         # CPE
1377         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1378         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1379         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1380         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1381
1382         # INET
1383         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1384         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1385         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1386         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1387         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1388         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1389
1390         # Sending ARP to initialize tables - need a few seconds of generation
1391         # to make sure all CPEs are initialized
1392         LOG.info("Initializing SUT: sending ARP packets")
1393         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1394         self.sut.set_speed(self.inet_cores, curr_up_speed)
1395         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1396         self.sut.start(self.arp_cores)
1397         time.sleep(4)
1398
1399         # Ramp up the transmission speed. First go to the common speed, then
1400         # increase steps for the faster one.
1401         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1402
1403         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1404
1405         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1406             # The min(..., ...) takes care of 1) floating point rounding errors
1407             # that could make curr_*_speed to be slightly greater than
1408             # max_*_speed and 2) max_*_speed not being an exact multiple of
1409             # self._step_delta.
1410             if curr_up_speed < max_up_speed:
1411                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1412             if curr_down_speed < max_down_speed:
1413                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1414
1415             self.sut.set_speed(self.inet_cores, curr_up_speed)
1416             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1417             time.sleep(self.step_time)
1418
1419         LOG.info("Target speeds reached. Starting real test.")
1420
1421         yield
1422
1423         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1424         LOG.info("Test ended. Flushing NIC buffers")
1425         self.sut.start(self.all_rx_cores)
1426         time.sleep(3)
1427         self.sut.stop(self.all_rx_cores)
1428
1429     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1430                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1431         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1432                                      value, tolerated_loss, line_speed)
1433
1434         with data_helper, self.traffic_context(pkt_size, value):
1435             with data_helper.measure_tot_stats():
1436                 time.sleep(duration)
1437                 # Getting statistics to calculate PPS at right speed....
1438                 data_helper.capture_tsc_hz()
1439                 data_helper.latency = self.get_latency()
1440
1441         return data_helper.result_tuple, data_helper.samples
1442
1443
1444 class ProxVpeProfileHelper(ProxProfileHelper):
1445
1446     __prox_profile_type__ = "vPE gen"
1447
1448     def __init__(self, resource_helper):
1449         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1450         self._cores_tuple = None
1451         self._ports_tuple = None
1452
1453     @property
1454     def vpe_cores(self):
1455         if not self._cores_tuple:
1456             self._cores_tuple = self.get_cores_gen_vpe()
1457         return self._cores_tuple
1458
1459     @property
1460     def cpe_cores(self):
1461         return self.vpe_cores[0]
1462
1463     @property
1464     def inet_cores(self):
1465         return self.vpe_cores[1]
1466
1467     @property
1468     def all_rx_cores(self):
1469         return self.latency_cores
1470
1471     @property
1472     def vpe_ports(self):
1473         if not self._ports_tuple:
1474             self._ports_tuple = self.get_ports_gen_vpe()
1475         return self._ports_tuple
1476
1477     @property
1478     def cpe_ports(self):
1479         return self.vpe_ports[0]
1480
1481     @property
1482     def inet_ports(self):
1483         return self.vpe_ports[1]
1484
1485     def get_cores_gen_vpe(self):
1486         cpe_cores = []
1487         inet_cores = []
1488         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1489             if not section_name.startswith("core"):
1490                 continue
1491
1492             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1493                 continue
1494
1495             for item_key, item_value in section:
1496                 if item_key != 'name':
1497                     continue
1498
1499                 if item_value.startswith("cpe"):
1500                     core_tuple = CoreSocketTuple(section_name)
1501                     core_tag = core_tuple.core_id
1502                     cpe_cores.append(core_tag)
1503
1504                 elif item_value.startswith("inet"):
1505                     core_tuple = CoreSocketTuple(section_name)
1506                     inet_core = core_tuple.core_id
1507                     inet_cores.append(inet_core)
1508
1509         return cpe_cores, inet_cores
1510
1511     def get_ports_gen_vpe(self):
1512         cpe_ports = []
1513         inet_ports = []
1514
1515         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1516             if not section_name.startswith("port"):
1517                 continue
1518             tx_port_iter = re.finditer(r'\d+', section_name)
1519             tx_port_no = int(next(tx_port_iter).group(0))
1520
1521             for item_key, item_value in section:
1522                 if item_key != 'name':
1523                     continue
1524
1525                 if item_value.startswith("cpe"):
1526                     cpe_ports.append(tx_port_no)
1527
1528                 elif item_value.startswith("inet"):
1529                     inet_ports.append(tx_port_no)
1530
1531         return cpe_ports, inet_ports
1532
1533     @contextmanager
1534     def traffic_context(self, pkt_size, value):
1535         # Calculate the target upload and download speed. The upload and
1536         # download packets have different packet sizes, so in order to get
1537         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1538         # of the packet sizes.
1539         cpe_pkt_size = pkt_size
1540         inet_pkt_size = pkt_size - 4
1541         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1542
1543         curr_up_speed = curr_down_speed = 0
1544         max_up_speed = max_down_speed = value
1545         if ratio < 1:
1546             max_down_speed = value * ratio
1547         else:
1548             max_up_speed = value / ratio
1549
1550         # Adjust speed when multiple cores per port are used to generate traffic
1551         if len(self.cpe_ports) != len(self.cpe_cores):
1552             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1553         if len(self.inet_ports) != len(self.inet_cores):
1554             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1555
1556         # Initialize cores
1557         self.sut.stop_all()
1558         time.sleep(2)
1559
1560         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1561         # wrong.
1562         self.sut.start(self.all_rx_cores)
1563         time.sleep(2)
1564         self.sut.stop(self.all_rx_cores)
1565         time.sleep(2)
1566         self.sut.reset_stats()
1567
1568         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1569         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1570
1571         self.sut.reset_values(self.cpe_cores)
1572         self.sut.reset_values(self.inet_cores)
1573
1574         # Set correct IP and UDP lengths in packet headers
1575         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1576         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1577         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1578         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1579
1580         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1581         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1582         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1583         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1584
1585         self.sut.set_speed(self.inet_cores, curr_up_speed)
1586         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1587
1588         # Ramp up the transmission speed. First go to the common speed, then
1589         # increase steps for the faster one.
1590         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1591
1592         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1593
1594         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1595             # The min(..., ...) takes care of 1) floating point rounding errors
1596             # that could make curr_*_speed to be slightly greater than
1597             # max_*_speed and 2) max_*_speed not being an exact multiple of
1598             # self._step_delta.
1599             if curr_up_speed < max_up_speed:
1600                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1601             if curr_down_speed < max_down_speed:
1602                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1603
1604             self.sut.set_speed(self.inet_cores, curr_up_speed)
1605             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1606             time.sleep(self.step_time)
1607
1608         LOG.info("Target speeds reached. Starting real test.")
1609
1610         yield
1611
1612         self.sut.stop(self.cpe_cores + self.inet_cores)
1613         LOG.info("Test ended. Flushing NIC buffers")
1614         self.sut.start(self.all_rx_cores)
1615         time.sleep(3)
1616         self.sut.stop(self.all_rx_cores)
1617
1618     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1619                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1620         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1621                                      value, tolerated_loss, line_speed)
1622
1623         with data_helper, self.traffic_context(pkt_size, value):
1624             with data_helper.measure_tot_stats():
1625                 time.sleep(duration)
1626                 # Getting statistics to calculate PPS at right speed....
1627                 data_helper.capture_tsc_hz()
1628                 data_helper.latency = self.get_latency()
1629
1630         return data_helper.result_tuple, data_helper.samples
1631
1632
1633 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1634
1635     __prox_profile_type__ = "lwAFTR gen"
1636
1637     def __init__(self, resource_helper):
1638         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1639         self._cores_tuple = None
1640         self._ports_tuple = None
1641         self.step_delta = 5
1642         self.step_time = 0.5
1643
1644     @property
1645     def _lwaftr_cores(self):
1646         if not self._cores_tuple:
1647             self._cores_tuple = self._get_cores_gen_lwaftr()
1648         return self._cores_tuple
1649
1650     @property
1651     def tun_cores(self):
1652         return self._lwaftr_cores[0]
1653
1654     @property
1655     def inet_cores(self):
1656         return self._lwaftr_cores[1]
1657
1658     @property
1659     def _lwaftr_ports(self):
1660         if not self._ports_tuple:
1661             self._ports_tuple = self._get_ports_gen_lw_aftr()
1662         return self._ports_tuple
1663
1664     @property
1665     def tun_ports(self):
1666         return self._lwaftr_ports[0]
1667
1668     @property
1669     def inet_ports(self):
1670         return self._lwaftr_ports[1]
1671
1672     @property
1673     def all_rx_cores(self):
1674         return self.latency_cores
1675
1676     def _get_cores_gen_lwaftr(self):
1677         tun_cores = []
1678         inet_cores = []
1679         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1680             if not section_name.startswith("core"):
1681                 continue
1682
1683             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1684                 continue
1685
1686             core_tuple = CoreSocketTuple(section_name)
1687             core_tag = core_tuple.core_id
1688             for item_value in (v for k, v in section if k == 'name'):
1689                 if item_value.startswith('tun'):
1690                     tun_cores.append(core_tag)
1691                 elif item_value.startswith('inet'):
1692                     inet_cores.append(core_tag)
1693
1694         return tun_cores, inet_cores
1695
1696     def _get_ports_gen_lw_aftr(self):
1697         tun_ports = []
1698         inet_ports = []
1699
1700         re_port = re.compile(r'port (\d+)')
1701         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1702             match = re_port.search(section_name)
1703             if not match:
1704                 continue
1705
1706             tx_port_no = int(match.group(1))
1707             for item_value in (v for k, v in section if k == 'name'):
1708                 if item_value.startswith('lwB4'):
1709                     tun_ports.append(tx_port_no)
1710                 elif item_value.startswith('inet'):
1711                     inet_ports.append(tx_port_no)
1712
1713         return tun_ports, inet_ports
1714
1715     @staticmethod
1716     def _resize(len1, len2):
1717         if len1 == len2:
1718             return 1.0
1719         return 1.0 * len1 / len2
1720
1721     @contextmanager
1722     def traffic_context(self, pkt_size, value):
1723         # Tester is sending packets at the required speed already after
1724         # setup_test(). Just get the current statistics, sleep the required
1725         # amount of time and calculate packet loss.
1726         tun_pkt_size = pkt_size
1727         inet_pkt_size = pkt_size - 40
1728         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1729
1730         curr_up_speed = curr_down_speed = 0
1731         max_up_speed = max_down_speed = value
1732
1733         max_up_speed = value / ratio
1734
1735         # Adjust speed when multiple cores per port are used to generate traffic
1736         if len(self.tun_ports) != len(self.tun_cores):
1737             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1738         if len(self.inet_ports) != len(self.inet_cores):
1739             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1740
1741         # Initialize cores
1742         self.sut.stop_all()
1743         time.sleep(0.5)
1744
1745         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1746         # wrong.
1747         self.sut.start(self.all_rx_cores)
1748         time.sleep(0.5)
1749         self.sut.stop(self.all_rx_cores)
1750         time.sleep(0.5)
1751         self.sut.reset_stats()
1752
1753         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1754         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1755
1756         self.sut.reset_values(self.tun_cores)
1757         self.sut.reset_values(self.inet_cores)
1758
1759         # Set correct IP and UDP lengths in packet headers
1760         # tun
1761         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1762         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1763         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1764         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1765         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1766         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1767
1768         # INET
1769         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1770         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1771         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1772         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1773
1774         LOG.info("Initializing SUT: sending lwAFTR packets")
1775         self.sut.set_speed(self.inet_cores, curr_up_speed)
1776         self.sut.set_speed(self.tun_cores, curr_down_speed)
1777         time.sleep(4)
1778
1779         # Ramp up the transmission speed. First go to the common speed, then
1780         # increase steps for the faster one.
1781         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1782
1783         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1784
1785         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1786             # The min(..., ...) takes care of 1) floating point rounding errors
1787             # that could make curr_*_speed to be slightly greater than
1788             # max_*_speed and 2) max_*_speed not being an exact multiple of
1789             # self._step_delta.
1790             if curr_up_speed < max_up_speed:
1791                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1792             if curr_down_speed < max_down_speed:
1793                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1794
1795             self.sut.set_speed(self.inet_cores, curr_up_speed)
1796             self.sut.set_speed(self.tun_cores, curr_down_speed)
1797             time.sleep(self.step_time)
1798
1799         LOG.info("Target speeds reached. Starting real test.")
1800
1801         yield
1802
1803         self.sut.stop(self.tun_cores + self.inet_cores)
1804         LOG.info("Test ended. Flushing NIC buffers")
1805         self.sut.start(self.all_rx_cores)
1806         time.sleep(3)
1807         self.sut.stop(self.all_rx_cores)
1808
1809     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1810                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1811         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1812                                      value, tolerated_loss, line_speed)
1813
1814         with data_helper, self.traffic_context(pkt_size, value):
1815             with data_helper.measure_tot_stats():
1816                 time.sleep(duration)
1817                 # Getting statistics to calculate PPS at right speed....
1818                 data_helper.capture_tsc_hz()
1819                 data_helper.latency = self.get_latency()
1820
1821         return data_helper.result_tuple, data_helper.samples