Merge "Create get_description and get_scenario_type for Scenario"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import io
18 import logging
19 import operator
20 import os
21 import re
22 import select
23 import socket
24 import time
25 from collections import OrderedDict, namedtuple
26 from contextlib import contextmanager
27 from itertools import repeat, chain
28 from multiprocessing import Queue
29
30 import six
31 from six.moves import cStringIO
32 from six.moves import zip, StringIO
33
34 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
35 from yardstick.common import utils
36 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
37 from yardstick.network_services.helpers.iniparser import ConfigParser
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
40
41 PROX_PORT = 8474
42
43 SECTION_NAME = 0
44 SECTION_CONTENTS = 1
45
46 LOG = logging.getLogger(__name__)
47 LOG.setLevel(logging.DEBUG)
48
49 TEN_GIGABIT = 1e10
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
85
86     def __new__(cls, *args):
87         try:
88             matches = cls.CORE_RE.search(str(args[0]))
89             if matches:
90                 args = matches.groups()
91
92             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
93                                                        'h' if args[2] else '')
94
95         except (AttributeError, TypeError, IndexError, ValueError):
96             raise ValueError('Invalid core spec {}'.format(args))
97
98     def is_hyperthread(self):
99         return self.hyperthread == 'h'
100
101     @property
102     def index(self):
103         return int(self.is_hyperthread())
104
105     def find_in_topology(self, cpu_topology):
106         try:
107             socket_core_match = cpu_topology[self.socket_id][self.core_id]
108             sorted_match = sorted(socket_core_match.values())
109             return sorted_match[self.index][0]
110         except (KeyError, IndexError):
111             template = "Core {}{} on socket {} does not exist"
112             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
113
114
115 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
116     def __new__(cls, *args):
117         try:
118             assert args[0] is not str(args[0])
119             args = tuple(args[0])
120         except (AssertionError, IndexError, TypeError):
121             pass
122
123         return super(TotStatsTuple, cls).__new__(cls, *args)
124
125
126 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
127                                                         'delta_tx,delta_tsc,'
128                                                         'latency,rx_total,tx_total,pps')):
129     @property
130     def pkt_loss(self):
131         try:
132             return 1e2 * self.drop_total / float(self.tx_total)
133         except ZeroDivisionError:
134             return 100.0
135
136     @property
137     def mpps(self):
138         # calculate the effective throughput in Mpps
139         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
140
141     @property
142     def can_be_lost(self):
143         return int(self.tx_total * self.tolerated / 1e2)
144
145     @property
146     def drop_total(self):
147         return self.tx_total - self.rx_total
148
149     @property
150     def success(self):
151         return self.drop_total <= self.can_be_lost
152
153     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
154         if pkt_loss is None:
155             pkt_loss = self.pkt_loss
156
157         if port_samples is None:
158             port_samples = {}
159
160         latency_keys = [
161             "LatencyMin",
162             "LatencyMax",
163             "LatencyAvg",
164         ]
165
166         samples = {
167             "Throughput": self.mpps,
168             "DropPackets": pkt_loss,
169             "CurrentDropPackets": pkt_loss,
170             "TxThroughput": self.pps / 1e6,
171             "RxThroughput": self.mpps,
172             "PktSize": pkt_size,
173         }
174         if port_samples:
175             samples.update(port_samples)
176
177         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
178         return samples
179
180     def log_data(self, logger=None):
181         if logger is None:
182             logger = LOG
183
184         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
185         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
186         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
187
188
189 class PacketDump(object):
190     @staticmethod
191     def assert_func(func, value1, value2, template=None):
192         assert func(value1, value2), template.format(value1, value2)
193
194     def __init__(self, port_id, data_len, payload):
195         template = "Packet dump has specified length {}, but payload is {} bytes long"
196         self.assert_func(operator.eq, data_len, len(payload), template)
197         self._port_id = port_id
198         self._data_len = data_len
199         self._payload = payload
200
201     @property
202     def port_id(self):
203         """Get the port id of the packet dump"""
204         return self._port_id
205
206     @property
207     def data_len(self):
208         """Get the length of the data received"""
209         return self._data_len
210
211     def __str__(self):
212         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
213
214     def payload(self, start=None, end=None):
215         """Get part of the payload as a list of ordinals.
216
217         Returns a list of byte values, matching the contents of the packet dump.
218         Optional start and end parameters can be specified to retrieve only a
219         part of the packet contents.
220
221         The number of elements in the list is equal to end - start + 1, so end
222         is the offset of the last character.
223
224         Args:
225             start (pos. int): the starting offset in the payload. If it is not
226                 specified or None, offset 0 is assumed.
227             end (pos. int): the ending offset of the payload. If it is not
228                 specified or None, the contents until the end of the packet are
229                 returned.
230
231         Returns:
232             [int, int, ...]. Each int represents the ordinal value of a byte in
233             the packet payload.
234         """
235         if start is None:
236             start = 0
237
238         if end is None:
239             end = self.data_len - 1
240
241         # Bounds checking on offsets
242         template = "Start offset must be non-negative"
243         self.assert_func(operator.ge, start, 0, template)
244
245         template = "End offset must be less than {1}"
246         self.assert_func(operator.lt, end, self.data_len, template)
247
248         # Adjust for splice operation: end offset must be 1 more than the offset
249         # of the last desired character.
250         end += 1
251
252         return self._payload[start:end]
253
254
255 class ProxSocketHelper(object):
256
257     def __init__(self, sock=None):
258         """ creates new prox instance """
259         super(ProxSocketHelper, self).__init__()
260
261         if sock is None:
262             sock = socket.socket()
263
264         self._sock = sock
265         self._pkt_dumps = []
266         self.master_stats = None
267
268     def connect(self, ip, port):
269         """Connect to the prox instance on the remote system"""
270         self._sock.connect((ip, port))
271
272     def get_socket(self):
273         """ get the socket connected to the remote instance """
274         return self._sock
275
276     def _parse_socket_data(self, decoded_data, pkt_dump_only):
277         def get_newline_index():
278             return decoded_data.find('\n', index)
279
280         ret_str = ''
281         index = 0
282         for newline_index in iter(get_newline_index, -1):
283             ret_str = decoded_data[index:newline_index]
284
285             try:
286                 mode, port_id, data_len = ret_str.split(',', 2)
287             except ValueError:
288                 mode, port_id, data_len = None, None, None
289
290             if mode != 'pktdump':
291                 # Regular 1-line message. Stop reading from the socket.
292                 LOG.debug("Regular response read")
293                 return ret_str
294
295             LOG.debug("Packet dump header read: [%s]", ret_str)
296
297             # The line is a packet dump header. Parse it, read the
298             # packet payload, store the dump for later retrieval.
299             # Skip over the packet dump and continue processing: a
300             # 1-line response may follow the packet dump.
301
302             data_len = int(data_len)
303             data_start = newline_index + 1  # + 1 to skip over \n
304             data_end = data_start + data_len
305             sub_data = decoded_data[data_start:data_end]
306             pkt_payload = array.array('B', (ord(v) for v in sub_data))
307             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
308             self._pkt_dumps.append(pkt_dump)
309
310             if pkt_dump_only:
311                 # Return boolean instead of string to signal
312                 # successful reception of the packet dump.
313                 LOG.debug("Packet dump stored, returning")
314                 return True
315
316             index = data_end + 1
317
318         return ret_str
319
320     def get_data(self, pkt_dump_only=False, timeout=1):
321         """ read data from the socket """
322
323         # This method behaves slightly differently depending on whether it is
324         # called to read the response to a command (pkt_dump_only = 0) or if
325         # it is called specifically to read a packet dump (pkt_dump_only = 1).
326         #
327         # Packet dumps look like:
328         #   pktdump,<port_id>,<data_len>\n
329         #   <packet contents as byte array>\n
330         # This means the total packet dump message consists of 2 lines instead
331         # of 1 line.
332         #
333         # - Response for a command (pkt_dump_only = 0):
334         #   1) Read response from the socket until \n (end of message)
335         #   2a) If the response is a packet dump header (starts with "pktdump,"):
336         #     - Read the packet payload and store the packet dump for later
337         #       retrieval.
338         #     - Reset the state and restart from 1). Eventually state 2b) will
339         #       be reached and the function will return.
340         #   2b) If the response is not a packet dump:
341         #     - Return the received message as a string
342         #
343         # - Explicit request to read a packet dump (pkt_dump_only = 1):
344         #   - Read the dump header and payload
345         #   - Store the packet dump for later retrieval
346         #   - Return True to signify a packet dump was successfully read
347
348         def is_ready():
349             # recv() is blocking, so avoid calling it when no data is waiting.
350             ready = select.select([self._sock], [], [], timeout)
351             return bool(ready[0])
352
353         status = False
354         ret_str = ""
355         for status in iter(is_ready, False):
356             decoded_data = self._sock.recv(256).decode('utf-8')
357             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
358
359         LOG.debug("Received data from socket: [%s]", ret_str)
360         return ret_str if status else ''
361
362     def put_command(self, to_send):
363         """ send data to the remote instance """
364         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
365         try:
366             # TODO: sendall will block, we need a timeout
367             self._sock.sendall(to_send.encode('utf-8'))
368         except:
369             pass
370
371     def get_packet_dump(self):
372         """ get the next packet dump """
373         if self._pkt_dumps:
374             return self._pkt_dumps.pop(0)
375         return None
376
377     def stop_all_reset(self):
378         """ stop the remote instance and reset stats """
379         LOG.debug("Stop all and reset stats")
380         self.stop_all()
381         self.reset_stats()
382
383     def stop_all(self):
384         """ stop all cores on the remote instance """
385         LOG.debug("Stop all")
386         self.put_command("stop all\n")
387         time.sleep(3)
388
389     def stop(self, cores, task=''):
390         """ stop specific cores on the remote instance """
391         LOG.debug("Stopping cores %s", cores)
392         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
393         time.sleep(3)
394
395     def start_all(self):
396         """ start all cores on the remote instance """
397         LOG.debug("Start all")
398         self.put_command("start all\n")
399
400     def start(self, cores):
401         """ start specific cores on the remote instance """
402         LOG.debug("Starting cores %s", cores)
403         self.put_command("start {}\n".format(join_non_strings(',', cores)))
404         time.sleep(3)
405
406     def reset_stats(self):
407         """ reset the statistics on the remote instance """
408         LOG.debug("Reset stats")
409         self.put_command("reset stats\n")
410         time.sleep(1)
411
412     def _run_template_over_cores(self, template, cores, *args):
413         for core in cores:
414             self.put_command(template.format(core, *args))
415
416     def set_pkt_size(self, cores, pkt_size):
417         """ set the packet size to generate on the remote instance """
418         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
419         pkt_size -= 4
420         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
421         time.sleep(1)
422
423     def set_value(self, cores, offset, value, length):
424         """ set value on the remote instance """
425         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
426         LOG.debug(msg, cores, value, length, offset)
427         template = "set value {} 0 {} {} {}\n"
428         self._run_template_over_cores(template, cores, offset, value, length)
429
430     def reset_values(self, cores):
431         """ reset values on the remote instance """
432         LOG.debug("Set value for core(s) %s", cores)
433         self._run_template_over_cores("reset values {} 0\n", cores)
434
435     def set_speed(self, cores, speed, tasks=None):
436         """ set speed on the remote instance """
437         if tasks is None:
438             tasks = [0] * len(cores)
439         elif len(tasks) != len(cores):
440             LOG.error("set_speed: cores and tasks must have the same len")
441         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
442         for (core, task) in list(zip(cores, tasks)):
443             self.put_command("speed {} {} {}\n".format(core, task, speed))
444
445     def slope_speed(self, cores_speed, duration, n_steps=0):
446         """will start to increase speed from 0 to N where N is taken from
447         a['speed'] for each a in cores_speed"""
448         # by default, each step will take 0.5 sec
449         if n_steps == 0:
450             n_steps = duration * 2
451
452         private_core_data = []
453         step_duration = float(duration) / n_steps
454         for core_data in cores_speed:
455             target = float(core_data['speed'])
456             private_core_data.append({
457                 'cores': core_data['cores'],
458                 'zero': 0,
459                 'delta': target / n_steps,
460                 'current': 0,
461                 'speed': target,
462             })
463
464         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
465         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
466             time.sleep(step_duration)
467             for core_data in private_core_data:
468                 core_data['current'] = core_data[key1] + core_data[key2]
469                 self.set_speed(core_data['cores'], core_data['current'])
470
471     def set_pps(self, cores, pps, pkt_size):
472         """ set packets per second for specific cores on the remote instance """
473         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
474         LOG.debug(msg, cores, pps, pkt_size)
475
476         # speed in percent of line-rate
477         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
478         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
479
480     def lat_stats(self, cores, task=0):
481         """Get the latency statistics from the remote system"""
482         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
483         lat_min = {}
484         lat_max = {}
485         lat_avg = {}
486         for core in cores:
487             self.put_command("lat stats {} {} \n".format(core, task))
488             ret = self.get_data()
489
490             try:
491                 lat_min[core], lat_max[core], lat_avg[core] = \
492                     tuple(int(n) for n in ret.split(",")[:3])
493
494             except (AttributeError, ValueError, TypeError):
495                 pass
496
497         return lat_min, lat_max, lat_avg
498
499     def get_all_tot_stats(self):
500         self.put_command("tot stats\n")
501         all_stats_str = self.get_data().split(",")
502         if len(all_stats_str) != 4:
503             all_stats = [0] * 4
504             return all_stats
505         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
506         self.master_stats = all_stats
507         return all_stats
508
509     def hz(self):
510         return self.get_all_tot_stats()[3]
511
512     def core_stats(self, cores, task=0):
513         """Get the receive statistics from the remote system"""
514         rx = tx = drop = tsc = 0
515         for core in cores:
516             self.put_command("core stats {} {}\n".format(core, task))
517             ret = self.get_data().split(",")
518             rx += int(ret[0])
519             tx += int(ret[1])
520             drop += int(ret[2])
521             tsc = int(ret[3])
522         return rx, tx, drop, tsc
523
524     def port_stats(self, ports):
525         """get counter values from a specific port"""
526         tot_result = [0] * 12
527         for port in ports:
528             self.put_command("port_stats {}\n".format(port))
529             ret = [try_int(s, 0) for s in self.get_data().split(",")]
530             tot_result = [sum(x) for x in zip(tot_result, ret)]
531         return tot_result
532
533     @contextmanager
534     def measure_tot_stats(self):
535         start = self.get_all_tot_stats()
536         container = {'start_tot': start}
537         try:
538             yield container
539         finally:
540             container['end_tot'] = end = self.get_all_tot_stats()
541
542         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
543
544     def tot_stats(self):
545         """Get the total statistics from the remote system"""
546         stats = self.get_all_tot_stats()
547         return stats[:3]
548
549     def tot_ierrors(self):
550         """Get the total ierrors from the remote system"""
551         self.put_command("tot ierrors tot\n")
552         recv = self.get_data().split(',')
553         tot_ierrors = int(recv[0])
554         tsc = int(recv[0])
555         return tot_ierrors, tsc
556
557     def set_count(self, count, cores):
558         """Set the number of packets to send on the specified core"""
559         self._run_template_over_cores("count {} 0 {}\n", cores, count)
560
561     def dump_rx(self, core_id, task_id=0, count=1):
562         """Activate dump on rx on the specified core"""
563         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
564         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
565         time.sleep(1.5)  # Give PROX time to set up packet dumping
566
567     def quit(self):
568         self.stop_all()
569         self._quit()
570         self.force_quit()
571
572     def _quit(self):
573         """ stop all cores on the remote instance """
574         LOG.debug("Quit prox")
575         self.put_command("quit\n")
576         time.sleep(3)
577
578     def force_quit(self):
579         """ stop all cores on the remote instance """
580         LOG.debug("Force Quit prox")
581         self.put_command("quit_force\n")
582         time.sleep(3)
583
584
585 _LOCAL_OBJECT = object()
586
587
588 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
589     # the actual app is lowercase
590     APP_NAME = 'prox'
591     # not used for Prox but added for consistency
592     VNF_TYPE = "PROX"
593
594     LUA_PARAMETER_NAME = ""
595     LUA_PARAMETER_PEER = {
596         "gen": "sut",
597         "sut": "gen",
598     }
599
600     CONFIG_QUEUE_TIMEOUT = 120
601
602     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
603         self.remote_path = None
604         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
605         self.remote_prox_file_name = None
606         self._prox_config_data = None
607         self.additional_files = {}
608         self.config_queue = Queue()
609         # allow_exit_without_flush
610         self.config_queue.cancel_join_thread()
611         self._global_section = None
612
613     @property
614     def prox_config_data(self):
615         if self._prox_config_data is None:
616             # this will block, but it needs too
617             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
618         return self._prox_config_data
619
620     @property
621     def global_section(self):
622         if self._global_section is None and self.prox_config_data:
623             self._global_section = self.find_section("global")
624         return self._global_section
625
626     def find_section(self, name, default=_LOCAL_OBJECT):
627         result = next((value for key, value in self.prox_config_data if key == name), default)
628         if result is _LOCAL_OBJECT:
629             raise KeyError('{} not found in Prox config'.format(name))
630         return result
631
632     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
633         section = self.find_section(section_name, [])
634         result = next((value for key, value in section if key == section_key), default)
635         if result is _LOCAL_OBJECT:
636             template = '{} not found in {} section of Prox config'
637             raise KeyError(template.format(section_key, section_name))
638         return result
639
640     def _build_pipeline_kwargs(self):
641         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
642         self.pipeline_kwargs = {
643             'tool_path': tool_path,
644             'tool_dir': os.path.dirname(tool_path),
645         }
646
647     def copy_to_target(self, config_file_path, prox_file):
648         remote_path = os.path.join("/tmp", prox_file)
649         self.ssh_helper.put(config_file_path, remote_path)
650         return remote_path
651
652     @staticmethod
653     def _get_tx_port(section, sections):
654         iface_port = [-1]
655         for item in sections[section]:
656             if item[0] == "tx port":
657                 iface_port = re.findall(r'\d+', item[1])
658                 # do we want the last one?
659                 #   if yes, then can we reverse?
660         return int(iface_port[0])
661
662     @staticmethod
663     def _replace_quoted_with_value(quoted, value, count=1):
664         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
665         return new_string
666
667     def _insert_additional_file(self, value):
668         file_str = value.split('"')
669         base_name = os.path.basename(file_str[1])
670         file_str[1] = self.additional_files[base_name]
671         return '"'.join(file_str)
672
673     def generate_prox_config_file(self, config_path):
674         sections = []
675         prox_config = ConfigParser(config_path, sections)
676         prox_config.parse()
677
678         # Ensure MAC is set "hardware"
679         all_ports = self.vnfd_helper.port_pairs.all_ports
680         # use dpdk port number
681         for port_name in all_ports:
682             port_num = self.vnfd_helper.port_num(port_name)
683             port_section_name = "port {}".format(port_num)
684             for section_name, section in sections:
685                 if port_section_name != section_name:
686                     continue
687
688                 for index, section_data in enumerate(section):
689                     if section_data[0] == "mac":
690                         section_data[1] = "hardware"
691
692         # search for dst mac
693         for _, section in sections:
694             # for index, (item_key, item_val) in enumerate(section):
695             for index, section_data in enumerate(section):
696                 item_key, item_val = section_data
697                 if item_val.startswith("@@dst_mac"):
698                     tx_port_iter = re.finditer(r'\d+', item_val)
699                     tx_port_no = int(next(tx_port_iter).group(0))
700                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
701                     mac = intf["virtual-interface"]["dst_mac"]
702                     section_data[1] = mac.replace(":", " ", 6)
703
704                 if item_key == "dst mac" and item_val.startswith("@@"):
705                     tx_port_iter = re.finditer(r'\d+', item_val)
706                     tx_port_no = int(next(tx_port_iter).group(0))
707                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
708                     mac = intf["virtual-interface"]["dst_mac"]
709                     section_data[1] = mac
710
711         # if addition file specified in prox config
712         if not self.additional_files:
713             return sections
714
715         for section_name, section in sections:
716             for index, section_data in enumerate(section):
717                 try:
718                     if section_data[0].startswith("dofile"):
719                         section_data[0] = self._insert_additional_file(section_data[0])
720
721                     if section_data[1].startswith("dofile"):
722                         section_data[1] = self._insert_additional_file(section_data[1])
723                 except:
724                     pass
725
726         return sections
727
728     @staticmethod
729     def write_prox_lua(lua_config):
730         """
731         Write an .ini-format config file for PROX (parameters.lua)
732         PROX does not allow a space before/after the =, so we need
733         a custom method
734         """
735         out = []
736         for key in lua_config:
737             value = '"' + lua_config[key] + '"'
738             if key == "__name__":
739                 continue
740             if value is not None and value != '@':
741                 key = "=".join((key, str(value).replace('\n', '\n\t')))
742                 out.append(key)
743             else:
744                 key = str(key).replace('\n', '\n\t')
745                 out.append(key)
746         return os.linesep.join(out)
747
748     @staticmethod
749     def write_prox_config(prox_config):
750         """
751         Write an .ini-format config file for PROX
752         PROX does not allow a space before/after the =, so we need
753         a custom method
754         """
755         out = []
756         for i, (section_name, section) in enumerate(prox_config):
757             out.append("[{}]".format(section_name))
758             for index, item in enumerate(section):
759                 key, value = item
760                 if key == "__name__":
761                     continue
762                 if value is not None and value != '@':
763                     key = "=".join((key, str(value).replace('\n', '\n\t')))
764                     out.append(key)
765                 else:
766                     key = str(key).replace('\n', '\n\t')
767                     out.append(key)
768         return os.linesep.join(out)
769
770     def put_string_to_file(self, s, remote_path):
771         file_obj = cStringIO(s)
772         self.ssh_helper.put_file_obj(file_obj, remote_path)
773         return remote_path
774
775     def generate_prox_lua_file(self):
776         p = OrderedDict()
777         all_ports = self.vnfd_helper.port_pairs.all_ports
778         for port_name in all_ports:
779             port_num = self.vnfd_helper.port_num(port_name)
780             intf = self.vnfd_helper.find_interface(name=port_name)
781             vintf = intf['virtual-interface']
782             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
783             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
784
785         return p
786
787     def upload_prox_lua(self, config_file, lua_data):
788         # prox can't handle spaces around ' = ' so use custom method
789         out = StringIO(self.write_prox_lua(lua_data))
790         out.seek(0)
791         remote_path = os.path.join("/tmp", config_file)
792         self.ssh_helper.put_file_obj(out, remote_path)
793
794         return remote_path
795
796     def upload_prox_config(self, config_file, prox_config_data):
797         # prox can't handle spaces around ' = ' so use custom method
798         out = StringIO(self.write_prox_config(prox_config_data))
799         out.seek(0)
800         remote_path = os.path.join("/tmp", config_file)
801         self.ssh_helper.put_file_obj(out, remote_path)
802
803         return remote_path
804
805     def build_config_file(self):
806         task_path = self.scenario_helper.task_path
807         options = self.scenario_helper.options
808         config_path = options['prox_config']
809         config_file = os.path.basename(config_path)
810         config_path = find_relative_file(config_path, task_path)
811         self.additional_files = {}
812
813         try:
814             if options['prox_generate_parameter']:
815                 self.lua = []
816                 self.lua = self.generate_prox_lua_file()
817                 if len(self.lua) > 0:
818                     self.upload_prox_lua("parameters.lua", self.lua)
819         except:
820             pass
821
822         prox_files = options.get('prox_files', [])
823         if isinstance(prox_files, six.string_types):
824             prox_files = [prox_files]
825         for key_prox_file in prox_files:
826             base_prox_file = os.path.basename(key_prox_file)
827             key_prox_path = find_relative_file(key_prox_file, task_path)
828             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
829             self.additional_files[base_prox_file] = remote_prox_file
830
831         self._prox_config_data = self.generate_prox_config_file(config_path)
832         # copy config to queue so we can read it from traffic_runner process
833         self.config_queue.put(self._prox_config_data)
834         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
835
836     def build_config(self):
837         self.build_config_file()
838
839         options = self.scenario_helper.options
840
841         prox_args = options['prox_args']
842         LOG.info("Provision and start the %s", self.APP_NAME)
843         self._build_pipeline_kwargs()
844         self.pipeline_kwargs["args"] = " ".join(
845             " ".join([k, v if v else ""]) for k, v in prox_args.items())
846         self.pipeline_kwargs["cfg_file"] = self.remote_path
847
848         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
849         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
850         return prox_cmd
851
852
853 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
854 class ProxResourceHelper(ClientResourceHelper):
855
856     RESOURCE_WORD = 'prox'
857
858     PROX_MODE = ""
859
860     WAIT_TIME = 3
861
862     @staticmethod
863     def find_pci(pci, bound_pci):
864         # we have to substring match PCI bus address from the end
865         return any(b.endswith(pci) for b in bound_pci)
866
867     def __init__(self, setup_helper):
868         super(ProxResourceHelper, self).__init__(setup_helper)
869         self.mgmt_interface = self.vnfd_helper.mgmt_interface
870         self._user = self.mgmt_interface["user"]
871         self._ip = self.mgmt_interface["ip"]
872
873         self.done = False
874         self._vpci_to_if_name_map = None
875         self.additional_file = {}
876         self.remote_prox_file_name = None
877         self.lower = None
878         self.upper = None
879         self.step_delta = 1
880         self.step_time = 0.5
881         self._test_type = None
882
883     @property
884     def sut(self):
885         if not self.client:
886             self.client = self._connect()
887         return self.client
888
889     @property
890     def test_type(self):
891         if self._test_type is None:
892             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
893         return self._test_type
894
895     def run_traffic(self, traffic_profile):
896         self._queue.cancel_join_thread()
897         self.lower = 0.0
898         self.upper = 100.0
899
900         traffic_profile.init(self._queue)
901         # this frees up the run_traffic loop
902         self.client_started.value = 1
903
904         while not self._terminated.value:
905             # move it all to traffic_profile
906             self._run_traffic_once(traffic_profile)
907
908     def _run_traffic_once(self, traffic_profile):
909         traffic_profile.execute_traffic(self)
910         if traffic_profile.done:
911             self._queue.put({'done': True})
912             LOG.debug("tg_prox done")
913             self._terminated.value = 1
914
915     # For VNF use ResourceHelper method to collect KPIs directly.
916     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
917     def collect_collectd_kpi(self):
918         return self._collect_resource_kpi()
919
920     def collect_kpi(self):
921         result = super(ProxResourceHelper, self).collect_kpi()
922         # add in collectd kpis manually
923         if result:
924             result['collect_stats'] = self._collect_resource_kpi()
925         return result
926
927     def terminate(self):
928         # should not be called, use VNF terminate
929         raise NotImplementedError()
930
931     def up_post(self):
932         return self.sut  # force connection
933
934     def execute(self, cmd, *args, **kwargs):
935         func = getattr(self.sut, cmd, None)
936         if func:
937             return func(*args, **kwargs)
938
939     def _connect(self, client=None):
940         """Run and connect to prox on the remote system """
941         # De-allocating a large amount of hugepages takes some time. If a new
942         # PROX instance is started immediately after killing the previous one,
943         # it might not be able to allocate hugepages, because they are still
944         # being freed. Hence the -w switch.
945         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
946         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
947         # -f ./handle_none-4.cfg"
948         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
949         #  "; " \
950         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
951         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
952         # sudo " \
953         #    + "./build/Prox " + prox_args
954         # log.debug("Starting PROX with command [%s]", prox_cmd)
955         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
956         # self._ip, prox_cmd))
957         if client is None:
958             client = ProxSocketHelper()
959
960         # try connecting to Prox for 60s
961         for _ in range(RETRY_SECONDS):
962             time.sleep(RETRY_INTERVAL)
963             try:
964                 client.connect(self._ip, PROX_PORT)
965             except (socket.gaierror, socket.error):
966                 continue
967             else:
968                 return client
969
970         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
971         raise Exception(msg.format(self._ip, PROX_PORT))
972
973
974 class ProxDataHelper(object):
975
976     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
977         super(ProxDataHelper, self).__init__()
978         self.vnfd_helper = vnfd_helper
979         self.sut = sut
980         self.pkt_size = pkt_size
981         self.value = value
982         self.tolerated_loss = tolerated_loss
983         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
984         self.tsc_hz = None
985         self.measured_stats = None
986         self.latency = None
987         self._totals_and_pps = None
988         self.result_tuple = None
989
990     @property
991     def totals_and_pps(self):
992         if self._totals_and_pps is None:
993             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
994             pps = self.value / 100.0 * self.line_rate_to_pps()
995             self._totals_and_pps = rx_total, tx_total, pps
996         return self._totals_and_pps
997
998     @property
999     def rx_total(self):
1000         return self.totals_and_pps[0]
1001
1002     @property
1003     def tx_total(self):
1004         return self.totals_and_pps[1]
1005
1006     @property
1007     def pps(self):
1008         return self.totals_and_pps[2]
1009
1010     @property
1011     def samples(self):
1012         samples = {}
1013         for port_name, port_num in self.vnfd_helper.ports_iter():
1014             port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1015             samples[port_name] = {
1016                 "in_packets": port_rx_total,
1017                 "out_packets": port_tx_total,
1018             }
1019         return samples
1020
1021     def __enter__(self):
1022         self.check_interface_count()
1023         return self
1024
1025     def __exit__(self, exc_type, exc_val, exc_tb):
1026         self.make_tuple()
1027
1028     def make_tuple(self):
1029         if self.result_tuple:
1030             return
1031
1032         self.result_tuple = ProxTestDataTuple(
1033             self.tolerated_loss,
1034             self.tsc_hz,
1035             self.measured_stats['delta'].rx,
1036             self.measured_stats['delta'].tx,
1037             self.measured_stats['delta'].tsc,
1038             self.latency,
1039             self.rx_total,
1040             self.tx_total,
1041             self.pps,
1042         )
1043         self.result_tuple.log_data()
1044
1045     @contextmanager
1046     def measure_tot_stats(self):
1047         with self.sut.measure_tot_stats() as self.measured_stats:
1048             yield
1049
1050     def check_interface_count(self):
1051         # do this assert in init?  unless we expect interface count to
1052         # change from one run to another run...
1053         assert self.port_count in {1, 2, 4}, \
1054             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1055
1056     def capture_tsc_hz(self):
1057         self.tsc_hz = float(self.sut.hz())
1058
1059     def line_rate_to_pps(self):
1060         # FIXME Don't hardcode 10Gb/s
1061         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1062
1063
1064 class ProxProfileHelper(object):
1065
1066     __prox_profile_type__ = "Generic"
1067
1068     PROX_CORE_GEN_MODE = "gen"
1069     PROX_CORE_LAT_MODE = "lat"
1070
1071     @classmethod
1072     def get_cls(cls, helper_type):
1073         """Return class of specified type."""
1074         if not helper_type:
1075             return ProxProfileHelper
1076
1077         for profile_helper_class in utils.itersubclasses(cls):
1078             if helper_type == profile_helper_class.__prox_profile_type__:
1079                 return profile_helper_class
1080
1081         return ProxProfileHelper
1082
1083     @classmethod
1084     def make_profile_helper(cls, resource_helper):
1085         return cls.get_cls(resource_helper.test_type)(resource_helper)
1086
1087     def __init__(self, resource_helper):
1088         super(ProxProfileHelper, self).__init__()
1089         self.resource_helper = resource_helper
1090         self._cpu_topology = None
1091         self._test_cores = None
1092         self._latency_cores = None
1093
1094     @property
1095     def cpu_topology(self):
1096         if not self._cpu_topology:
1097             stdout = io.BytesIO()
1098             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1099             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1100         return self._cpu_topology
1101
1102     @property
1103     def test_cores(self):
1104         if not self._test_cores:
1105             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1106         return self._test_cores
1107
1108     @property
1109     def latency_cores(self):
1110         if not self._latency_cores:
1111             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1112         return self._latency_cores
1113
1114     @contextmanager
1115     def traffic_context(self, pkt_size, value):
1116         self.sut.stop_all()
1117         self.sut.reset_stats()
1118         try:
1119             self.sut.set_pkt_size(self.test_cores, pkt_size)
1120             self.sut.set_speed(self.test_cores, value)
1121             self.sut.start_all()
1122             yield
1123         finally:
1124             self.sut.stop_all()
1125
1126     def get_cores(self, mode):
1127         cores = []
1128
1129         for section_name, section in self.setup_helper.prox_config_data:
1130             if not section_name.startswith("core"):
1131                 continue
1132
1133             for key, value in section:
1134                 if key == "mode" and value == mode:
1135                     core_tuple = CoreSocketTuple(section_name)
1136                     core = core_tuple.find_in_topology(self.cpu_topology)
1137                     cores.append(core)
1138
1139         return cores
1140
1141     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1142         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1143
1144         with data_helper, self.traffic_context(pkt_size, value):
1145             with data_helper.measure_tot_stats():
1146                 time.sleep(duration)
1147                 # Getting statistics to calculate PPS at right speed....
1148                 data_helper.capture_tsc_hz()
1149                 data_helper.latency = self.get_latency()
1150
1151         return data_helper.result_tuple, data_helper.samples
1152
1153     def get_latency(self):
1154         """
1155         :return: return lat_min, lat_max, lat_avg
1156         :rtype: list
1157         """
1158         if self._latency_cores:
1159             return self.sut.lat_stats(self._latency_cores)
1160         return []
1161
1162     def terminate(self):
1163         pass
1164
1165     def __getattr__(self, item):
1166         return getattr(self.resource_helper, item)
1167
1168
1169 class ProxMplsProfileHelper(ProxProfileHelper):
1170
1171     __prox_profile_type__ = "MPLS tag/untag"
1172
1173     def __init__(self, resource_helper):
1174         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1175         self._cores_tuple = None
1176
1177     @property
1178     def mpls_cores(self):
1179         if not self._cores_tuple:
1180             self._cores_tuple = self.get_cores_mpls()
1181         return self._cores_tuple
1182
1183     @property
1184     def tagged_cores(self):
1185         return self.mpls_cores[0]
1186
1187     @property
1188     def plain_cores(self):
1189         return self.mpls_cores[1]
1190
1191     def get_cores_mpls(self):
1192         cores_tagged = []
1193         cores_plain = []
1194         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1195             if not section_name.startswith("core"):
1196                 continue
1197
1198             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1199                 continue
1200
1201             for item_key, item_value in section:
1202                 if item_key != 'name':
1203                     continue
1204
1205                 if item_value.startswith("tag"):
1206                     core_tuple = CoreSocketTuple(section_name)
1207                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1208                     cores_tagged.append(core_tag)
1209
1210                 elif item_value.startswith("udp"):
1211                     core_tuple = CoreSocketTuple(section_name)
1212                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1213                     cores_plain.append(core_udp)
1214
1215         return cores_tagged, cores_plain
1216
1217     @contextmanager
1218     def traffic_context(self, pkt_size, value):
1219         self.sut.stop_all()
1220         self.sut.reset_stats()
1221         try:
1222             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1223             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1224             self.sut.set_speed(self.tagged_cores, value)
1225             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1226             self.sut.set_speed(self.plain_cores, value * ratio)
1227             self.sut.start_all()
1228             yield
1229         finally:
1230             self.sut.stop_all()
1231
1232
1233 class ProxBngProfileHelper(ProxProfileHelper):
1234
1235     __prox_profile_type__ = "BNG gen"
1236
1237     def __init__(self, resource_helper):
1238         super(ProxBngProfileHelper, self).__init__(resource_helper)
1239         self._cores_tuple = None
1240
1241     @property
1242     def bng_cores(self):
1243         if not self._cores_tuple:
1244             self._cores_tuple = self.get_cores_gen_bng_qos()
1245         return self._cores_tuple
1246
1247     @property
1248     def cpe_cores(self):
1249         return self.bng_cores[0]
1250
1251     @property
1252     def inet_cores(self):
1253         return self.bng_cores[1]
1254
1255     @property
1256     def arp_cores(self):
1257         return self.bng_cores[2]
1258
1259     @property
1260     def arp_task_cores(self):
1261         return self.bng_cores[3]
1262
1263     @property
1264     def all_rx_cores(self):
1265         return self.latency_cores
1266
1267     def get_cores_gen_bng_qos(self):
1268         cpe_cores = []
1269         inet_cores = []
1270         arp_cores = []
1271         arp_tasks_core = [0]
1272         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1273             if not section_name.startswith("core"):
1274                 continue
1275
1276             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1277                 continue
1278
1279             for item_key, item_value in section:
1280                 if item_key != 'name':
1281                     continue
1282
1283                 if item_value.startswith("cpe"):
1284                     core_tuple = CoreSocketTuple(section_name)
1285                     cpe_core = core_tuple.find_in_topology(self.cpu_topology)
1286                     cpe_cores.append(cpe_core)
1287
1288                 elif item_value.startswith("inet"):
1289                     core_tuple = CoreSocketTuple(section_name)
1290                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1291                     inet_cores.append(inet_core)
1292
1293                 elif item_value.startswith("arp"):
1294                     core_tuple = CoreSocketTuple(section_name)
1295                     arp_core = core_tuple.find_in_topology(self.cpu_topology)
1296                     arp_cores.append(arp_core)
1297
1298                 # We check the tasks/core separately
1299                 if item_value.startswith("arp_task"):
1300                     core_tuple = CoreSocketTuple(section_name)
1301                     arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
1302                     arp_tasks_core.append(arp_task_core)
1303
1304         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1305
1306     @contextmanager
1307     def traffic_context(self, pkt_size, value):
1308         # Tester is sending packets at the required speed already after
1309         # setup_test(). Just get the current statistics, sleep the required
1310         # amount of time and calculate packet loss.
1311         inet_pkt_size = pkt_size
1312         cpe_pkt_size = pkt_size - 24
1313         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1314
1315         curr_up_speed = curr_down_speed = 0
1316         max_up_speed = max_down_speed = value
1317         if ratio < 1:
1318             max_down_speed = value * ratio
1319         else:
1320             max_up_speed = value / ratio
1321
1322         # Initialize cores
1323         self.sut.stop_all()
1324         time.sleep(0.5)
1325
1326         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1327         # wrong.
1328         self.sut.start(self.all_rx_cores)
1329         time.sleep(0.5)
1330         self.sut.stop(self.all_rx_cores)
1331         time.sleep(0.5)
1332         self.sut.reset_stats()
1333
1334         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1335         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1336
1337         self.sut.reset_values(self.cpe_cores)
1338         self.sut.reset_values(self.inet_cores)
1339
1340         # Set correct IP and UDP lengths in packet headers
1341         # CPE
1342         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1343         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1344         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1345         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1346
1347         # INET
1348         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1349         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1350         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1351         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1352         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1353         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1354
1355         # Sending ARP to initialize tables - need a few seconds of generation
1356         # to make sure all CPEs are initialized
1357         LOG.info("Initializing SUT: sending ARP packets")
1358         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1359         self.sut.set_speed(self.inet_cores, curr_up_speed)
1360         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1361         self.sut.start(self.arp_cores)
1362         time.sleep(4)
1363
1364         # Ramp up the transmission speed. First go to the common speed, then
1365         # increase steps for the faster one.
1366         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1367
1368         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1369
1370         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1371             # The min(..., ...) takes care of 1) floating point rounding errors
1372             # that could make curr_*_speed to be slightly greater than
1373             # max_*_speed and 2) max_*_speed not being an exact multiple of
1374             # self._step_delta.
1375             if curr_up_speed < max_up_speed:
1376                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1377             if curr_down_speed < max_down_speed:
1378                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1379
1380             self.sut.set_speed(self.inet_cores, curr_up_speed)
1381             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1382             time.sleep(self.step_time)
1383
1384         LOG.info("Target speeds reached. Starting real test.")
1385
1386         yield
1387
1388         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1389         LOG.info("Test ended. Flushing NIC buffers")
1390         self.sut.start(self.all_rx_cores)
1391         time.sleep(3)
1392         self.sut.stop(self.all_rx_cores)
1393
1394     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1395         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1396
1397         with data_helper, self.traffic_context(pkt_size, value):
1398             with data_helper.measure_tot_stats():
1399                 time.sleep(duration)
1400                 # Getting statistics to calculate PPS at right speed....
1401                 data_helper.capture_tsc_hz()
1402                 data_helper.latency = self.get_latency()
1403
1404         return data_helper.result_tuple, data_helper.samples
1405
1406
1407 class ProxVpeProfileHelper(ProxProfileHelper):
1408
1409     __prox_profile_type__ = "vPE gen"
1410
1411     def __init__(self, resource_helper):
1412         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1413         self._cores_tuple = None
1414         self._ports_tuple = None
1415
1416     @property
1417     def vpe_cores(self):
1418         if not self._cores_tuple:
1419             self._cores_tuple = self.get_cores_gen_vpe()
1420         return self._cores_tuple
1421
1422     @property
1423     def cpe_cores(self):
1424         return self.vpe_cores[0]
1425
1426     @property
1427     def inet_cores(self):
1428         return self.vpe_cores[1]
1429
1430     @property
1431     def all_rx_cores(self):
1432         return self.latency_cores
1433
1434     @property
1435     def vpe_ports(self):
1436         if not self._ports_tuple:
1437             self._ports_tuple = self.get_ports_gen_vpe()
1438         return self._ports_tuple
1439
1440     @property
1441     def cpe_ports(self):
1442         return self.vpe_ports[0]
1443
1444     @property
1445     def inet_ports(self):
1446         return self.vpe_ports[1]
1447
1448     def get_cores_gen_vpe(self):
1449         cpe_cores = []
1450         inet_cores = []
1451         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1452             if not section_name.startswith("core"):
1453                 continue
1454
1455             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1456                 continue
1457
1458             for item_key, item_value in section:
1459                 if item_key != 'name':
1460                     continue
1461
1462                 if item_value.startswith("cpe"):
1463                     core_tuple = CoreSocketTuple(section_name)
1464                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1465                     cpe_cores.append(core_tag)
1466
1467                 elif item_value.startswith("inet"):
1468                     core_tuple = CoreSocketTuple(section_name)
1469                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1470                     inet_cores.append(inet_core)
1471
1472         return cpe_cores, inet_cores
1473
1474     def get_ports_gen_vpe(self):
1475         cpe_ports = []
1476         inet_ports = []
1477
1478         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1479             if not section_name.startswith("port"):
1480                 continue
1481             tx_port_iter = re.finditer(r'\d+', section_name)
1482             tx_port_no = int(next(tx_port_iter).group(0))
1483
1484             for item_key, item_value in section:
1485                 if item_key != 'name':
1486                     continue
1487
1488                 if item_value.startswith("cpe"):
1489                     cpe_ports.append(tx_port_no)
1490
1491                 elif item_value.startswith("inet"):
1492                     inet_ports.append(tx_port_no)
1493
1494         return cpe_ports, inet_ports
1495
1496     @contextmanager
1497     def traffic_context(self, pkt_size, value):
1498         # Calculate the target upload and download speed. The upload and
1499         # download packets have different packet sizes, so in order to get
1500         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1501         # of the packet sizes.
1502         cpe_pkt_size = pkt_size
1503         inet_pkt_size = pkt_size - 4
1504         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1505
1506         curr_up_speed = curr_down_speed = 0
1507         max_up_speed = max_down_speed = value
1508         if ratio < 1:
1509             max_down_speed = value * ratio
1510         else:
1511             max_up_speed = value / ratio
1512
1513         # Adjust speed when multiple cores per port are used to generate traffic
1514         if len(self.cpe_ports) != len(self.cpe_cores):
1515             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1516         if len(self.inet_ports) != len(self.inet_cores):
1517             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1518
1519         # Initialize cores
1520         self.sut.stop_all()
1521         time.sleep(2)
1522
1523         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1524         # wrong.
1525         self.sut.start(self.all_rx_cores)
1526         time.sleep(2)
1527         self.sut.stop(self.all_rx_cores)
1528         time.sleep(2)
1529         self.sut.reset_stats()
1530
1531         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1532         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1533
1534         self.sut.reset_values(self.cpe_cores)
1535         self.sut.reset_values(self.inet_cores)
1536
1537         # Set correct IP and UDP lengths in packet headers
1538         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1539         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1540         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1541         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1542
1543         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1544         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1545         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1546         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1547
1548         self.sut.set_speed(self.inet_cores, curr_up_speed)
1549         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1550
1551         # Ramp up the transmission speed. First go to the common speed, then
1552         # increase steps for the faster one.
1553         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1554
1555         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1556
1557         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1558             # The min(..., ...) takes care of 1) floating point rounding errors
1559             # that could make curr_*_speed to be slightly greater than
1560             # max_*_speed and 2) max_*_speed not being an exact multiple of
1561             # self._step_delta.
1562             if curr_up_speed < max_up_speed:
1563                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1564             if curr_down_speed < max_down_speed:
1565                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1566
1567             self.sut.set_speed(self.inet_cores, curr_up_speed)
1568             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1569             time.sleep(self.step_time)
1570
1571         LOG.info("Target speeds reached. Starting real test.")
1572
1573         yield
1574
1575         self.sut.stop(self.cpe_cores + self.inet_cores)
1576         LOG.info("Test ended. Flushing NIC buffers")
1577         self.sut.start(self.all_rx_cores)
1578         time.sleep(3)
1579         self.sut.stop(self.all_rx_cores)
1580
1581     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1582         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1583
1584         with data_helper, self.traffic_context(pkt_size, value):
1585             with data_helper.measure_tot_stats():
1586                 time.sleep(duration)
1587                 # Getting statistics to calculate PPS at right speed....
1588                 data_helper.capture_tsc_hz()
1589                 data_helper.latency = self.get_latency()
1590
1591         return data_helper.result_tuple, data_helper.samples
1592
1593
1594 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1595
1596     __prox_profile_type__ = "lwAFTR gen"
1597
1598     def __init__(self, resource_helper):
1599         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1600         self._cores_tuple = None
1601         self._ports_tuple = None
1602         self.step_delta = 5
1603         self.step_time = 0.5
1604
1605     @property
1606     def _lwaftr_cores(self):
1607         if not self._cores_tuple:
1608             self._cores_tuple = self._get_cores_gen_lwaftr()
1609         return self._cores_tuple
1610
1611     @property
1612     def tun_cores(self):
1613         return self._lwaftr_cores[0]
1614
1615     @property
1616     def inet_cores(self):
1617         return self._lwaftr_cores[1]
1618
1619     @property
1620     def _lwaftr_ports(self):
1621         if not self._ports_tuple:
1622             self._ports_tuple = self._get_ports_gen_lw_aftr()
1623         return self._ports_tuple
1624
1625     @property
1626     def tun_ports(self):
1627         return self._lwaftr_ports[0]
1628
1629     @property
1630     def inet_ports(self):
1631         return self._lwaftr_ports[1]
1632
1633     @property
1634     def all_rx_cores(self):
1635         return self.latency_cores
1636
1637     def _get_cores_gen_lwaftr(self):
1638         tun_cores = []
1639         inet_cores = []
1640         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1641             if not section_name.startswith("core"):
1642                 continue
1643
1644             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1645                 continue
1646
1647             core_tuple = CoreSocketTuple(section_name)
1648             core_tag = core_tuple.find_in_topology(self.cpu_topology)
1649             for item_value in (v for k, v in section if k == 'name'):
1650                 if item_value.startswith('tun'):
1651                     tun_cores.append(core_tag)
1652                 elif item_value.startswith('inet'):
1653                     inet_cores.append(core_tag)
1654
1655         return tun_cores, inet_cores
1656
1657     def _get_ports_gen_lw_aftr(self):
1658         tun_ports = []
1659         inet_ports = []
1660
1661         re_port = re.compile('port (\d+)')
1662         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1663             match = re_port.search(section_name)
1664             if not match:
1665                 continue
1666
1667             tx_port_no = int(match.group(1))
1668             for item_value in (v for k, v in section if k == 'name'):
1669                 if item_value.startswith('lwB4'):
1670                     tun_ports.append(tx_port_no)
1671                 elif item_value.startswith('inet'):
1672                     inet_ports.append(tx_port_no)
1673
1674         return tun_ports, inet_ports
1675
1676     @staticmethod
1677     def _resize(len1, len2):
1678         if len1 == len2:
1679             return 1.0
1680         return 1.0 * len1 / len2
1681
1682     @contextmanager
1683     def traffic_context(self, pkt_size, value):
1684         # Tester is sending packets at the required speed already after
1685         # setup_test(). Just get the current statistics, sleep the required
1686         # amount of time and calculate packet loss.
1687         tun_pkt_size = pkt_size
1688         inet_pkt_size = pkt_size - 40
1689         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1690
1691         curr_up_speed = curr_down_speed = 0
1692         max_up_speed = max_down_speed = value
1693
1694         max_up_speed = value / ratio
1695
1696         # Adjust speed when multiple cores per port are used to generate traffic
1697         if len(self.tun_ports) != len(self.tun_cores):
1698             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1699         if len(self.inet_ports) != len(self.inet_cores):
1700             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1701
1702         # Initialize cores
1703         self.sut.stop_all()
1704         time.sleep(0.5)
1705
1706         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1707         # wrong.
1708         self.sut.start(self.all_rx_cores)
1709         time.sleep(0.5)
1710         self.sut.stop(self.all_rx_cores)
1711         time.sleep(0.5)
1712         self.sut.reset_stats()
1713
1714         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1715         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1716
1717         self.sut.reset_values(self.tun_cores)
1718         self.sut.reset_values(self.inet_cores)
1719
1720         # Set correct IP and UDP lengths in packet headers
1721         # tun
1722         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1723         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1724         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1725         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1726         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1727         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1728
1729         # INET
1730         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1731         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1732         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1733         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1734
1735         LOG.info("Initializing SUT: sending lwAFTR packets")
1736         self.sut.set_speed(self.inet_cores, curr_up_speed)
1737         self.sut.set_speed(self.tun_cores, curr_down_speed)
1738         time.sleep(4)
1739
1740         # Ramp up the transmission speed. First go to the common speed, then
1741         # increase steps for the faster one.
1742         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1743
1744         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1745
1746         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1747             # The min(..., ...) takes care of 1) floating point rounding errors
1748             # that could make curr_*_speed to be slightly greater than
1749             # max_*_speed and 2) max_*_speed not being an exact multiple of
1750             # self._step_delta.
1751             if curr_up_speed < max_up_speed:
1752                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1753             if curr_down_speed < max_down_speed:
1754                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1755
1756             self.sut.set_speed(self.inet_cores, curr_up_speed)
1757             self.sut.set_speed(self.tun_cores, curr_down_speed)
1758             time.sleep(self.step_time)
1759
1760         LOG.info("Target speeds reached. Starting real test.")
1761
1762         yield
1763
1764         self.sut.stop(self.tun_cores + self.inet_cores)
1765         LOG.info("Test ended. Flushing NIC buffers")
1766         self.sut.start(self.all_rx_cores)
1767         time.sleep(3)
1768         self.sut.stop(self.all_rx_cores)
1769
1770     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1771         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1772
1773         with data_helper, self.traffic_context(pkt_size, value):
1774             with data_helper.measure_tot_stats():
1775                 time.sleep(duration)
1776                 # Getting statistics to calculate PPS at right speed....
1777                 data_helper.capture_tsc_hz()
1778                 data_helper.latency = self.get_latency()
1779
1780         return data_helper.result_tuple, data_helper.samples