NSB NFVi BNG test fails to run - stops after one step
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47 LOG_RESULT = logging.getLogger('yardstick')
48 LOG_RESULT.setLevel(logging.DEBUG)
49
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
85
86     def __new__(cls, *args):
87         try:
88             matches = cls.CORE_RE.search(str(args[0]))
89             if matches:
90                 args = matches.groups()
91
92             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
93                                                        'h' if args[2] else '')
94
95         except (AttributeError, TypeError, IndexError, ValueError):
96             raise ValueError('Invalid core spec {}'.format(args))
97
98     def is_hyperthread(self):
99         return self.hyperthread == 'h'
100
101     @property
102     def index(self):
103         return int(self.is_hyperthread())
104
105     def find_in_topology(self, cpu_topology):
106         try:
107             socket_core_match = cpu_topology[self.socket_id][self.core_id]
108             sorted_match = sorted(socket_core_match.values())
109             return sorted_match[self.index][0]
110         except (KeyError, IndexError):
111             template = "Core {}{} on socket {} does not exist"
112             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
113
114
115 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
116     def __new__(cls, *args):
117         try:
118             assert args[0] is not str(args[0])
119             args = tuple(args[0])
120         except (AssertionError, IndexError, TypeError):
121             pass
122
123         return super(TotStatsTuple, cls).__new__(cls, *args)
124
125
126 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
127                                                         'delta_tx,delta_tsc,'
128                                                         'latency,rx_total,tx_total,'
129                                                         'requested_pps')):
130     @property
131     def pkt_loss(self):
132         try:
133             return 1e2 * self.drop_total / float(self.tx_total)
134         except ZeroDivisionError:
135             return 100.0
136
137     @property
138     def tx_mpps(self):
139         # calculate the effective throughput in Mpps
140         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141
142     @property
143     def rx_mpps(self):
144         # calculate the effective throughput in Mpps
145         return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6
146
147     @property
148     def can_be_lost(self):
149         return int(self.tx_total * self.tolerated / 1e2)
150
151     @property
152     def drop_total(self):
153         return self.tx_total - self.rx_total
154
155     @property
156     def success(self):
157         return self.drop_total <= self.can_be_lost
158
159     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
160         if pkt_loss is None:
161             pkt_loss = self.pkt_loss
162
163         if port_samples is None:
164             port_samples = {}
165
166         latency_keys = [
167             "LatencyMin",
168             "LatencyMax",
169             "LatencyAvg",
170         ]
171
172         samples = {
173             "Throughput": self.rx_mpps,
174             "RxThroughput": self.rx_mpps,
175             "DropPackets": pkt_loss,
176             "CurrentDropPackets": pkt_loss,
177             "RequestedTxThroughput": self.requested_pps / 1e6,
178             "TxThroughput": self.tx_mpps,
179             "PktSize": pkt_size,
180         }
181         if port_samples:
182             samples.update(port_samples)
183
184         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
185         return samples
186
187     def log_data(self, logger=None):
188         if logger is None:
189             logger = LOG_RESULT
190
191         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
192         logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
193         logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
194                     self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)
195
196
197 class PacketDump(object):
198     @staticmethod
199     def assert_func(func, value1, value2, template=None):
200         assert func(value1, value2), template.format(value1, value2)
201
202     def __init__(self, port_id, data_len, payload):
203         template = "Packet dump has specified length {}, but payload is {} bytes long"
204         self.assert_func(operator.eq, data_len, len(payload), template)
205         self._port_id = port_id
206         self._data_len = data_len
207         self._payload = payload
208
209     @property
210     def port_id(self):
211         """Get the port id of the packet dump"""
212         return self._port_id
213
214     @property
215     def data_len(self):
216         """Get the length of the data received"""
217         return self._data_len
218
219     def __str__(self):
220         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
221
222     def payload(self, start=None, end=None):
223         """Get part of the payload as a list of ordinals.
224
225         Returns a list of byte values, matching the contents of the packet dump.
226         Optional start and end parameters can be specified to retrieve only a
227         part of the packet contents.
228
229         The number of elements in the list is equal to end - start + 1, so end
230         is the offset of the last character.
231
232         Args:
233             start (pos. int): the starting offset in the payload. If it is not
234                 specified or None, offset 0 is assumed.
235             end (pos. int): the ending offset of the payload. If it is not
236                 specified or None, the contents until the end of the packet are
237                 returned.
238
239         Returns:
240             [int, int, ...]. Each int represents the ordinal value of a byte in
241             the packet payload.
242         """
243         if start is None:
244             start = 0
245
246         if end is None:
247             end = self.data_len - 1
248
249         # Bounds checking on offsets
250         template = "Start offset must be non-negative"
251         self.assert_func(operator.ge, start, 0, template)
252
253         template = "End offset must be less than {1}"
254         self.assert_func(operator.lt, end, self.data_len, template)
255
256         # Adjust for splice operation: end offset must be 1 more than the offset
257         # of the last desired character.
258         end += 1
259
260         return self._payload[start:end]
261
262
263 class ProxSocketHelper(object):
264
265     def __init__(self, sock=None):
266         """ creates new prox instance """
267         super(ProxSocketHelper, self).__init__()
268
269         if sock is None:
270             sock = socket.socket()
271
272         self._sock = sock
273         self._pkt_dumps = []
274         self.master_stats = None
275
276     def connect(self, ip, port):
277         """Connect to the prox instance on the remote system"""
278         self._sock.connect((ip, port))
279
280     def get_socket(self):
281         """ get the socket connected to the remote instance """
282         return self._sock
283
284     def _parse_socket_data(self, decoded_data, pkt_dump_only):
285         def get_newline_index():
286             return decoded_data.find('\n', index)
287
288         ret_str = ''
289         index = 0
290         for newline_index in iter(get_newline_index, -1):
291             ret_str = decoded_data[index:newline_index]
292
293             try:
294                 mode, port_id, data_len = ret_str.split(',', 2)
295             except ValueError:
296                 mode, port_id, data_len = None, None, None
297
298             if mode != 'pktdump':
299                 # Regular 1-line message. Stop reading from the socket.
300                 LOG.debug("Regular response read")
301                 return ret_str, True
302
303             LOG.debug("Packet dump header read: [%s]", ret_str)
304
305             # The line is a packet dump header. Parse it, read the
306             # packet payload, store the dump for later retrieval.
307             # Skip over the packet dump and continue processing: a
308             # 1-line response may follow the packet dump.
309
310             data_len = int(data_len)
311             data_start = newline_index + 1  # + 1 to skip over \n
312             data_end = data_start + data_len
313             sub_data = decoded_data[data_start:data_end]
314             pkt_payload = array.array('B', (ord(v) for v in sub_data))
315             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
316             self._pkt_dumps.append(pkt_dump)
317
318             if pkt_dump_only:
319                 # Return boolean instead of string to signal
320                 # successful reception of the packet dump.
321                 LOG.debug("Packet dump stored, returning")
322                 return True, False
323
324             index = data_end + 1
325
326         return ret_str, False
327
328     def get_data(self, pkt_dump_only=False, timeout=1):
329         """ read data from the socket """
330
331         # This method behaves slightly differently depending on whether it is
332         # called to read the response to a command (pkt_dump_only = 0) or if
333         # it is called specifically to read a packet dump (pkt_dump_only = 1).
334         #
335         # Packet dumps look like:
336         #   pktdump,<port_id>,<data_len>\n
337         #   <packet contents as byte array>\n
338         # This means the total packet dump message consists of 2 lines instead
339         # of 1 line.
340         #
341         # - Response for a command (pkt_dump_only = 0):
342         #   1) Read response from the socket until \n (end of message)
343         #   2a) If the response is a packet dump header (starts with "pktdump,"):
344         #     - Read the packet payload and store the packet dump for later
345         #       retrieval.
346         #     - Reset the state and restart from 1). Eventually state 2b) will
347         #       be reached and the function will return.
348         #   2b) If the response is not a packet dump:
349         #     - Return the received message as a string
350         #
351         # - Explicit request to read a packet dump (pkt_dump_only = 1):
352         #   - Read the dump header and payload
353         #   - Store the packet dump for later retrieval
354         #   - Return True to signify a packet dump was successfully read
355
356         def is_ready():
357             # recv() is blocking, so avoid calling it when no data is waiting.
358             ready = select.select([self._sock], [], [], timeout)
359             return bool(ready[0])
360
361         status = False
362         ret_str = ""
363         for status in iter(is_ready, False):
364             decoded_data = self._sock.recv(256).decode('utf-8')
365             ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
366             if (done):
367                 break
368
369         LOG.debug("Received data from socket: [%s]", ret_str)
370         return ret_str if status else ''
371
372     def put_command(self, to_send):
373         """ send data to the remote instance """
374         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
375         try:
376             # NOTE: sendall will block, we need a timeout
377             self._sock.sendall(to_send.encode('utf-8'))
378         except:  # pylint: disable=bare-except
379             pass
380
381     def get_packet_dump(self):
382         """ get the next packet dump """
383         if self._pkt_dumps:
384             return self._pkt_dumps.pop(0)
385         return None
386
387     def stop_all_reset(self):
388         """ stop the remote instance and reset stats """
389         LOG.debug("Stop all and reset stats")
390         self.stop_all()
391         self.reset_stats()
392
393     def stop_all(self):
394         """ stop all cores on the remote instance """
395         LOG.debug("Stop all")
396         self.put_command("stop all\n")
397         time.sleep(3)
398
399     def stop(self, cores, task=''):
400         """ stop specific cores on the remote instance """
401
402         tmpcores = []
403         for core in cores:
404             if core not in tmpcores:
405                 tmpcores.append(core)
406
407         LOG.debug("Stopping cores %s", tmpcores)
408         self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))
409         time.sleep(3)
410
411     def start_all(self):
412         """ start all cores on the remote instance """
413         LOG.debug("Start all")
414         self.put_command("start all\n")
415
416     def start(self, cores):
417         """ start specific cores on the remote instance """
418
419         tmpcores = []
420         for core in cores:
421             if core not in tmpcores:
422                 tmpcores.append(core)
423
424         LOG.debug("Starting cores %s", tmpcores)
425         self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))
426         time.sleep(3)
427
428     def reset_stats(self):
429         """ reset the statistics on the remote instance """
430         LOG.debug("Reset stats")
431         self.put_command("reset stats\n")
432         time.sleep(1)
433
434     def _run_template_over_cores(self, template, cores, *args):
435         for core in cores:
436             self.put_command(template.format(core, *args))
437
438     def set_pkt_size(self, cores, pkt_size):
439         """ set the packet size to generate on the remote instance """
440         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
441         pkt_size -= 4
442         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
443         time.sleep(1)
444
445     def set_value(self, cores, offset, value, length):
446         """ set value on the remote instance """
447         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
448         LOG.debug(msg, cores, value, length, offset)
449         template = "set value {} 0 {} {} {}\n"
450         self._run_template_over_cores(template, cores, offset, value, length)
451
452     def reset_values(self, cores):
453         """ reset values on the remote instance """
454         LOG.debug("Set value for core(s) %s", cores)
455         self._run_template_over_cores("reset values {} 0\n", cores)
456
457     def set_speed(self, cores, speed, tasks=None):
458         """ set speed on the remote instance """
459         if tasks is None:
460             tasks = [0] * len(cores)
461         elif len(tasks) != len(cores):
462             LOG.error("set_speed: cores and tasks must have the same len")
463         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
464         for (core, task) in list(zip(cores, tasks)):
465             self.put_command("speed {} {} {}\n".format(core, task, speed))
466
467     def slope_speed(self, cores_speed, duration, n_steps=0):
468         """will start to increase speed from 0 to N where N is taken from
469         a['speed'] for each a in cores_speed"""
470         # by default, each step will take 0.5 sec
471         if n_steps == 0:
472             n_steps = duration * 2
473
474         private_core_data = []
475         step_duration = float(duration) / n_steps
476         for core_data in cores_speed:
477             target = float(core_data['speed'])
478             private_core_data.append({
479                 'cores': core_data['cores'],
480                 'zero': 0,
481                 'delta': target / n_steps,
482                 'current': 0,
483                 'speed': target,
484             })
485
486         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
487         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
488             time.sleep(step_duration)
489             for core_data in private_core_data:
490                 core_data['current'] = core_data[key1] + core_data[key2]
491                 self.set_speed(core_data['cores'], core_data['current'])
492
493     def set_pps(self, cores, pps, pkt_size,
494                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
495         """ set packets per second for specific cores on the remote instance """
496         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
497         LOG.debug(msg, cores, pps, pkt_size)
498
499         # speed in percent of line-rate
500         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
501         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
502
503     def lat_stats(self, cores, task=0):
504         """Get the latency statistics from the remote system"""
505         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
506         lat_min = {}
507         lat_max = {}
508         lat_avg = {}
509         for core in cores:
510             self.put_command("lat stats {} {} \n".format(core, task))
511             ret = self.get_data()
512
513             try:
514                 lat_min[core], lat_max[core], lat_avg[core] = \
515                     tuple(int(n) for n in ret.split(",")[:3])
516
517             except (AttributeError, ValueError, TypeError):
518                 pass
519
520         return lat_min, lat_max, lat_avg
521
522     def get_all_tot_stats(self):
523         self.put_command("tot stats\n")
524         all_stats_str = self.get_data().split(",")
525         if len(all_stats_str) != 4:
526             all_stats = [0] * 4
527             return all_stats
528         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
529         self.master_stats = all_stats
530         return all_stats
531
532     def hz(self):
533         return self.get_all_tot_stats()[3]
534
535     def core_stats(self, cores, task=0):
536         """Get the receive statistics from the remote system"""
537         rx = tx = drop = tsc = 0
538         for core in cores:
539             self.put_command("core stats {} {}\n".format(core, task))
540             ret = self.get_data().split(",")
541             rx += int(ret[0])
542             tx += int(ret[1])
543             drop += int(ret[2])
544             tsc = int(ret[3])
545         return rx, tx, drop, tsc
546
547     def port_stats(self, ports):
548         """get counter values from a specific port"""
549         tot_result = [0] * 12
550         for port in ports:
551             self.put_command("port_stats {}\n".format(port))
552             ret = [try_int(s, 0) for s in self.get_data().split(",")]
553             tot_result = [sum(x) for x in zip(tot_result, ret)]
554         return tot_result
555
556     @contextmanager
557     def measure_tot_stats(self):
558         start = self.get_all_tot_stats()
559         container = {'start_tot': start}
560         try:
561             yield container
562         finally:
563             container['end_tot'] = end = self.get_all_tot_stats()
564
565         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
566
567     def tot_stats(self):
568         """Get the total statistics from the remote system"""
569         stats = self.get_all_tot_stats()
570         return stats[:3]
571
572     def tot_ierrors(self):
573         """Get the total ierrors from the remote system"""
574         self.put_command("tot ierrors tot\n")
575         recv = self.get_data().split(',')
576         tot_ierrors = int(recv[0])
577         tsc = int(recv[0])
578         return tot_ierrors, tsc
579
580     def set_count(self, count, cores):
581         """Set the number of packets to send on the specified core"""
582         self._run_template_over_cores("count {} 0 {}\n", cores, count)
583
584     def dump_rx(self, core_id, task_id=0, count=1):
585         """Activate dump on rx on the specified core"""
586         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
587         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
588         time.sleep(1.5)  # Give PROX time to set up packet dumping
589
590     def quit(self):
591         self.stop_all()
592         self._quit()
593         self.force_quit()
594
595     def _quit(self):
596         """ stop all cores on the remote instance """
597         LOG.debug("Quit prox")
598         self.put_command("quit\n")
599         time.sleep(3)
600
601     def force_quit(self):
602         """ stop all cores on the remote instance """
603         LOG.debug("Force Quit prox")
604         self.put_command("quit_force\n")
605         time.sleep(3)
606
607
608 _LOCAL_OBJECT = object()
609
610
611 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
612     # the actual app is lowercase
613     APP_NAME = 'prox'
614     # not used for Prox but added for consistency
615     VNF_TYPE = "PROX"
616
617     LUA_PARAMETER_NAME = ""
618     LUA_PARAMETER_PEER = {
619         "gen": "sut",
620         "sut": "gen",
621     }
622
623     CONFIG_QUEUE_TIMEOUT = 120
624
625     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
626         self.remote_path = None
627         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
628         self.remote_prox_file_name = None
629         self._prox_config_data = None
630         self.additional_files = {}
631         self.config_queue = Queue()
632         # allow_exit_without_flush
633         self.config_queue.cancel_join_thread()
634         self._global_section = None
635
636     @property
637     def prox_config_data(self):
638         if self._prox_config_data is None:
639             # this will block, but it needs too
640             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
641         return self._prox_config_data
642
643     @property
644     def global_section(self):
645         if self._global_section is None and self.prox_config_data:
646             self._global_section = self.find_section("global")
647         return self._global_section
648
649     def find_section(self, name, default=_LOCAL_OBJECT):
650         result = next((value for key, value in self.prox_config_data if key == name), default)
651         if result is _LOCAL_OBJECT:
652             raise KeyError('{} not found in Prox config'.format(name))
653         return result
654
655     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
656         section = self.find_section(section_name, [])
657         result = next((value for key, value in section if key == section_key), default)
658         if result is _LOCAL_OBJECT:
659             template = '{} not found in {} section of Prox config'
660             raise KeyError(template.format(section_key, section_name))
661         return result
662
663     def copy_to_target(self, config_file_path, prox_file):
664         remote_path = os.path.join("/tmp", prox_file)
665         self.ssh_helper.put(config_file_path, remote_path)
666         return remote_path
667
668     @staticmethod
669     def _get_tx_port(section, sections):
670         iface_port = [-1]
671         for item in sections[section]:
672             if item[0] == "tx port":
673                 iface_port = re.findall(r'\d+', item[1])
674                 # do we want the last one?
675                 #   if yes, then can we reverse?
676         return int(iface_port[0])
677
678     @staticmethod
679     def _replace_quoted_with_value(quoted, value, count=1):
680         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
681         return new_string
682
683     def _insert_additional_file(self, value):
684         file_str = value.split('"')
685         base_name = os.path.basename(file_str[1])
686         file_str[1] = self.additional_files[base_name]
687         return '"'.join(file_str)
688
689     def generate_prox_config_file(self, config_path):
690         sections = []
691         prox_config = ConfigParser(config_path, sections)
692         prox_config.parse()
693
694         # Ensure MAC is set "hardware"
695         all_ports = self.vnfd_helper.port_pairs.all_ports
696         # use dpdk port number
697         for port_name in all_ports:
698             port_num = self.vnfd_helper.port_num(port_name)
699             port_section_name = "port {}".format(port_num)
700             for section_name, section in sections:
701                 if port_section_name != section_name:
702                     continue
703
704                 for section_data in section:
705                     if section_data[0] == "mac":
706                         section_data[1] = "hardware"
707
708         # search for dst mac
709         for _, section in sections:
710             for section_data in section:
711                 item_key, item_val = section_data
712                 if item_val.startswith("@@dst_mac"):
713                     tx_port_iter = re.finditer(r'\d+', item_val)
714                     tx_port_no = int(next(tx_port_iter).group(0))
715                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
716                     mac = intf["virtual-interface"]["dst_mac"]
717                     section_data[1] = mac.replace(":", " ", 6)
718
719                 if item_key == "dst mac" and item_val.startswith("@@"):
720                     tx_port_iter = re.finditer(r'\d+', item_val)
721                     tx_port_no = int(next(tx_port_iter).group(0))
722                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
723                     mac = intf["virtual-interface"]["dst_mac"]
724                     section_data[1] = mac
725
726                 if item_val.startswith("@@src_mac"):
727                     tx_port_iter = re.finditer(r'\d+', item_val)
728                     tx_port_no = int(next(tx_port_iter).group(0))
729                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
730                     mac = intf["virtual-interface"]["local_mac"]
731                     section_data[1] = mac.replace(":", " ", 6)
732
733                 if item_key == "src mac" and item_val.startswith("@@"):
734                     tx_port_iter = re.finditer(r'\d+', item_val)
735                     tx_port_no = int(next(tx_port_iter).group(0))
736                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
737                     mac = intf["virtual-interface"]["local_mac"]
738                     section_data[1] = mac
739
740         # if addition file specified in prox config
741         if not self.additional_files:
742             return sections
743
744         for section_name, section in sections:
745             for section_data in section:
746                 try:
747                     if section_data[0].startswith("dofile"):
748                         section_data[0] = self._insert_additional_file(section_data[0])
749
750                     if section_data[1].startswith("dofile"):
751                         section_data[1] = self._insert_additional_file(section_data[1])
752                 except:  # pylint: disable=bare-except
753                     pass
754
755         return sections
756
757     @staticmethod
758     def write_prox_lua(lua_config):
759         """
760         Write an .ini-format config file for PROX (parameters.lua)
761         PROX does not allow a space before/after the =, so we need
762         a custom method
763         """
764         out = []
765         for key in lua_config:
766             value = '"' + lua_config[key] + '"'
767             if key == "__name__":
768                 continue
769             if value is not None and value != '@':
770                 key = "=".join((key, str(value).replace('\n', '\n\t')))
771                 out.append(key)
772             else:
773                 key = str(key).replace('\n', '\n\t')
774                 out.append(key)
775         return os.linesep.join(out)
776
777     @staticmethod
778     def write_prox_config(prox_config):
779         """
780         Write an .ini-format config file for PROX
781         PROX does not allow a space before/after the =, so we need
782         a custom method
783         """
784         out = []
785         for (section_name, section) in prox_config:
786             out.append("[{}]".format(section_name))
787             for item in section:
788                 key, value = item
789                 if key == "__name__":
790                     continue
791                 if value is not None and value != '@':
792                     key = "=".join((key, str(value).replace('\n', '\n\t')))
793                     out.append(key)
794                 else:
795                     key = str(key).replace('\n', '\n\t')
796                     out.append(key)
797         return os.linesep.join(out)
798
799     def put_string_to_file(self, s, remote_path):
800         file_obj = cStringIO(s)
801         self.ssh_helper.put_file_obj(file_obj, remote_path)
802         return remote_path
803
804     def generate_prox_lua_file(self):
805         p = OrderedDict()
806         all_ports = self.vnfd_helper.port_pairs.all_ports
807         for port_name in all_ports:
808             port_num = self.vnfd_helper.port_num(port_name)
809             intf = self.vnfd_helper.find_interface(name=port_name)
810             vintf = intf['virtual-interface']
811             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
812             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
813
814         return p
815
816     def upload_prox_lua(self, config_file, lua_data):
817         # prox can't handle spaces around ' = ' so use custom method
818         out = StringIO(self.write_prox_lua(lua_data))
819         out.seek(0)
820         remote_path = os.path.join("/tmp", config_file)
821         self.ssh_helper.put_file_obj(out, remote_path)
822
823         return remote_path
824
825     def upload_prox_config(self, config_file, prox_config_data):
826         # prox can't handle spaces around ' = ' so use custom method
827         out = StringIO(self.write_prox_config(prox_config_data))
828         out.seek(0)
829         remote_path = os.path.join("/tmp", config_file)
830         self.ssh_helper.put_file_obj(out, remote_path)
831
832         return remote_path
833
834     def build_config_file(self):
835         task_path = self.scenario_helper.task_path
836         options = self.scenario_helper.options
837         config_path = options['prox_config']
838         config_file = os.path.basename(config_path)
839         config_path = utils.find_relative_file(config_path, task_path)
840         self.additional_files = {}
841
842         try:
843             if options['prox_generate_parameter']:
844                 self.lua = []
845                 self.lua = self.generate_prox_lua_file()
846                 if len(self.lua) > 0:
847                     self.upload_prox_lua("parameters.lua", self.lua)
848         except:  # pylint: disable=bare-except
849             pass
850
851         prox_files = options.get('prox_files', [])
852         if isinstance(prox_files, six.string_types):
853             prox_files = [prox_files]
854         for key_prox_file in prox_files:
855             base_prox_file = os.path.basename(key_prox_file)
856             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
857             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
858             self.additional_files[base_prox_file] = remote_prox_file
859
860         self._prox_config_data = self.generate_prox_config_file(config_path)
861         # copy config to queue so we can read it from traffic_runner process
862         self.config_queue.put(self._prox_config_data)
863         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
864
865     def build_config(self):
866         self.build_config_file()
867
868         options = self.scenario_helper.options
869         prox_args = options['prox_args']
870         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
871
872         self.pipeline_kwargs = {
873             'tool_path': tool_path,
874             'tool_dir': os.path.dirname(tool_path),
875             'cfg_file': self.remote_path,
876             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
877                              for k, v in prox_args.items())
878         }
879
880         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
881                         "{args} -f {cfg_file} '")
882         return cmd_template.format(**self.pipeline_kwargs)
883
884
885 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
886 class ProxResourceHelper(ClientResourceHelper):
887
888     RESOURCE_WORD = 'prox'
889
890     PROX_MODE = ""
891
892     WAIT_TIME = 3
893
894     @staticmethod
895     def find_pci(pci, bound_pci):
896         # we have to substring match PCI bus address from the end
897         return any(b.endswith(pci) for b in bound_pci)
898
899     def __init__(self, setup_helper):
900         super(ProxResourceHelper, self).__init__(setup_helper)
901         self.mgmt_interface = self.vnfd_helper.mgmt_interface
902         self._user = self.mgmt_interface["user"]
903         self._ip = self.mgmt_interface["ip"]
904
905         self.done = False
906         self._vpci_to_if_name_map = None
907         self.additional_file = {}
908         self.remote_prox_file_name = None
909         self.lower = None
910         self.upper = None
911         self.step_delta = 1
912         self.step_time = 0.5
913         self._test_type = None
914
915     @property
916     def sut(self):
917         if not self.client:
918             self.client = self._connect()
919         return self.client
920
921     @property
922     def test_type(self):
923         if self._test_type is None:
924             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
925         return self._test_type
926
927     def run_traffic(self, traffic_profile):
928         self._queue.cancel_join_thread()
929         self.lower = 0.0
930         self.upper = 100.0
931
932         traffic_profile.init(self._queue)
933         # this frees up the run_traffic loop
934         self.client_started.value = 1
935
936         while not self._terminated.value:
937             # move it all to traffic_profile
938             self._run_traffic_once(traffic_profile)
939
940     def _run_traffic_once(self, traffic_profile):
941         traffic_profile.execute_traffic(self)
942         if traffic_profile.done:
943             self._queue.put({'done': True})
944             LOG.debug("tg_prox done")
945             self._terminated.value = 1
946
947     # For VNF use ResourceHelper method to collect KPIs directly.
948     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
949     def collect_collectd_kpi(self):
950         return self._collect_resource_kpi()
951
952     def collect_kpi(self):
953         result = super(ProxResourceHelper, self).collect_kpi()
954         # add in collectd kpis manually
955         if result:
956             result['collect_stats'] = self._collect_resource_kpi()
957         return result
958
959     def terminate(self):
960         # should not be called, use VNF terminate
961         raise NotImplementedError()
962
963     def up_post(self):
964         return self.sut  # force connection
965
966     def execute(self, cmd, *args, **kwargs):
967         func = getattr(self.sut, cmd, None)
968         if func:
969             return func(*args, **kwargs)
970         return None
971
972     def _connect(self, client=None):
973         """Run and connect to prox on the remote system """
974         # De-allocating a large amount of hugepages takes some time. If a new
975         # PROX instance is started immediately after killing the previous one,
976         # it might not be able to allocate hugepages, because they are still
977         # being freed. Hence the -w switch.
978         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
979         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
980         # -f ./handle_none-4.cfg"
981         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
982         #  "; " \
983         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
984         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
985         # sudo " \
986         #    + "./build/Prox " + prox_args
987         # log.debug("Starting PROX with command [%s]", prox_cmd)
988         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
989         # self._ip, prox_cmd))
990         if client is None:
991             client = ProxSocketHelper()
992
993         # try connecting to Prox for 60s
994         for _ in range(RETRY_SECONDS):
995             time.sleep(RETRY_INTERVAL)
996             try:
997                 client.connect(self._ip, PROX_PORT)
998             except (socket.gaierror, socket.error):
999                 continue
1000             else:
1001                 return client
1002
1003         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1004         raise Exception(msg.format(self._ip, PROX_PORT))
1005
1006
1007 class ProxDataHelper(object):
1008
1009     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
1010         super(ProxDataHelper, self).__init__()
1011         self.vnfd_helper = vnfd_helper
1012         self.sut = sut
1013         self.pkt_size = pkt_size
1014         self.value = value
1015         self.line_speed = line_speed
1016         self.tolerated_loss = tolerated_loss
1017         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
1018         self.tsc_hz = None
1019         self.measured_stats = None
1020         self.latency = None
1021         self._totals_and_pps = None
1022         self.result_tuple = None
1023
1024     @property
1025     def totals_and_pps(self):
1026         if self._totals_and_pps is None:
1027             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
1028             requested_pps = self.value / 100.0 * self.line_rate_to_pps()
1029             self._totals_and_pps = rx_total, tx_total, requested_pps
1030         return self._totals_and_pps
1031
1032     @property
1033     def rx_total(self):
1034         return self.totals_and_pps[0]
1035
1036     @property
1037     def tx_total(self):
1038         return self.totals_and_pps[1]
1039
1040     @property
1041     def requested_pps(self):
1042         return self.totals_and_pps[2]
1043
1044     @property
1045     def samples(self):
1046         samples = {}
1047         for port_name, port_num in self.vnfd_helper.ports_iter():
1048             try:
1049                 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1050                 samples[port_name] = {
1051                     "in_packets": port_rx_total,
1052                     "out_packets": port_tx_total,
1053                 }
1054             except (KeyError, TypeError, NameError, MemoryError, ValueError,
1055                     SystemError, BufferError):
1056                 samples[port_name] = {
1057                     "in_packets": 0,
1058                     "out_packets": 0,
1059                 }
1060         return samples
1061
1062     def __enter__(self):
1063         self.check_interface_count()
1064         return self
1065
1066     def __exit__(self, exc_type, exc_val, exc_tb):
1067         self.make_tuple()
1068
1069     def make_tuple(self):
1070         if self.result_tuple:
1071             return
1072
1073         self.result_tuple = ProxTestDataTuple(
1074             self.tolerated_loss,
1075             self.tsc_hz,
1076             self.measured_stats['delta'].rx,
1077             self.measured_stats['delta'].tx,
1078             self.measured_stats['delta'].tsc,
1079             self.latency,
1080             self.rx_total,
1081             self.tx_total,
1082             self.requested_pps,
1083         )
1084         self.result_tuple.log_data()
1085
1086     @contextmanager
1087     def measure_tot_stats(self):
1088         with self.sut.measure_tot_stats() as self.measured_stats:
1089             yield
1090
1091     def check_interface_count(self):
1092         # do this assert in init?  unless we expect interface count to
1093         # change from one run to another run...
1094         assert self.port_count in {1, 2, 4}, \
1095             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1096
1097     def capture_tsc_hz(self):
1098         self.tsc_hz = float(self.sut.hz())
1099
1100     def line_rate_to_pps(self):
1101       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1102
1103 class ProxProfileHelper(object):
1104
1105     __prox_profile_type__ = "Generic"
1106
1107     PROX_CORE_GEN_MODE = "gen"
1108     PROX_CORE_LAT_MODE = "lat"
1109
1110     @classmethod
1111     def get_cls(cls, helper_type):
1112         """Return class of specified type."""
1113         if not helper_type:
1114             return ProxProfileHelper
1115
1116         for profile_helper_class in utils.itersubclasses(cls):
1117             if helper_type == profile_helper_class.__prox_profile_type__:
1118                 return profile_helper_class
1119
1120         return ProxProfileHelper
1121
1122     @classmethod
1123     def make_profile_helper(cls, resource_helper):
1124         return cls.get_cls(resource_helper.test_type)(resource_helper)
1125
1126     def __init__(self, resource_helper):
1127         super(ProxProfileHelper, self).__init__()
1128         self.resource_helper = resource_helper
1129         self._cpu_topology = None
1130         self._test_cores = None
1131         self._latency_cores = None
1132
1133     @property
1134     def cpu_topology(self):
1135         if not self._cpu_topology:
1136             stdout = io.BytesIO()
1137             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1138             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1139         return self._cpu_topology
1140
1141     @property
1142     def test_cores(self):
1143         if not self._test_cores:
1144             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1145         return self._test_cores
1146
1147     @property
1148     def latency_cores(self):
1149         if not self._latency_cores:
1150             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1151         return self._latency_cores
1152
1153     @contextmanager
1154     def traffic_context(self, pkt_size, value):
1155         self.sut.stop_all()
1156         self.sut.reset_stats()
1157         try:
1158             self.sut.set_pkt_size(self.test_cores, pkt_size)
1159             self.sut.set_speed(self.test_cores, value)
1160             self.sut.start_all()
1161             time.sleep(1)
1162             yield
1163         finally:
1164             self.sut.stop_all()
1165
1166     def get_cores(self, mode):
1167         cores = []
1168
1169         for section_name, section in self.setup_helper.prox_config_data:
1170             if not section_name.startswith("core"):
1171                 continue
1172
1173             for key, value in section:
1174                 if key == "mode" and value == mode:
1175                     core_tuple = CoreSocketTuple(section_name)
1176                     core = core_tuple.core_id
1177                     cores.append(core)
1178
1179         return cores
1180
1181     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1182                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1183         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1184                                      value, tolerated_loss, line_speed)
1185
1186         with data_helper, self.traffic_context(pkt_size, value):
1187             with data_helper.measure_tot_stats():
1188                 time.sleep(duration)
1189                 # Getting statistics to calculate PPS at right speed....
1190                 data_helper.capture_tsc_hz()
1191                 data_helper.latency = self.get_latency()
1192
1193         return data_helper.result_tuple, data_helper.samples
1194
1195     def get_latency(self):
1196         """
1197         :return: return lat_min, lat_max, lat_avg
1198         :rtype: list
1199         """
1200
1201         if not self._latency_cores:
1202             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1203
1204         if self._latency_cores:
1205             return self.sut.lat_stats(self._latency_cores)
1206         return []
1207
1208     def terminate(self):
1209         pass
1210
1211     def __getattr__(self, item):
1212         return getattr(self.resource_helper, item)
1213
1214
1215 class ProxMplsProfileHelper(ProxProfileHelper):
1216
1217     __prox_profile_type__ = "MPLS tag/untag"
1218
1219     def __init__(self, resource_helper):
1220         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1221         self._cores_tuple = None
1222
1223     @property
1224     def mpls_cores(self):
1225         if not self._cores_tuple:
1226             self._cores_tuple = self.get_cores_mpls()
1227         return self._cores_tuple
1228
1229     @property
1230     def tagged_cores(self):
1231         return self.mpls_cores[0]
1232
1233     @property
1234     def plain_cores(self):
1235         return self.mpls_cores[1]
1236
1237     def get_cores_mpls(self):
1238         cores_tagged = []
1239         cores_plain = []
1240         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1241             if not section_name.startswith("core"):
1242                 continue
1243
1244             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1245                 continue
1246
1247             for item_key, item_value in section:
1248                 if item_key != 'name':
1249                     continue
1250
1251                 if item_value.startswith("tag"):
1252                     core_tuple = CoreSocketTuple(section_name)
1253                     core_tag = core_tuple.core_id
1254                     cores_tagged.append(core_tag)
1255
1256                 elif item_value.startswith("udp"):
1257                     core_tuple = CoreSocketTuple(section_name)
1258                     core_udp = core_tuple.core_id
1259                     cores_plain.append(core_udp)
1260
1261         return cores_tagged, cores_plain
1262
1263     @contextmanager
1264     def traffic_context(self, pkt_size, value):
1265         self.sut.stop_all()
1266         self.sut.reset_stats()
1267         try:
1268             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1269             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1270             self.sut.set_speed(self.tagged_cores, value)
1271             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1272             self.sut.set_speed(self.plain_cores, value * ratio)
1273             self.sut.start_all()
1274             time.sleep(1)
1275             yield
1276         finally:
1277             self.sut.stop_all()
1278
1279
1280 class ProxBngProfileHelper(ProxProfileHelper):
1281
1282     __prox_profile_type__ = "BNG gen"
1283
1284     def __init__(self, resource_helper):
1285         super(ProxBngProfileHelper, self).__init__(resource_helper)
1286         self._cores_tuple = None
1287
1288     @property
1289     def bng_cores(self):
1290         if not self._cores_tuple:
1291             self._cores_tuple = self.get_cores_gen_bng_qos()
1292         return self._cores_tuple
1293
1294     @property
1295     def cpe_cores(self):
1296         return self.bng_cores[0]
1297
1298     @property
1299     def inet_cores(self):
1300         return self.bng_cores[1]
1301
1302     @property
1303     def arp_cores(self):
1304         return self.bng_cores[2]
1305
1306     @property
1307     def arp_task_cores(self):
1308         return self.bng_cores[3]
1309
1310     @property
1311     def all_rx_cores(self):
1312         return self.latency_cores
1313
1314     def get_cores_gen_bng_qos(self):
1315         cpe_cores = []
1316         inet_cores = []
1317         arp_cores = []
1318         arp_tasks_core = [0]
1319         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1320             if not section_name.startswith("core"):
1321                 continue
1322
1323             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1324                 continue
1325
1326             for item_key, item_value in section:
1327                 if item_key != 'name':
1328                     continue
1329
1330                 if item_value.startswith("cpe"):
1331                     core_tuple = CoreSocketTuple(section_name)
1332                     cpe_core = core_tuple.core_id
1333                     cpe_cores.append(cpe_core)
1334
1335                 elif item_value.startswith("inet"):
1336                     core_tuple = CoreSocketTuple(section_name)
1337                     inet_core = core_tuple.core_id
1338                     inet_cores.append(inet_core)
1339
1340                 elif item_value.startswith("arp"):
1341                     core_tuple = CoreSocketTuple(section_name)
1342                     arp_core = core_tuple.core_id
1343                     arp_cores.append(arp_core)
1344
1345                 # We check the tasks/core separately
1346                 if item_value.startswith("arp_task"):
1347                     core_tuple = CoreSocketTuple(section_name)
1348                     arp_task_core = core_tuple.core_id
1349                     arp_tasks_core.append(arp_task_core)
1350
1351         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1352
1353     @contextmanager
1354     def traffic_context(self, pkt_size, value):
1355         # Tester is sending packets at the required speed already after
1356         # setup_test(). Just get the current statistics, sleep the required
1357         # amount of time and calculate packet loss.
1358         inet_pkt_size = pkt_size
1359         cpe_pkt_size = pkt_size - 24
1360         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1361
1362         curr_up_speed = curr_down_speed = 0
1363         max_up_speed = max_down_speed = value
1364         if ratio < 1:
1365             max_down_speed = value * ratio
1366         else:
1367             max_up_speed = value / ratio
1368
1369         # Initialize cores
1370         self.sut.stop_all()
1371         time.sleep(0.5)
1372
1373         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1374         # wrong.
1375         self.sut.start(self.all_rx_cores)
1376         time.sleep(0.5)
1377         self.sut.stop(self.all_rx_cores)
1378         time.sleep(0.5)
1379         self.sut.reset_stats()
1380
1381         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1382         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1383
1384         self.sut.reset_values(self.cpe_cores)
1385         self.sut.reset_values(self.inet_cores)
1386
1387         # Set correct IP and UDP lengths in packet headers
1388         # CPE
1389         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1390         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1391         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1392         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1393
1394         # INET
1395         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1396         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1397         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1398         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1399         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1400         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1401
1402         # Sending ARP to initialize tables - need a few seconds of generation
1403         # to make sure all CPEs are initialized
1404         LOG.info("Initializing SUT: sending ARP packets")
1405         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1406         self.sut.set_speed(self.inet_cores, curr_up_speed)
1407         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1408         self.sut.start(self.arp_cores)
1409         time.sleep(4)
1410
1411         # Ramp up the transmission speed. First go to the common speed, then
1412         # increase steps for the faster one.
1413         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1414
1415         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1416
1417         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1418             # The min(..., ...) takes care of 1) floating point rounding errors
1419             # that could make curr_*_speed to be slightly greater than
1420             # max_*_speed and 2) max_*_speed not being an exact multiple of
1421             # self._step_delta.
1422             if curr_up_speed < max_up_speed:
1423                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1424             if curr_down_speed < max_down_speed:
1425                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1426
1427             self.sut.set_speed(self.inet_cores, curr_up_speed)
1428             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1429             time.sleep(self.step_time)
1430
1431         LOG.info("Target speeds reached. Starting real test.")
1432
1433         yield
1434
1435         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1436         LOG.info("Test ended. Flushing NIC buffers")
1437         self.sut.start(self.all_rx_cores)
1438         time.sleep(3)
1439         self.sut.stop(self.all_rx_cores)
1440
1441     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1442                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1443         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1444                                      value, tolerated_loss, line_speed)
1445
1446         with data_helper, self.traffic_context(pkt_size, value):
1447             with data_helper.measure_tot_stats():
1448                 time.sleep(duration)
1449                 # Getting statistics to calculate PPS at right speed....
1450                 data_helper.capture_tsc_hz()
1451                 data_helper.latency = self.get_latency()
1452
1453         return data_helper.result_tuple, data_helper.samples
1454
1455
1456 class ProxVpeProfileHelper(ProxProfileHelper):
1457
1458     __prox_profile_type__ = "vPE gen"
1459
1460     def __init__(self, resource_helper):
1461         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1462         self._cores_tuple = None
1463         self._ports_tuple = None
1464
1465     @property
1466     def vpe_cores(self):
1467         if not self._cores_tuple:
1468             self._cores_tuple = self.get_cores_gen_vpe()
1469         return self._cores_tuple
1470
1471     @property
1472     def cpe_cores(self):
1473         return self.vpe_cores[0]
1474
1475     @property
1476     def inet_cores(self):
1477         return self.vpe_cores[1]
1478
1479     @property
1480     def all_rx_cores(self):
1481         return self.latency_cores
1482
1483     @property
1484     def vpe_ports(self):
1485         if not self._ports_tuple:
1486             self._ports_tuple = self.get_ports_gen_vpe()
1487         return self._ports_tuple
1488
1489     @property
1490     def cpe_ports(self):
1491         return self.vpe_ports[0]
1492
1493     @property
1494     def inet_ports(self):
1495         return self.vpe_ports[1]
1496
1497     def get_cores_gen_vpe(self):
1498         cpe_cores = []
1499         inet_cores = []
1500         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1501             if not section_name.startswith("core"):
1502                 continue
1503
1504             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1505                 continue
1506
1507             for item_key, item_value in section:
1508                 if item_key != 'name':
1509                     continue
1510
1511                 if item_value.startswith("cpe"):
1512                     core_tuple = CoreSocketTuple(section_name)
1513                     core_tag = core_tuple.core_id
1514                     cpe_cores.append(core_tag)
1515
1516                 elif item_value.startswith("inet"):
1517                     core_tuple = CoreSocketTuple(section_name)
1518                     inet_core = core_tuple.core_id
1519                     inet_cores.append(inet_core)
1520
1521         return cpe_cores, inet_cores
1522
1523     def get_ports_gen_vpe(self):
1524         cpe_ports = []
1525         inet_ports = []
1526
1527         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1528             if not section_name.startswith("port"):
1529                 continue
1530             tx_port_iter = re.finditer(r'\d+', section_name)
1531             tx_port_no = int(next(tx_port_iter).group(0))
1532
1533             for item_key, item_value in section:
1534                 if item_key != 'name':
1535                     continue
1536
1537                 if item_value.startswith("cpe"):
1538                     cpe_ports.append(tx_port_no)
1539
1540                 elif item_value.startswith("inet"):
1541                     inet_ports.append(tx_port_no)
1542
1543         return cpe_ports, inet_ports
1544
1545     @contextmanager
1546     def traffic_context(self, pkt_size, value):
1547         # Calculate the target upload and download speed. The upload and
1548         # download packets have different packet sizes, so in order to get
1549         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1550         # of the packet sizes.
1551         cpe_pkt_size = pkt_size
1552         inet_pkt_size = pkt_size - 4
1553         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1554
1555         curr_up_speed = curr_down_speed = 0
1556         max_up_speed = max_down_speed = value
1557         if ratio < 1:
1558             max_down_speed = value * ratio
1559         else:
1560             max_up_speed = value / ratio
1561
1562         # Adjust speed when multiple cores per port are used to generate traffic
1563         if len(self.cpe_ports) != len(self.cpe_cores):
1564             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1565         if len(self.inet_ports) != len(self.inet_cores):
1566             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1567
1568         # Initialize cores
1569         self.sut.stop_all()
1570         time.sleep(2)
1571
1572         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1573         # wrong.
1574         self.sut.start(self.all_rx_cores)
1575         time.sleep(2)
1576         self.sut.stop(self.all_rx_cores)
1577         time.sleep(2)
1578         self.sut.reset_stats()
1579
1580         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1581         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1582
1583         self.sut.reset_values(self.cpe_cores)
1584         self.sut.reset_values(self.inet_cores)
1585
1586         # Set correct IP and UDP lengths in packet headers
1587         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1588         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1589         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1590         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1591
1592         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1593         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1594         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1595         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1596
1597         self.sut.set_speed(self.inet_cores, curr_up_speed)
1598         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1599
1600         # Ramp up the transmission speed. First go to the common speed, then
1601         # increase steps for the faster one.
1602         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1603
1604         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1605
1606         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1607             # The min(..., ...) takes care of 1) floating point rounding errors
1608             # that could make curr_*_speed to be slightly greater than
1609             # max_*_speed and 2) max_*_speed not being an exact multiple of
1610             # self._step_delta.
1611             if curr_up_speed < max_up_speed:
1612                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1613             if curr_down_speed < max_down_speed:
1614                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1615
1616             self.sut.set_speed(self.inet_cores, curr_up_speed)
1617             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1618             time.sleep(self.step_time)
1619
1620         LOG.info("Target speeds reached. Starting real test.")
1621
1622         yield
1623
1624         self.sut.stop(self.cpe_cores + self.inet_cores)
1625         LOG.info("Test ended. Flushing NIC buffers")
1626         self.sut.start(self.all_rx_cores)
1627         time.sleep(3)
1628         self.sut.stop(self.all_rx_cores)
1629
1630     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1631                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1632         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1633                                      value, tolerated_loss, line_speed)
1634
1635         with data_helper, self.traffic_context(pkt_size, value):
1636             with data_helper.measure_tot_stats():
1637                 time.sleep(duration)
1638                 # Getting statistics to calculate PPS at right speed....
1639                 data_helper.capture_tsc_hz()
1640                 data_helper.latency = self.get_latency()
1641
1642         return data_helper.result_tuple, data_helper.samples
1643
1644
1645 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1646
1647     __prox_profile_type__ = "lwAFTR gen"
1648
1649     def __init__(self, resource_helper):
1650         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1651         self._cores_tuple = None
1652         self._ports_tuple = None
1653         self.step_delta = 5
1654         self.step_time = 0.5
1655
1656     @property
1657     def _lwaftr_cores(self):
1658         if not self._cores_tuple:
1659             self._cores_tuple = self._get_cores_gen_lwaftr()
1660         return self._cores_tuple
1661
1662     @property
1663     def tun_cores(self):
1664         return self._lwaftr_cores[0]
1665
1666     @property
1667     def inet_cores(self):
1668         return self._lwaftr_cores[1]
1669
1670     @property
1671     def _lwaftr_ports(self):
1672         if not self._ports_tuple:
1673             self._ports_tuple = self._get_ports_gen_lw_aftr()
1674         return self._ports_tuple
1675
1676     @property
1677     def tun_ports(self):
1678         return self._lwaftr_ports[0]
1679
1680     @property
1681     def inet_ports(self):
1682         return self._lwaftr_ports[1]
1683
1684     @property
1685     def all_rx_cores(self):
1686         return self.latency_cores
1687
1688     def _get_cores_gen_lwaftr(self):
1689         tun_cores = []
1690         inet_cores = []
1691         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1692             if not section_name.startswith("core"):
1693                 continue
1694
1695             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1696                 continue
1697
1698             core_tuple = CoreSocketTuple(section_name)
1699             core_tag = core_tuple.core_id
1700             for item_value in (v for k, v in section if k == 'name'):
1701                 if item_value.startswith('tun'):
1702                     tun_cores.append(core_tag)
1703                 elif item_value.startswith('inet'):
1704                     inet_cores.append(core_tag)
1705
1706         return tun_cores, inet_cores
1707
1708     def _get_ports_gen_lw_aftr(self):
1709         tun_ports = []
1710         inet_ports = []
1711
1712         re_port = re.compile(r'port (\d+)')
1713         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1714             match = re_port.search(section_name)
1715             if not match:
1716                 continue
1717
1718             tx_port_no = int(match.group(1))
1719             for item_value in (v for k, v in section if k == 'name'):
1720                 if item_value.startswith('lwB4'):
1721                     tun_ports.append(tx_port_no)
1722                 elif item_value.startswith('inet'):
1723                     inet_ports.append(tx_port_no)
1724
1725         return tun_ports, inet_ports
1726
1727     @staticmethod
1728     def _resize(len1, len2):
1729         if len1 == len2:
1730             return 1.0
1731         return 1.0 * len1 / len2
1732
1733     @contextmanager
1734     def traffic_context(self, pkt_size, value):
1735         # Tester is sending packets at the required speed already after
1736         # setup_test(). Just get the current statistics, sleep the required
1737         # amount of time and calculate packet loss.
1738         tun_pkt_size = pkt_size
1739         inet_pkt_size = pkt_size - 40
1740         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1741
1742         curr_up_speed = curr_down_speed = 0
1743         max_up_speed = max_down_speed = value
1744
1745         max_up_speed = value / ratio
1746
1747         # Adjust speed when multiple cores per port are used to generate traffic
1748         if len(self.tun_ports) != len(self.tun_cores):
1749             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1750         if len(self.inet_ports) != len(self.inet_cores):
1751             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1752
1753         # Initialize cores
1754         self.sut.stop_all()
1755         time.sleep(0.5)
1756
1757         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1758         # wrong.
1759         self.sut.start(self.all_rx_cores)
1760         time.sleep(0.5)
1761         self.sut.stop(self.all_rx_cores)
1762         time.sleep(0.5)
1763         self.sut.reset_stats()
1764
1765         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1766         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1767
1768         self.sut.reset_values(self.tun_cores)
1769         self.sut.reset_values(self.inet_cores)
1770
1771         # Set correct IP and UDP lengths in packet headers
1772         # tun
1773         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1774         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1775         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1776         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1777         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1778         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1779
1780         # INET
1781         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1782         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1783         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1784         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1785
1786         LOG.info("Initializing SUT: sending lwAFTR packets")
1787         self.sut.set_speed(self.inet_cores, curr_up_speed)
1788         self.sut.set_speed(self.tun_cores, curr_down_speed)
1789         time.sleep(4)
1790
1791         # Ramp up the transmission speed. First go to the common speed, then
1792         # increase steps for the faster one.
1793         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1794
1795         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1796
1797         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1798             # The min(..., ...) takes care of 1) floating point rounding errors
1799             # that could make curr_*_speed to be slightly greater than
1800             # max_*_speed and 2) max_*_speed not being an exact multiple of
1801             # self._step_delta.
1802             if curr_up_speed < max_up_speed:
1803                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1804             if curr_down_speed < max_down_speed:
1805                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1806
1807             self.sut.set_speed(self.inet_cores, curr_up_speed)
1808             self.sut.set_speed(self.tun_cores, curr_down_speed)
1809             time.sleep(self.step_time)
1810
1811         LOG.info("Target speeds reached. Starting real test.")
1812
1813         yield
1814
1815         self.sut.stop(self.tun_cores + self.inet_cores)
1816         LOG.info("Test ended. Flushing NIC buffers")
1817         self.sut.start(self.all_rx_cores)
1818         time.sleep(3)
1819         self.sut.stop(self.all_rx_cores)
1820
1821     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1822                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1823         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1824                                      value, tolerated_loss, line_speed)
1825
1826         with data_helper, self.traffic_context(pkt_size, value):
1827             with data_helper.measure_tot_stats():
1828                 time.sleep(duration)
1829                 # Getting statistics to calculate PPS at right speed....
1830                 data_helper.capture_tsc_hz()
1831                 data_helper.latency = self.get_latency()
1832
1833         return data_helper.result_tuple, data_helper.samples