Merge "Addition of Prox NSB BNG and BNG-QoS test"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import operator
18 import logging
19 import io
20 import os
21 import re
22 import select
23 import socket
24
25 from collections import OrderedDict, namedtuple
26 import time
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29 from multiprocessing import Queue
30
31 import six
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
34
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common import utils
37 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
38 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
40 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
41
42 PROX_PORT = 8474
43
44 SECTION_NAME = 0
45 SECTION_CONTENTS = 1
46
47 LOG = logging.getLogger(__name__)
48 LOG.setLevel(logging.DEBUG)
49
50 TEN_GIGABIT = 1e10
51 BITS_PER_BYTE = 8
52 RETRY_SECONDS = 60
53 RETRY_INTERVAL = 1
54
55 CONFIGURATION_OPTIONS = (
56     # dict key           section     key               default value
57     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58     ('testDuration', 'general', 'test_duration', 5.0),
59     ('testPrecision', 'general', 'test_precision', 1.0),
60     ('tests', 'general', 'tests', None),
61     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
62
63     ('logFile', 'logging', 'file', 'dats.log'),
64     ('logDateFormat', 'logging', 'datefmt', None),
65     ('logLevel', 'logging', 'level', 'INFO'),
66     ('logOverwrite', 'logging', 'overwrite', 1),
67
68     ('testerIp', 'tester', 'ip', None),
69     ('testerUser', 'tester', 'user', 'root'),
70     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73     ('testerSocketId', 'tester', 'socket_id', 0),
74
75     ('sutIp', 'sut', 'ip', None),
76     ('sutUser', 'sut', 'user', 'root'),
77     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80     ('sutSocketId', 'sut', 'socket_id', 0),
81 )
82
83
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
85     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
86
87     def __new__(cls, *args):
88         try:
89             matches = cls.CORE_RE.search(str(args[0]))
90             if matches:
91                 args = matches.groups()
92
93             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94                                                        'h' if args[2] else '')
95
96         except (AttributeError, TypeError, IndexError, ValueError):
97             raise ValueError('Invalid core spec {}'.format(args))
98
99     def is_hyperthread(self):
100         return self.hyperthread == 'h'
101
102     @property
103     def index(self):
104         return int(self.is_hyperthread())
105
106     def find_in_topology(self, cpu_topology):
107         try:
108             socket_core_match = cpu_topology[self.socket_id][self.core_id]
109             sorted_match = sorted(socket_core_match.values())
110             return sorted_match[self.index][0]
111         except (KeyError, IndexError):
112             template = "Core {}{} on socket {} does not exist"
113             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
114
115
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117     def __new__(cls, *args):
118         try:
119             assert args[0] is not str(args[0])
120             args = tuple(args[0])
121         except (AssertionError, IndexError, TypeError):
122             pass
123
124         return super(TotStatsTuple, cls).__new__(cls, *args)
125
126
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128                                                         'delta_tx,delta_tsc,'
129                                                         'latency,rx_total,tx_total,pps')):
130     @property
131     def pkt_loss(self):
132         try:
133             return 1e2 * self.drop_total / float(self.tx_total)
134         except ZeroDivisionError:
135             return 100.0
136
137     @property
138     def mpps(self):
139         # calculate the effective throughput in Mpps
140         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141
142     @property
143     def can_be_lost(self):
144         return int(self.tx_total * self.tolerated / 1e2)
145
146     @property
147     def drop_total(self):
148         return self.tx_total - self.rx_total
149
150     @property
151     def success(self):
152         return self.drop_total <= self.can_be_lost
153
154     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
155         if pkt_loss is None:
156             pkt_loss = self.pkt_loss
157
158         if port_samples is None:
159             port_samples = {}
160
161         latency_keys = [
162             "LatencyMin",
163             "LatencyMax",
164             "LatencyAvg",
165         ]
166
167         samples = {
168             "Throughput": self.mpps,
169             "DropPackets": pkt_loss,
170             "CurrentDropPackets": pkt_loss,
171             "TxThroughput": self.pps / 1e6,
172             "RxThroughput": self.mpps,
173             "PktSize": pkt_size,
174         }
175         if port_samples:
176             samples.update(port_samples)
177
178         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
179         return samples
180
181     def log_data(self, logger=None):
182         if logger is None:
183             logger = LOG
184
185         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
186         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
187         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
188
189
190 class PacketDump(object):
191     @staticmethod
192     def assert_func(func, value1, value2, template=None):
193         assert func(value1, value2), template.format(value1, value2)
194
195     def __init__(self, port_id, data_len, payload):
196         template = "Packet dump has specified length {}, but payload is {} bytes long"
197         self.assert_func(operator.eq, data_len, len(payload), template)
198         self._port_id = port_id
199         self._data_len = data_len
200         self._payload = payload
201
202     @property
203     def port_id(self):
204         """Get the port id of the packet dump"""
205         return self._port_id
206
207     @property
208     def data_len(self):
209         """Get the length of the data received"""
210         return self._data_len
211
212     def __str__(self):
213         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
214
215     def payload(self, start=None, end=None):
216         """Get part of the payload as a list of ordinals.
217
218         Returns a list of byte values, matching the contents of the packet dump.
219         Optional start and end parameters can be specified to retrieve only a
220         part of the packet contents.
221
222         The number of elements in the list is equal to end - start + 1, so end
223         is the offset of the last character.
224
225         Args:
226             start (pos. int): the starting offset in the payload. If it is not
227                 specified or None, offset 0 is assumed.
228             end (pos. int): the ending offset of the payload. If it is not
229                 specified or None, the contents until the end of the packet are
230                 returned.
231
232         Returns:
233             [int, int, ...]. Each int represents the ordinal value of a byte in
234             the packet payload.
235         """
236         if start is None:
237             start = 0
238
239         if end is None:
240             end = self.data_len - 1
241
242         # Bounds checking on offsets
243         template = "Start offset must be non-negative"
244         self.assert_func(operator.ge, start, 0, template)
245
246         template = "End offset must be less than {1}"
247         self.assert_func(operator.lt, end, self.data_len, template)
248
249         # Adjust for splice operation: end offset must be 1 more than the offset
250         # of the last desired character.
251         end += 1
252
253         return self._payload[start:end]
254
255
256 class ProxSocketHelper(object):
257
258     def __init__(self, sock=None):
259         """ creates new prox instance """
260         super(ProxSocketHelper, self).__init__()
261
262         if sock is None:
263             sock = socket.socket()
264
265         self._sock = sock
266         self._pkt_dumps = []
267         self.master_stats = None
268
269     def connect(self, ip, port):
270         """Connect to the prox instance on the remote system"""
271         self._sock.connect((ip, port))
272
273     def get_socket(self):
274         """ get the socket connected to the remote instance """
275         return self._sock
276
277     def _parse_socket_data(self, decoded_data, pkt_dump_only):
278         def get_newline_index():
279             return decoded_data.find('\n', index)
280
281         ret_str = ''
282         index = 0
283         for newline_index in iter(get_newline_index, -1):
284             ret_str = decoded_data[index:newline_index]
285
286             try:
287                 mode, port_id, data_len = ret_str.split(',', 2)
288             except ValueError:
289                 mode, port_id, data_len = None, None, None
290
291             if mode != 'pktdump':
292                 # Regular 1-line message. Stop reading from the socket.
293                 LOG.debug("Regular response read")
294                 return ret_str
295
296             LOG.debug("Packet dump header read: [%s]", ret_str)
297
298             # The line is a packet dump header. Parse it, read the
299             # packet payload, store the dump for later retrieval.
300             # Skip over the packet dump and continue processing: a
301             # 1-line response may follow the packet dump.
302
303             data_len = int(data_len)
304             data_start = newline_index + 1  # + 1 to skip over \n
305             data_end = data_start + data_len
306             sub_data = decoded_data[data_start:data_end]
307             pkt_payload = array.array('B', (ord(v) for v in sub_data))
308             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
309             self._pkt_dumps.append(pkt_dump)
310
311             if pkt_dump_only:
312                 # Return boolean instead of string to signal
313                 # successful reception of the packet dump.
314                 LOG.debug("Packet dump stored, returning")
315                 return True
316
317             index = data_end + 1
318
319         return ret_str
320
321     def get_data(self, pkt_dump_only=False, timeout=1):
322         """ read data from the socket """
323
324         # This method behaves slightly differently depending on whether it is
325         # called to read the response to a command (pkt_dump_only = 0) or if
326         # it is called specifically to read a packet dump (pkt_dump_only = 1).
327         #
328         # Packet dumps look like:
329         #   pktdump,<port_id>,<data_len>\n
330         #   <packet contents as byte array>\n
331         # This means the total packet dump message consists of 2 lines instead
332         # of 1 line.
333         #
334         # - Response for a command (pkt_dump_only = 0):
335         #   1) Read response from the socket until \n (end of message)
336         #   2a) If the response is a packet dump header (starts with "pktdump,"):
337         #     - Read the packet payload and store the packet dump for later
338         #       retrieval.
339         #     - Reset the state and restart from 1). Eventually state 2b) will
340         #       be reached and the function will return.
341         #   2b) If the response is not a packet dump:
342         #     - Return the received message as a string
343         #
344         # - Explicit request to read a packet dump (pkt_dump_only = 1):
345         #   - Read the dump header and payload
346         #   - Store the packet dump for later retrieval
347         #   - Return True to signify a packet dump was successfully read
348
349         def is_ready():
350             # recv() is blocking, so avoid calling it when no data is waiting.
351             ready = select.select([self._sock], [], [], timeout)
352             return bool(ready[0])
353
354         status = False
355         ret_str = ""
356         for status in iter(is_ready, False):
357             decoded_data = self._sock.recv(256).decode('utf-8')
358             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
359
360         LOG.debug("Received data from socket: [%s]", ret_str)
361         return ret_str if status else ''
362
363     def put_command(self, to_send):
364         """ send data to the remote instance """
365         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
366         try:
367             self._sock.sendall(to_send.encode('utf-8'))
368         except:
369             pass
370
371     def get_packet_dump(self):
372         """ get the next packet dump """
373         if self._pkt_dumps:
374             return self._pkt_dumps.pop(0)
375         return None
376
377     def stop_all_reset(self):
378         """ stop the remote instance and reset stats """
379         LOG.debug("Stop all and reset stats")
380         self.stop_all()
381         self.reset_stats()
382
383     def stop_all(self):
384         """ stop all cores on the remote instance """
385         LOG.debug("Stop all")
386         self.put_command("stop all\n")
387         time.sleep(3)
388
389     def stop(self, cores, task=''):
390         """ stop specific cores on the remote instance """
391         LOG.debug("Stopping cores %s", cores)
392         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
393         time.sleep(3)
394
395     def start_all(self):
396         """ start all cores on the remote instance """
397         LOG.debug("Start all")
398         self.put_command("start all\n")
399
400     def start(self, cores):
401         """ start specific cores on the remote instance """
402         LOG.debug("Starting cores %s", cores)
403         self.put_command("start {}\n".format(join_non_strings(',', cores)))
404         time.sleep(3)
405
406     def reset_stats(self):
407         """ reset the statistics on the remote instance """
408         LOG.debug("Reset stats")
409         self.put_command("reset stats\n")
410         time.sleep(1)
411
412     def _run_template_over_cores(self, template, cores, *args):
413         for core in cores:
414             self.put_command(template.format(core, *args))
415
416     def set_pkt_size(self, cores, pkt_size):
417         """ set the packet size to generate on the remote instance """
418         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
419         pkt_size -= 4
420         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
421         time.sleep(1)
422
423     def set_value(self, cores, offset, value, length):
424         """ set value on the remote instance """
425         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
426         LOG.debug(msg, cores, value, length, offset)
427         template = "set value {} 0 {} {} {}\n"
428         self._run_template_over_cores(template, cores, offset, value, length)
429
430     def reset_values(self, cores):
431         """ reset values on the remote instance """
432         LOG.debug("Set value for core(s) %s", cores)
433         self._run_template_over_cores("reset values {} 0\n", cores)
434
435     def set_speed(self, cores, speed, tasks=None):
436         """ set speed on the remote instance """
437         if tasks is None:
438             tasks = [0] * len(cores)
439         elif len(tasks) != len(cores):
440             LOG.error("set_speed: cores and tasks must have the same len")
441         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
442         for (core, task) in list(zip(cores, tasks)):
443             self.put_command("speed {} {} {}\n".format(core, task, speed))
444
445     def slope_speed(self, cores_speed, duration, n_steps=0):
446         """will start to increase speed from 0 to N where N is taken from
447         a['speed'] for each a in cores_speed"""
448         # by default, each step will take 0.5 sec
449         if n_steps == 0:
450             n_steps = duration * 2
451
452         private_core_data = []
453         step_duration = float(duration) / n_steps
454         for core_data in cores_speed:
455             target = float(core_data['speed'])
456             private_core_data.append({
457                 'cores': core_data['cores'],
458                 'zero': 0,
459                 'delta': target / n_steps,
460                 'current': 0,
461                 'speed': target,
462             })
463
464         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
465         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
466             time.sleep(step_duration)
467             for core_data in private_core_data:
468                 core_data['current'] = core_data[key1] + core_data[key2]
469                 self.set_speed(core_data['cores'], core_data['current'])
470
471     def set_pps(self, cores, pps, pkt_size):
472         """ set packets per second for specific cores on the remote instance """
473         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
474         LOG.debug(msg, cores, pps, pkt_size)
475
476         # speed in percent of line-rate
477         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
478         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
479
480     def lat_stats(self, cores, task=0):
481         """Get the latency statistics from the remote system"""
482         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
483         lat_min = {}
484         lat_max = {}
485         lat_avg = {}
486         for core in cores:
487             self.put_command("lat stats {} {} \n".format(core, task))
488             ret = self.get_data()
489
490             try:
491                 lat_min[core], lat_max[core], lat_avg[core] = \
492                     tuple(int(n) for n in ret.split(",")[:3])
493
494             except (AttributeError, ValueError, TypeError):
495                 pass
496
497         return lat_min, lat_max, lat_avg
498
499     def get_all_tot_stats(self):
500         self.put_command("tot stats\n")
501         all_stats_str = self.get_data().split(",")
502         if len(all_stats_str) != 4:
503             all_stats = [0] * 4
504             return all_stats
505         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
506         self.master_stats = all_stats
507         return all_stats
508
509     def hz(self):
510         return self.get_all_tot_stats()[3]
511
512     # Deprecated
513     # TODO: remove
514     def rx_stats(self, cores, task=0):
515         return self.core_stats(cores, task)
516
517     def core_stats(self, cores, task=0):
518         """Get the receive statistics from the remote system"""
519         rx = tx = drop = tsc = 0
520         for core in cores:
521             self.put_command("core stats {} {}\n".format(core, task))
522             ret = self.get_data().split(",")
523             rx += int(ret[0])
524             tx += int(ret[1])
525             drop += int(ret[2])
526             tsc = int(ret[3])
527         return rx, tx, drop, tsc
528
529     def port_stats(self, ports):
530         """get counter values from a specific port"""
531         tot_result = [0] * 12
532         for port in ports:
533             self.put_command("port_stats {}\n".format(port))
534             ret = [try_int(s, 0) for s in self.get_data().split(",")]
535             tot_result = [sum(x) for x in zip(tot_result, ret)]
536         return tot_result
537
538     @contextmanager
539     def measure_tot_stats(self):
540         start = self.get_all_tot_stats()
541         container = {'start_tot': start}
542         try:
543             yield container
544         finally:
545             container['end_tot'] = end = self.get_all_tot_stats()
546
547         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
548
549     def tot_stats(self):
550         """Get the total statistics from the remote system"""
551         stats = self.get_all_tot_stats()
552         return stats[:3]
553
554     def tot_ierrors(self):
555         """Get the total ierrors from the remote system"""
556         self.put_command("tot ierrors tot\n")
557         recv = self.get_data().split(',')
558         tot_ierrors = int(recv[0])
559         tsc = int(recv[0])
560         return tot_ierrors, tsc
561
562     def set_count(self, count, cores):
563         """Set the number of packets to send on the specified core"""
564         self._run_template_over_cores("count {} 0 {}\n", cores, count)
565
566     def dump_rx(self, core_id, task_id=0, count=1):
567         """Activate dump on rx on the specified core"""
568         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
569         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
570         time.sleep(1.5)  # Give PROX time to set up packet dumping
571
572     def quit(self):
573         self.stop_all()
574         self._quit()
575         self.force_quit()
576
577     def _quit(self):
578         """ stop all cores on the remote instance """
579         LOG.debug("Quit prox")
580         self.put_command("quit\n")
581         time.sleep(3)
582
583     def force_quit(self):
584         """ stop all cores on the remote instance """
585         LOG.debug("Force Quit prox")
586         self.put_command("quit_force\n")
587         time.sleep(3)
588
589
590 _LOCAL_OBJECT = object()
591
592
593 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
594     # the actual app is lowercase
595     APP_NAME = 'prox'
596
597     LUA_PARAMETER_NAME = ""
598     LUA_PARAMETER_PEER = {
599         "gen": "sut",
600         "sut": "gen",
601     }
602
603     CONFIG_QUEUE_TIMEOUT = 120
604
605     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
606         self.remote_path = None
607         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
608         self.remote_prox_file_name = None
609         self._prox_config_data = None
610         self.additional_files = {}
611         self.config_queue = Queue()
612         self._global_section = None
613
614     @property
615     def prox_config_data(self):
616         if self._prox_config_data is None:
617             # this will block, but it needs too
618             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
619         return self._prox_config_data
620
621     @property
622     def global_section(self):
623         if self._global_section is None and self.prox_config_data:
624             self._global_section = self.find_section("global")
625         return self._global_section
626
627     def find_section(self, name, default=_LOCAL_OBJECT):
628         result = next((value for key, value in self.prox_config_data if key == name), default)
629         if result is _LOCAL_OBJECT:
630             raise KeyError('{} not found in Prox config'.format(name))
631         return result
632
633     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
634         section = self.find_section(section_name, [])
635         result = next((value for key, value in section if key == section_key), default)
636         if result is _LOCAL_OBJECT:
637             template = '{} not found in {} section of Prox config'
638             raise KeyError(template.format(section_key, section_name))
639         return result
640
641     def _build_pipeline_kwargs(self):
642         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
643         self.pipeline_kwargs = {
644             'tool_path': tool_path,
645             'tool_dir': os.path.dirname(tool_path),
646         }
647
648     def copy_to_target(self, config_file_path, prox_file):
649         remote_path = os.path.join("/tmp", prox_file)
650         self.ssh_helper.put(config_file_path, remote_path)
651         return remote_path
652
653     @staticmethod
654     def _get_tx_port(section, sections):
655         iface_port = [-1]
656         for item in sections[section]:
657             if item[0] == "tx port":
658                 iface_port = re.findall(r'\d+', item[1])
659                 # do we want the last one?
660                 #   if yes, then can we reverse?
661         return int(iface_port[0])
662
663     @staticmethod
664     def _replace_quoted_with_value(quoted, value, count=1):
665         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
666         return new_string
667
668     def _insert_additional_file(self, value):
669         file_str = value.split('"')
670         base_name = os.path.basename(file_str[1])
671         file_str[1] = self.additional_files[base_name]
672         return '"'.join(file_str)
673
674     def generate_prox_config_file(self, config_path):
675         sections = []
676         prox_config = ConfigParser(config_path, sections)
677         prox_config.parse()
678
679         # Ensure MAC is set "hardware"
680         all_ports = self.vnfd_helper.port_pairs.all_ports
681         # use dpdk port number
682         for port_name in all_ports:
683             port_num = self.vnfd_helper.port_num(port_name)
684             port_section_name = "port {}".format(port_num)
685             for section_name, section in sections:
686                 if port_section_name != section_name:
687                     continue
688
689                 for index, section_data in enumerate(section):
690                     if section_data[0] == "mac":
691                         section_data[1] = "hardware"
692
693         # search for dst mac
694         for _, section in sections:
695             # for index, (item_key, item_val) in enumerate(section):
696             for index, section_data in enumerate(section):
697                 item_key, item_val = section_data
698                 if item_val.startswith("@@dst_mac"):
699                     tx_port_iter = re.finditer(r'\d+', item_val)
700                     tx_port_no = int(next(tx_port_iter).group(0))
701                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
702                     mac = intf["virtual-interface"]["dst_mac"]
703                     section_data[1] = mac.replace(":", " ", 6)
704
705                 if item_key == "dst mac" and item_val.startswith("@@"):
706                     tx_port_iter = re.finditer(r'\d+', item_val)
707                     tx_port_no = int(next(tx_port_iter).group(0))
708                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
709                     mac = intf["virtual-interface"]["dst_mac"]
710                     section_data[1] = mac
711
712         # if addition file specified in prox config
713         if not self.additional_files:
714             return sections
715
716         for section_name, section in sections:
717             for index, section_data in enumerate(section):
718                 try:
719                     if section_data[0].startswith("dofile"):
720                         section_data[0] = self._insert_additional_file(section_data[0])
721
722                     if section_data[1].startswith("dofile"):
723                         section_data[1] = self._insert_additional_file(section_data[1])
724                 except:
725                     pass
726
727         return sections
728
729     @staticmethod
730     def write_prox_config(prox_config):
731         """
732         Write an .ini-format config file for PROX
733         PROX does not allow a space before/after the =, so we need
734         a custom method
735         """
736         out = []
737         for i, (section_name, section) in enumerate(prox_config):
738             out.append("[{}]".format(section_name))
739             for index, item in enumerate(section):
740                 key, value = item
741                 if key == "__name__":
742                     continue
743                 if value is not None and value != '@':
744                     key = "=".join((key, str(value).replace('\n', '\n\t')))
745                     out.append(key)
746                 else:
747                     key = str(key).replace('\n', '\n\t')
748                     out.append(key)
749         return os.linesep.join(out)
750
751     def put_string_to_file(self, s, remote_path):
752         file_obj = cStringIO(s)
753         self.ssh_helper.put_file_obj(file_obj, remote_path)
754         return remote_path
755
756     def generate_prox_lua_file(self):
757         p = OrderedDict()
758         all_ports = self.vnfd_helper.port_pairs.all_ports
759         lua_param = self.LUA_PARAMETER_NAME
760         for port_name in all_ports:
761             peer = self.LUA_PARAMETER_PEER[lua_param]
762             port_num = self.vnfd_helper.port_num(port_name)
763             intf = self.vnfd_helper.find_interface(name=port_name)
764             vintf = intf['virtual-interface']
765             local_ip = vintf["local_ip"]
766             dst_ip = vintf["dst_ip"]
767             local_ip_hex = ip_to_hex(local_ip, separator=' ')
768             dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
769             p.update([
770                 ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
771                 ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
772                 ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
773                 ("{}_ip_port_{}".format(peer, port_num), dst_ip),
774             ])
775         lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
776         return lua
777
778     def upload_prox_lua(self, config_dir, prox_config_data):
779         # we could have multiple lua directives
780         lau_dict = prox_config_data.get('lua', {})
781         find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
782         lua_file = next((found[0] for found in find_iter if found), None)
783         if not lua_file:
784             return ""
785
786         out = self.generate_prox_lua_file()
787         remote_path = os.path.join(config_dir, lua_file)
788         return self.put_string_to_file(out, remote_path)
789
790     def upload_prox_config(self, config_file, prox_config_data):
791         # prox can't handle spaces around ' = ' so use custom method
792         out = StringIO(self.write_prox_config(prox_config_data))
793         out.seek(0)
794         remote_path = os.path.join("/tmp", config_file)
795         self.ssh_helper.put_file_obj(out, remote_path)
796
797         return remote_path
798
799     def build_config_file(self):
800         task_path = self.scenario_helper.task_path
801         options = self.scenario_helper.options
802         config_path = options['prox_config']
803         config_file = os.path.basename(config_path)
804         config_path = find_relative_file(config_path, task_path)
805         self.additional_files = {}
806
807         prox_files = options.get('prox_files', [])
808         if isinstance(prox_files, six.string_types):
809             prox_files = [prox_files]
810         for key_prox_file in prox_files:
811             base_prox_file = os.path.basename(key_prox_file)
812             key_prox_path = find_relative_file(key_prox_file, task_path)
813             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
814             self.additional_files[base_prox_file] = remote_prox_file
815
816         self._prox_config_data = self.generate_prox_config_file(config_path)
817         # copy config to queue so we can read it from traffic_runner process
818         self.config_queue.put(self._prox_config_data)
819         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
820
821     def build_config(self):
822         self.build_config_file()
823
824         options = self.scenario_helper.options
825
826         prox_args = options['prox_args']
827         LOG.info("Provision and start the %s", self.APP_NAME)
828         self._build_pipeline_kwargs()
829         self.pipeline_kwargs["args"] = " ".join(
830             " ".join([k, v if v else ""]) for k, v in prox_args.items())
831         self.pipeline_kwargs["cfg_file"] = self.remote_path
832
833         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
834         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
835         return prox_cmd
836
837
838 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
839 class ProxResourceHelper(ClientResourceHelper):
840
841     RESOURCE_WORD = 'prox'
842
843     PROX_MODE = ""
844
845     WAIT_TIME = 3
846
847     @staticmethod
848     def find_pci(pci, bound_pci):
849         # we have to substring match PCI bus address from the end
850         return any(b.endswith(pci) for b in bound_pci)
851
852     def __init__(self, setup_helper):
853         super(ProxResourceHelper, self).__init__(setup_helper)
854         self.mgmt_interface = self.vnfd_helper.mgmt_interface
855         self._user = self.mgmt_interface["user"]
856         self._ip = self.mgmt_interface["ip"]
857
858         self.done = False
859         self._vpci_to_if_name_map = None
860         self.additional_file = {}
861         self.remote_prox_file_name = None
862         self.lower = None
863         self.upper = None
864         self.step_delta = 1
865         self.step_time = 0.5
866         self._test_type = None
867
868     @property
869     def sut(self):
870         if not self.client:
871             self.client = self._connect()
872         return self.client
873
874     @property
875     def test_type(self):
876         if self._test_type is None:
877             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
878         return self._test_type
879
880     def run_traffic(self, traffic_profile):
881         self.lower = 0.0
882         self.upper = 100.0
883
884         traffic_profile.init(self._queue)
885         # this frees up the run_traffic loop
886         self.client_started.value = 1
887
888         while not self._terminated.value:
889             # move it all to traffic_profile
890             self._run_traffic_once(traffic_profile)
891
892     def _run_traffic_once(self, traffic_profile):
893         traffic_profile.execute_traffic(self)
894         if traffic_profile.done:
895             self._queue.put({'done': True})
896             LOG.debug("tg_prox done")
897             self._terminated.value = 1
898
899     # For VNF use ResourceHelper method to collect KPIs directly.
900     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
901     def collect_collectd_kpi(self):
902         return self._collect_resource_kpi()
903
904     def collect_kpi(self):
905         result = super(ProxResourceHelper, self).collect_kpi()
906         # add in collectd kpis manually
907         if result:
908             result['collect_stats'] = self._collect_resource_kpi()
909         return result
910
911     def terminate(self):
912         # should not be called, use VNF terminate
913         raise NotImplementedError()
914
915     def up_post(self):
916         return self.sut  # force connection
917
918     def execute(self, cmd, *args, **kwargs):
919         func = getattr(self.sut, cmd, None)
920         if func:
921             return func(*args, **kwargs)
922
923     def _connect(self, client=None):
924         """Run and connect to prox on the remote system """
925         # De-allocating a large amount of hugepages takes some time. If a new
926         # PROX instance is started immediately after killing the previous one,
927         # it might not be able to allocate hugepages, because they are still
928         # being freed. Hence the -w switch.
929         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
930         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
931         # -f ./handle_none-4.cfg"
932         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
933         #  "; " \
934         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
935         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
936         # sudo " \
937         #    + "./build/Prox " + prox_args
938         # log.debug("Starting PROX with command [%s]", prox_cmd)
939         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
940         # self._ip, prox_cmd))
941         if client is None:
942             client = ProxSocketHelper()
943
944         # try connecting to Prox for 60s
945         for _ in range(RETRY_SECONDS):
946             time.sleep(RETRY_INTERVAL)
947             try:
948                 client.connect(self._ip, PROX_PORT)
949             except (socket.gaierror, socket.error):
950                 continue
951             else:
952                 return client
953
954         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
955         raise Exception(msg.format(self._ip, PROX_PORT))
956
957
958 class ProxDataHelper(object):
959
960     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
961         super(ProxDataHelper, self).__init__()
962         self.vnfd_helper = vnfd_helper
963         self.sut = sut
964         self.pkt_size = pkt_size
965         self.value = value
966         self.tolerated_loss = tolerated_loss
967         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
968         self.tsc_hz = None
969         self.measured_stats = None
970         self.latency = None
971         self._totals_and_pps = None
972         self.result_tuple = None
973
974     @property
975     def totals_and_pps(self):
976         if self._totals_and_pps is None:
977             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
978             pps = self.value / 100.0 * self.line_rate_to_pps()
979             self._totals_and_pps = rx_total, tx_total, pps
980         return self._totals_and_pps
981
982     @property
983     def rx_total(self):
984         return self.totals_and_pps[0]
985
986     @property
987     def tx_total(self):
988         return self.totals_and_pps[1]
989
990     @property
991     def pps(self):
992         return self.totals_and_pps[2]
993
994     @property
995     def samples(self):
996         samples = {}
997         for port_name, port_num in self.vnfd_helper.ports_iter():
998             port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
999             samples[port_name] = {
1000                 "in_packets": port_rx_total,
1001                 "out_packets": port_tx_total,
1002             }
1003         return samples
1004
1005     def __enter__(self):
1006         self.check_interface_count()
1007         return self
1008
1009     def __exit__(self, exc_type, exc_val, exc_tb):
1010         self.make_tuple()
1011
1012     def make_tuple(self):
1013         if self.result_tuple:
1014             return
1015
1016         self.result_tuple = ProxTestDataTuple(
1017             self.tolerated_loss,
1018             self.tsc_hz,
1019             self.measured_stats['delta'].rx,
1020             self.measured_stats['delta'].tx,
1021             self.measured_stats['delta'].tsc,
1022             self.latency,
1023             self.rx_total,
1024             self.tx_total,
1025             self.pps,
1026         )
1027         self.result_tuple.log_data()
1028
1029     @contextmanager
1030     def measure_tot_stats(self):
1031         with self.sut.measure_tot_stats() as self.measured_stats:
1032             yield
1033
1034     def check_interface_count(self):
1035         # do this assert in init?  unless we expect interface count to
1036         # change from one run to another run...
1037         assert self.port_count in {1, 2, 4}, \
1038             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1039
1040     def capture_tsc_hz(self):
1041         self.tsc_hz = float(self.sut.hz())
1042
1043     def line_rate_to_pps(self):
1044         # FIXME Don't hardcode 10Gb/s
1045         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1046
1047
1048 class ProxProfileHelper(object):
1049
1050     __prox_profile_type__ = "Generic"
1051
1052     PROX_CORE_GEN_MODE = "gen"
1053     PROX_CORE_LAT_MODE = "lat"
1054
1055     @classmethod
1056     def get_cls(cls, helper_type):
1057         """Return class of specified type."""
1058         if not helper_type:
1059             return ProxProfileHelper
1060
1061         for profile_helper_class in utils.itersubclasses(cls):
1062             if helper_type == profile_helper_class.__prox_profile_type__:
1063                 return profile_helper_class
1064
1065         return ProxProfileHelper
1066
1067     @classmethod
1068     def make_profile_helper(cls, resource_helper):
1069         return cls.get_cls(resource_helper.test_type)(resource_helper)
1070
1071     def __init__(self, resource_helper):
1072         super(ProxProfileHelper, self).__init__()
1073         self.resource_helper = resource_helper
1074         self._cpu_topology = None
1075         self._test_cores = None
1076         self._latency_cores = None
1077
1078     @property
1079     def cpu_topology(self):
1080         if not self._cpu_topology:
1081             stdout = io.BytesIO()
1082             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1083             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1084         return self._cpu_topology
1085
1086     @property
1087     def test_cores(self):
1088         if not self._test_cores:
1089             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1090         return self._test_cores
1091
1092     @property
1093     def latency_cores(self):
1094         if not self._latency_cores:
1095             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1096         return self._latency_cores
1097
1098     @contextmanager
1099     def traffic_context(self, pkt_size, value):
1100         self.sut.stop_all()
1101         self.sut.reset_stats()
1102         try:
1103             self.sut.set_pkt_size(self.test_cores, pkt_size)
1104             self.sut.set_speed(self.test_cores, value)
1105             self.sut.start_all()
1106             yield
1107         finally:
1108             self.sut.stop_all()
1109
1110     def get_cores(self, mode):
1111         cores = []
1112
1113         for section_name, section in self.setup_helper.prox_config_data:
1114             if not section_name.startswith("core"):
1115                 continue
1116
1117             for key, value in section:
1118                 if key == "mode" and value == mode:
1119                     core_tuple = CoreSocketTuple(section_name)
1120                     core = core_tuple.find_in_topology(self.cpu_topology)
1121                     cores.append(core)
1122
1123         return cores
1124
1125     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1126         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1127
1128         with data_helper, self.traffic_context(pkt_size, value):
1129             with data_helper.measure_tot_stats():
1130                 time.sleep(duration)
1131                 # Getting statistics to calculate PPS at right speed....
1132                 data_helper.capture_tsc_hz()
1133                 data_helper.latency = self.get_latency()
1134
1135         return data_helper.result_tuple, data_helper.samples
1136
1137     def get_latency(self):
1138         """
1139         :return: return lat_min, lat_max, lat_avg
1140         :rtype: list
1141         """
1142         if self._latency_cores:
1143             return self.sut.lat_stats(self._latency_cores)
1144         return []
1145
1146     def terminate(self):
1147         pass
1148
1149     def __getattr__(self, item):
1150         return getattr(self.resource_helper, item)
1151
1152
1153 class ProxMplsProfileHelper(ProxProfileHelper):
1154
1155     __prox_profile_type__ = "MPLS tag/untag"
1156
1157     def __init__(self, resource_helper):
1158         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1159         self._cores_tuple = None
1160
1161     @property
1162     def mpls_cores(self):
1163         if not self._cores_tuple:
1164             self._cores_tuple = self.get_cores_mpls()
1165         return self._cores_tuple
1166
1167     @property
1168     def tagged_cores(self):
1169         return self.mpls_cores[0]
1170
1171     @property
1172     def plain_cores(self):
1173         return self.mpls_cores[1]
1174
1175     def get_cores_mpls(self):
1176         cores_tagged = []
1177         cores_plain = []
1178         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1179             if not section_name.startswith("core"):
1180                 continue
1181
1182             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1183                 continue
1184
1185             for item_key, item_value in section:
1186                 if item_key != 'name':
1187                     continue
1188
1189                 if item_value.startswith("tag"):
1190                     core_tuple = CoreSocketTuple(section_name)
1191                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1192                     cores_tagged.append(core_tag)
1193
1194                 elif item_value.startswith("udp"):
1195                     core_tuple = CoreSocketTuple(section_name)
1196                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1197                     cores_plain.append(core_udp)
1198
1199         return cores_tagged, cores_plain
1200
1201     @contextmanager
1202     def traffic_context(self, pkt_size, value):
1203         self.sut.stop_all()
1204         self.sut.reset_stats()
1205         try:
1206             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1207             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1208             self.sut.set_speed(self.tagged_cores, value)
1209             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1210             self.sut.set_speed(self.plain_cores, value * ratio)
1211             self.sut.start_all()
1212             yield
1213         finally:
1214             self.sut.stop_all()
1215
1216
1217 class ProxBngProfileHelper(ProxProfileHelper):
1218
1219     __prox_profile_type__ = "BNG gen"
1220
1221     def __init__(self, resource_helper):
1222         super(ProxBngProfileHelper, self).__init__(resource_helper)
1223         self._cores_tuple = None
1224
1225     @property
1226     def bng_cores(self):
1227         if not self._cores_tuple:
1228             self._cores_tuple = self.get_cores_gen_bng_qos()
1229         return self._cores_tuple
1230
1231     @property
1232     def cpe_cores(self):
1233         return self.bng_cores[0]
1234
1235     @property
1236     def inet_cores(self):
1237         return self.bng_cores[1]
1238
1239     @property
1240     def arp_cores(self):
1241         return self.bng_cores[2]
1242
1243     @property
1244     def arp_task_cores(self):
1245         return self.bng_cores[3]
1246
1247     @property
1248     def all_rx_cores(self):
1249         return self.latency_cores
1250
1251     def get_cores_gen_bng_qos(self):
1252         cpe_cores = []
1253         inet_cores = []
1254         arp_cores = []
1255         arp_tasks_core = [0]
1256         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1257             if not section_name.startswith("core"):
1258                 continue
1259
1260             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1261                 continue
1262
1263             for item_key, item_value in section:
1264                 if item_key == "name" and item_value.startswith("cpe"):
1265                     core_tuple = CoreSocketTuple(section_name)
1266                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1267                     cpe_cores.append(core_tag)
1268
1269                 elif item_key == "name" and item_value.startswith("inet"):
1270                     core_tuple = CoreSocketTuple(section_name)
1271                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1272                     inet_cores.append(inet_core)
1273
1274                 elif item_key == "name" and item_value.startswith("arp"):
1275                     core_tuple = CoreSocketTuple(section_name)
1276                     arp_core = core_tuple.find_in_topology(self.cpu_topology)
1277                     arp_cores.append(arp_core)
1278
1279                 # We check the tasks/core separately
1280                 if item_key == "name" and item_value.startswith("arp_task"):
1281                     core_tuple = CoreSocketTuple(section_name)
1282                     arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
1283                     arp_tasks_core.append(arp_task_core)
1284
1285         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1286
1287     @contextmanager
1288     def traffic_context(self, pkt_size, value):
1289         # Tester is sending packets at the required speed already after
1290         # setup_test(). Just get the current statistics, sleep the required
1291         # amount of time and calculate packet loss.
1292         inet_pkt_size = pkt_size
1293         cpe_pkt_size = pkt_size - 24
1294         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1295
1296         curr_up_speed = curr_down_speed = 0
1297         max_up_speed = max_down_speed = value
1298         if ratio < 1:
1299             max_down_speed = value * ratio
1300         else:
1301             max_up_speed = value / ratio
1302
1303         # Initialize cores
1304         self.sut.stop_all()
1305         time.sleep(0.5)
1306
1307         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1308         # wrong.
1309         self.sut.start(self.all_rx_cores)
1310         time.sleep(0.5)
1311         self.sut.stop(self.all_rx_cores)
1312         time.sleep(0.5)
1313         self.sut.reset_stats()
1314
1315         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1316         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1317
1318         self.sut.reset_values(self.cpe_cores)
1319         self.sut.reset_values(self.inet_cores)
1320
1321         # Set correct IP and UDP lengths in packet headers
1322         # CPE
1323         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1324         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1325         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1326         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1327
1328         # INET
1329         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1330         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1331         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1332         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1333         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1334         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1335
1336         # Sending ARP to initialize tables - need a few seconds of generation
1337         # to make sure all CPEs are initialized
1338         LOG.info("Initializing SUT: sending ARP packets")
1339         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1340         self.sut.set_speed(self.inet_cores, curr_up_speed)
1341         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1342         self.sut.start(self.arp_cores)
1343         time.sleep(4)
1344
1345         # Ramp up the transmission speed. First go to the common speed, then
1346         # increase steps for the faster one.
1347         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1348
1349         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1350
1351         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1352             # The min(..., ...) takes care of 1) floating point rounding errors
1353             # that could make curr_*_speed to be slightly greater than
1354             # max_*_speed and 2) max_*_speed not being an exact multiple of
1355             # self._step_delta.
1356             if curr_up_speed < max_up_speed:
1357                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1358             if curr_down_speed < max_down_speed:
1359                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1360
1361             self.sut.set_speed(self.inet_cores, curr_up_speed)
1362             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1363             time.sleep(self.step_time)
1364
1365         LOG.info("Target speeds reached. Starting real test.")
1366
1367         yield
1368
1369         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1370         LOG.info("Test ended. Flushing NIC buffers")
1371         self.sut.start(self.all_rx_cores)
1372         time.sleep(3)
1373         self.sut.stop(self.all_rx_cores)
1374
1375     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1376         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1377
1378         with data_helper, self.traffic_context(pkt_size, value):
1379             with data_helper.measure_tot_stats():
1380                 time.sleep(duration)
1381                 # Getting statistics to calculate PPS at right speed....
1382                 data_helper.capture_tsc_hz()
1383                 data_helper.latency = self.get_latency()
1384
1385         return data_helper.result_tuple, data_helper.samples