Merge "NSB Prox minor refactor of BNG"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import operator
18 import logging
19 import io
20 import os
21 import re
22 import select
23 import socket
24
25 from collections import OrderedDict, namedtuple
26 import time
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29 from multiprocessing import Queue
30
31 import six
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
34
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common import utils
37 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
38 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
40 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
41
42 PROX_PORT = 8474
43
44 SECTION_NAME = 0
45 SECTION_CONTENTS = 1
46
47 LOG = logging.getLogger(__name__)
48 LOG.setLevel(logging.DEBUG)
49
50 TEN_GIGABIT = 1e10
51 BITS_PER_BYTE = 8
52 RETRY_SECONDS = 60
53 RETRY_INTERVAL = 1
54
55 CONFIGURATION_OPTIONS = (
56     # dict key           section     key               default value
57     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58     ('testDuration', 'general', 'test_duration', 5.0),
59     ('testPrecision', 'general', 'test_precision', 1.0),
60     ('tests', 'general', 'tests', None),
61     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
62
63     ('logFile', 'logging', 'file', 'dats.log'),
64     ('logDateFormat', 'logging', 'datefmt', None),
65     ('logLevel', 'logging', 'level', 'INFO'),
66     ('logOverwrite', 'logging', 'overwrite', 1),
67
68     ('testerIp', 'tester', 'ip', None),
69     ('testerUser', 'tester', 'user', 'root'),
70     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73     ('testerSocketId', 'tester', 'socket_id', 0),
74
75     ('sutIp', 'sut', 'ip', None),
76     ('sutUser', 'sut', 'user', 'root'),
77     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80     ('sutSocketId', 'sut', 'socket_id', 0),
81 )
82
83
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
85     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
86
87     def __new__(cls, *args):
88         try:
89             matches = cls.CORE_RE.search(str(args[0]))
90             if matches:
91                 args = matches.groups()
92
93             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94                                                        'h' if args[2] else '')
95
96         except (AttributeError, TypeError, IndexError, ValueError):
97             raise ValueError('Invalid core spec {}'.format(args))
98
99     def is_hyperthread(self):
100         return self.hyperthread == 'h'
101
102     @property
103     def index(self):
104         return int(self.is_hyperthread())
105
106     def find_in_topology(self, cpu_topology):
107         try:
108             socket_core_match = cpu_topology[self.socket_id][self.core_id]
109             sorted_match = sorted(socket_core_match.values())
110             return sorted_match[self.index][0]
111         except (KeyError, IndexError):
112             template = "Core {}{} on socket {} does not exist"
113             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
114
115
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117     def __new__(cls, *args):
118         try:
119             assert args[0] is not str(args[0])
120             args = tuple(args[0])
121         except (AssertionError, IndexError, TypeError):
122             pass
123
124         return super(TotStatsTuple, cls).__new__(cls, *args)
125
126
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128                                                         'delta_tx,delta_tsc,'
129                                                         'latency,rx_total,tx_total,pps')):
130     @property
131     def pkt_loss(self):
132         try:
133             return 1e2 * self.drop_total / float(self.tx_total)
134         except ZeroDivisionError:
135             return 100.0
136
137     @property
138     def mpps(self):
139         # calculate the effective throughput in Mpps
140         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141
142     @property
143     def can_be_lost(self):
144         return int(self.tx_total * self.tolerated / 1e2)
145
146     @property
147     def drop_total(self):
148         return self.tx_total - self.rx_total
149
150     @property
151     def success(self):
152         return self.drop_total <= self.can_be_lost
153
154     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
155         if pkt_loss is None:
156             pkt_loss = self.pkt_loss
157
158         if port_samples is None:
159             port_samples = {}
160
161         latency_keys = [
162             "LatencyMin",
163             "LatencyMax",
164             "LatencyAvg",
165         ]
166
167         samples = {
168             "Throughput": self.mpps,
169             "DropPackets": pkt_loss,
170             "CurrentDropPackets": pkt_loss,
171             "TxThroughput": self.pps / 1e6,
172             "RxThroughput": self.mpps,
173             "PktSize": pkt_size,
174         }
175         if port_samples:
176             samples.update(port_samples)
177
178         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
179         return samples
180
181     def log_data(self, logger=None):
182         if logger is None:
183             logger = LOG
184
185         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
186         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
187         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
188
189
190 class PacketDump(object):
191     @staticmethod
192     def assert_func(func, value1, value2, template=None):
193         assert func(value1, value2), template.format(value1, value2)
194
195     def __init__(self, port_id, data_len, payload):
196         template = "Packet dump has specified length {}, but payload is {} bytes long"
197         self.assert_func(operator.eq, data_len, len(payload), template)
198         self._port_id = port_id
199         self._data_len = data_len
200         self._payload = payload
201
202     @property
203     def port_id(self):
204         """Get the port id of the packet dump"""
205         return self._port_id
206
207     @property
208     def data_len(self):
209         """Get the length of the data received"""
210         return self._data_len
211
212     def __str__(self):
213         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
214
215     def payload(self, start=None, end=None):
216         """Get part of the payload as a list of ordinals.
217
218         Returns a list of byte values, matching the contents of the packet dump.
219         Optional start and end parameters can be specified to retrieve only a
220         part of the packet contents.
221
222         The number of elements in the list is equal to end - start + 1, so end
223         is the offset of the last character.
224
225         Args:
226             start (pos. int): the starting offset in the payload. If it is not
227                 specified or None, offset 0 is assumed.
228             end (pos. int): the ending offset of the payload. If it is not
229                 specified or None, the contents until the end of the packet are
230                 returned.
231
232         Returns:
233             [int, int, ...]. Each int represents the ordinal value of a byte in
234             the packet payload.
235         """
236         if start is None:
237             start = 0
238
239         if end is None:
240             end = self.data_len - 1
241
242         # Bounds checking on offsets
243         template = "Start offset must be non-negative"
244         self.assert_func(operator.ge, start, 0, template)
245
246         template = "End offset must be less than {1}"
247         self.assert_func(operator.lt, end, self.data_len, template)
248
249         # Adjust for splice operation: end offset must be 1 more than the offset
250         # of the last desired character.
251         end += 1
252
253         return self._payload[start:end]
254
255
256 class ProxSocketHelper(object):
257
258     def __init__(self, sock=None):
259         """ creates new prox instance """
260         super(ProxSocketHelper, self).__init__()
261
262         if sock is None:
263             sock = socket.socket()
264
265         self._sock = sock
266         self._pkt_dumps = []
267         self.master_stats = None
268
269     def connect(self, ip, port):
270         """Connect to the prox instance on the remote system"""
271         self._sock.connect((ip, port))
272
273     def get_socket(self):
274         """ get the socket connected to the remote instance """
275         return self._sock
276
277     def _parse_socket_data(self, decoded_data, pkt_dump_only):
278         def get_newline_index():
279             return decoded_data.find('\n', index)
280
281         ret_str = ''
282         index = 0
283         for newline_index in iter(get_newline_index, -1):
284             ret_str = decoded_data[index:newline_index]
285
286             try:
287                 mode, port_id, data_len = ret_str.split(',', 2)
288             except ValueError:
289                 mode, port_id, data_len = None, None, None
290
291             if mode != 'pktdump':
292                 # Regular 1-line message. Stop reading from the socket.
293                 LOG.debug("Regular response read")
294                 return ret_str
295
296             LOG.debug("Packet dump header read: [%s]", ret_str)
297
298             # The line is a packet dump header. Parse it, read the
299             # packet payload, store the dump for later retrieval.
300             # Skip over the packet dump and continue processing: a
301             # 1-line response may follow the packet dump.
302
303             data_len = int(data_len)
304             data_start = newline_index + 1  # + 1 to skip over \n
305             data_end = data_start + data_len
306             sub_data = decoded_data[data_start:data_end]
307             pkt_payload = array.array('B', (ord(v) for v in sub_data))
308             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
309             self._pkt_dumps.append(pkt_dump)
310
311             if pkt_dump_only:
312                 # Return boolean instead of string to signal
313                 # successful reception of the packet dump.
314                 LOG.debug("Packet dump stored, returning")
315                 return True
316
317             index = data_end + 1
318
319         return ret_str
320
321     def get_data(self, pkt_dump_only=False, timeout=1):
322         """ read data from the socket """
323
324         # This method behaves slightly differently depending on whether it is
325         # called to read the response to a command (pkt_dump_only = 0) or if
326         # it is called specifically to read a packet dump (pkt_dump_only = 1).
327         #
328         # Packet dumps look like:
329         #   pktdump,<port_id>,<data_len>\n
330         #   <packet contents as byte array>\n
331         # This means the total packet dump message consists of 2 lines instead
332         # of 1 line.
333         #
334         # - Response for a command (pkt_dump_only = 0):
335         #   1) Read response from the socket until \n (end of message)
336         #   2a) If the response is a packet dump header (starts with "pktdump,"):
337         #     - Read the packet payload and store the packet dump for later
338         #       retrieval.
339         #     - Reset the state and restart from 1). Eventually state 2b) will
340         #       be reached and the function will return.
341         #   2b) If the response is not a packet dump:
342         #     - Return the received message as a string
343         #
344         # - Explicit request to read a packet dump (pkt_dump_only = 1):
345         #   - Read the dump header and payload
346         #   - Store the packet dump for later retrieval
347         #   - Return True to signify a packet dump was successfully read
348
349         def is_ready():
350             # recv() is blocking, so avoid calling it when no data is waiting.
351             ready = select.select([self._sock], [], [], timeout)
352             return bool(ready[0])
353
354         status = False
355         ret_str = ""
356         for status in iter(is_ready, False):
357             decoded_data = self._sock.recv(256).decode('utf-8')
358             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
359
360         LOG.debug("Received data from socket: [%s]", ret_str)
361         return ret_str if status else ''
362
363     def put_command(self, to_send):
364         """ send data to the remote instance """
365         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
366         try:
367             # TODO: sendall will block, we need a timeout
368             self._sock.sendall(to_send.encode('utf-8'))
369         except:
370             pass
371
372     def get_packet_dump(self):
373         """ get the next packet dump """
374         if self._pkt_dumps:
375             return self._pkt_dumps.pop(0)
376         return None
377
378     def stop_all_reset(self):
379         """ stop the remote instance and reset stats """
380         LOG.debug("Stop all and reset stats")
381         self.stop_all()
382         self.reset_stats()
383
384     def stop_all(self):
385         """ stop all cores on the remote instance """
386         LOG.debug("Stop all")
387         self.put_command("stop all\n")
388         time.sleep(3)
389
390     def stop(self, cores, task=''):
391         """ stop specific cores on the remote instance """
392         LOG.debug("Stopping cores %s", cores)
393         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
394         time.sleep(3)
395
396     def start_all(self):
397         """ start all cores on the remote instance """
398         LOG.debug("Start all")
399         self.put_command("start all\n")
400
401     def start(self, cores):
402         """ start specific cores on the remote instance """
403         LOG.debug("Starting cores %s", cores)
404         self.put_command("start {}\n".format(join_non_strings(',', cores)))
405         time.sleep(3)
406
407     def reset_stats(self):
408         """ reset the statistics on the remote instance """
409         LOG.debug("Reset stats")
410         self.put_command("reset stats\n")
411         time.sleep(1)
412
413     def _run_template_over_cores(self, template, cores, *args):
414         for core in cores:
415             self.put_command(template.format(core, *args))
416
417     def set_pkt_size(self, cores, pkt_size):
418         """ set the packet size to generate on the remote instance """
419         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
420         pkt_size -= 4
421         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
422         time.sleep(1)
423
424     def set_value(self, cores, offset, value, length):
425         """ set value on the remote instance """
426         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
427         LOG.debug(msg, cores, value, length, offset)
428         template = "set value {} 0 {} {} {}\n"
429         self._run_template_over_cores(template, cores, offset, value, length)
430
431     def reset_values(self, cores):
432         """ reset values on the remote instance """
433         LOG.debug("Set value for core(s) %s", cores)
434         self._run_template_over_cores("reset values {} 0\n", cores)
435
436     def set_speed(self, cores, speed, tasks=None):
437         """ set speed on the remote instance """
438         if tasks is None:
439             tasks = [0] * len(cores)
440         elif len(tasks) != len(cores):
441             LOG.error("set_speed: cores and tasks must have the same len")
442         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
443         for (core, task) in list(zip(cores, tasks)):
444             self.put_command("speed {} {} {}\n".format(core, task, speed))
445
446     def slope_speed(self, cores_speed, duration, n_steps=0):
447         """will start to increase speed from 0 to N where N is taken from
448         a['speed'] for each a in cores_speed"""
449         # by default, each step will take 0.5 sec
450         if n_steps == 0:
451             n_steps = duration * 2
452
453         private_core_data = []
454         step_duration = float(duration) / n_steps
455         for core_data in cores_speed:
456             target = float(core_data['speed'])
457             private_core_data.append({
458                 'cores': core_data['cores'],
459                 'zero': 0,
460                 'delta': target / n_steps,
461                 'current': 0,
462                 'speed': target,
463             })
464
465         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
466         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
467             time.sleep(step_duration)
468             for core_data in private_core_data:
469                 core_data['current'] = core_data[key1] + core_data[key2]
470                 self.set_speed(core_data['cores'], core_data['current'])
471
472     def set_pps(self, cores, pps, pkt_size):
473         """ set packets per second for specific cores on the remote instance """
474         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
475         LOG.debug(msg, cores, pps, pkt_size)
476
477         # speed in percent of line-rate
478         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
479         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
480
481     def lat_stats(self, cores, task=0):
482         """Get the latency statistics from the remote system"""
483         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
484         lat_min = {}
485         lat_max = {}
486         lat_avg = {}
487         for core in cores:
488             self.put_command("lat stats {} {} \n".format(core, task))
489             ret = self.get_data()
490
491             try:
492                 lat_min[core], lat_max[core], lat_avg[core] = \
493                     tuple(int(n) for n in ret.split(",")[:3])
494
495             except (AttributeError, ValueError, TypeError):
496                 pass
497
498         return lat_min, lat_max, lat_avg
499
500     def get_all_tot_stats(self):
501         self.put_command("tot stats\n")
502         all_stats_str = self.get_data().split(",")
503         if len(all_stats_str) != 4:
504             all_stats = [0] * 4
505             return all_stats
506         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
507         self.master_stats = all_stats
508         return all_stats
509
510     def hz(self):
511         return self.get_all_tot_stats()[3]
512
513     # Deprecated
514     # TODO: remove
515     def rx_stats(self, cores, task=0):
516         return self.core_stats(cores, task)
517
518     def core_stats(self, cores, task=0):
519         """Get the receive statistics from the remote system"""
520         rx = tx = drop = tsc = 0
521         for core in cores:
522             self.put_command("core stats {} {}\n".format(core, task))
523             ret = self.get_data().split(",")
524             rx += int(ret[0])
525             tx += int(ret[1])
526             drop += int(ret[2])
527             tsc = int(ret[3])
528         return rx, tx, drop, tsc
529
530     def port_stats(self, ports):
531         """get counter values from a specific port"""
532         tot_result = [0] * 12
533         for port in ports:
534             self.put_command("port_stats {}\n".format(port))
535             ret = [try_int(s, 0) for s in self.get_data().split(",")]
536             tot_result = [sum(x) for x in zip(tot_result, ret)]
537         return tot_result
538
539     @contextmanager
540     def measure_tot_stats(self):
541         start = self.get_all_tot_stats()
542         container = {'start_tot': start}
543         try:
544             yield container
545         finally:
546             container['end_tot'] = end = self.get_all_tot_stats()
547
548         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
549
550     def tot_stats(self):
551         """Get the total statistics from the remote system"""
552         stats = self.get_all_tot_stats()
553         return stats[:3]
554
555     def tot_ierrors(self):
556         """Get the total ierrors from the remote system"""
557         self.put_command("tot ierrors tot\n")
558         recv = self.get_data().split(',')
559         tot_ierrors = int(recv[0])
560         tsc = int(recv[0])
561         return tot_ierrors, tsc
562
563     def set_count(self, count, cores):
564         """Set the number of packets to send on the specified core"""
565         self._run_template_over_cores("count {} 0 {}\n", cores, count)
566
567     def dump_rx(self, core_id, task_id=0, count=1):
568         """Activate dump on rx on the specified core"""
569         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
570         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
571         time.sleep(1.5)  # Give PROX time to set up packet dumping
572
573     def quit(self):
574         self.stop_all()
575         self._quit()
576         self.force_quit()
577
578     def _quit(self):
579         """ stop all cores on the remote instance """
580         LOG.debug("Quit prox")
581         self.put_command("quit\n")
582         time.sleep(3)
583
584     def force_quit(self):
585         """ stop all cores on the remote instance """
586         LOG.debug("Force Quit prox")
587         self.put_command("quit_force\n")
588         time.sleep(3)
589
590
591 _LOCAL_OBJECT = object()
592
593
594 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
595     # the actual app is lowercase
596     APP_NAME = 'prox'
597     # not used for Prox but added for consistency
598     VNF_TYPE = "PROX"
599
600     LUA_PARAMETER_NAME = ""
601     LUA_PARAMETER_PEER = {
602         "gen": "sut",
603         "sut": "gen",
604     }
605
606     CONFIG_QUEUE_TIMEOUT = 120
607
608     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
609         self.remote_path = None
610         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
611         self.remote_prox_file_name = None
612         self._prox_config_data = None
613         self.additional_files = {}
614         self.config_queue = Queue()
615         # allow_exit_without_flush
616         self.config_queue.cancel_join_thread()
617         self._global_section = None
618
619     @property
620     def prox_config_data(self):
621         if self._prox_config_data is None:
622             # this will block, but it needs too
623             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
624         return self._prox_config_data
625
626     @property
627     def global_section(self):
628         if self._global_section is None and self.prox_config_data:
629             self._global_section = self.find_section("global")
630         return self._global_section
631
632     def find_section(self, name, default=_LOCAL_OBJECT):
633         result = next((value for key, value in self.prox_config_data if key == name), default)
634         if result is _LOCAL_OBJECT:
635             raise KeyError('{} not found in Prox config'.format(name))
636         return result
637
638     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
639         section = self.find_section(section_name, [])
640         result = next((value for key, value in section if key == section_key), default)
641         if result is _LOCAL_OBJECT:
642             template = '{} not found in {} section of Prox config'
643             raise KeyError(template.format(section_key, section_name))
644         return result
645
646     def _build_pipeline_kwargs(self):
647         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
648         self.pipeline_kwargs = {
649             'tool_path': tool_path,
650             'tool_dir': os.path.dirname(tool_path),
651         }
652
653     def copy_to_target(self, config_file_path, prox_file):
654         remote_path = os.path.join("/tmp", prox_file)
655         self.ssh_helper.put(config_file_path, remote_path)
656         return remote_path
657
658     @staticmethod
659     def _get_tx_port(section, sections):
660         iface_port = [-1]
661         for item in sections[section]:
662             if item[0] == "tx port":
663                 iface_port = re.findall(r'\d+', item[1])
664                 # do we want the last one?
665                 #   if yes, then can we reverse?
666         return int(iface_port[0])
667
668     @staticmethod
669     def _replace_quoted_with_value(quoted, value, count=1):
670         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
671         return new_string
672
673     def _insert_additional_file(self, value):
674         file_str = value.split('"')
675         base_name = os.path.basename(file_str[1])
676         file_str[1] = self.additional_files[base_name]
677         return '"'.join(file_str)
678
679     def generate_prox_config_file(self, config_path):
680         sections = []
681         prox_config = ConfigParser(config_path, sections)
682         prox_config.parse()
683
684         # Ensure MAC is set "hardware"
685         all_ports = self.vnfd_helper.port_pairs.all_ports
686         # use dpdk port number
687         for port_name in all_ports:
688             port_num = self.vnfd_helper.port_num(port_name)
689             port_section_name = "port {}".format(port_num)
690             for section_name, section in sections:
691                 if port_section_name != section_name:
692                     continue
693
694                 for index, section_data in enumerate(section):
695                     if section_data[0] == "mac":
696                         section_data[1] = "hardware"
697
698         # search for dst mac
699         for _, section in sections:
700             # for index, (item_key, item_val) in enumerate(section):
701             for index, section_data in enumerate(section):
702                 item_key, item_val = section_data
703                 if item_val.startswith("@@dst_mac"):
704                     tx_port_iter = re.finditer(r'\d+', item_val)
705                     tx_port_no = int(next(tx_port_iter).group(0))
706                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
707                     mac = intf["virtual-interface"]["dst_mac"]
708                     section_data[1] = mac.replace(":", " ", 6)
709
710                 if item_key == "dst mac" and item_val.startswith("@@"):
711                     tx_port_iter = re.finditer(r'\d+', item_val)
712                     tx_port_no = int(next(tx_port_iter).group(0))
713                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
714                     mac = intf["virtual-interface"]["dst_mac"]
715                     section_data[1] = mac
716
717         # if addition file specified in prox config
718         if not self.additional_files:
719             return sections
720
721         for section_name, section in sections:
722             for index, section_data in enumerate(section):
723                 try:
724                     if section_data[0].startswith("dofile"):
725                         section_data[0] = self._insert_additional_file(section_data[0])
726
727                     if section_data[1].startswith("dofile"):
728                         section_data[1] = self._insert_additional_file(section_data[1])
729                 except:
730                     pass
731
732         return sections
733
734     @staticmethod
735     def write_prox_config(prox_config):
736         """
737         Write an .ini-format config file for PROX
738         PROX does not allow a space before/after the =, so we need
739         a custom method
740         """
741         out = []
742         for i, (section_name, section) in enumerate(prox_config):
743             out.append("[{}]".format(section_name))
744             for index, item in enumerate(section):
745                 key, value = item
746                 if key == "__name__":
747                     continue
748                 if value is not None and value != '@':
749                     key = "=".join((key, str(value).replace('\n', '\n\t')))
750                     out.append(key)
751                 else:
752                     key = str(key).replace('\n', '\n\t')
753                     out.append(key)
754         return os.linesep.join(out)
755
756     def put_string_to_file(self, s, remote_path):
757         file_obj = cStringIO(s)
758         self.ssh_helper.put_file_obj(file_obj, remote_path)
759         return remote_path
760
761     def generate_prox_lua_file(self):
762         p = OrderedDict()
763         all_ports = self.vnfd_helper.port_pairs.all_ports
764         lua_param = self.LUA_PARAMETER_NAME
765         for port_name in all_ports:
766             peer = self.LUA_PARAMETER_PEER[lua_param]
767             port_num = self.vnfd_helper.port_num(port_name)
768             intf = self.vnfd_helper.find_interface(name=port_name)
769             vintf = intf['virtual-interface']
770             local_ip = vintf["local_ip"]
771             dst_ip = vintf["dst_ip"]
772             local_ip_hex = ip_to_hex(local_ip, separator=' ')
773             dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
774             p.update([
775                 ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
776                 ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
777                 ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
778                 ("{}_ip_port_{}".format(peer, port_num), dst_ip),
779             ])
780         lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
781         return lua
782
783     def upload_prox_lua(self, config_dir, prox_config_data):
784         # we could have multiple lua directives
785         lau_dict = prox_config_data.get('lua', {})
786         find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
787         lua_file = next((found[0] for found in find_iter if found), None)
788         if not lua_file:
789             return ""
790
791         out = self.generate_prox_lua_file()
792         remote_path = os.path.join(config_dir, lua_file)
793         return self.put_string_to_file(out, remote_path)
794
795     def upload_prox_config(self, config_file, prox_config_data):
796         # prox can't handle spaces around ' = ' so use custom method
797         out = StringIO(self.write_prox_config(prox_config_data))
798         out.seek(0)
799         remote_path = os.path.join("/tmp", config_file)
800         self.ssh_helper.put_file_obj(out, remote_path)
801
802         return remote_path
803
804     def build_config_file(self):
805         task_path = self.scenario_helper.task_path
806         options = self.scenario_helper.options
807         config_path = options['prox_config']
808         config_file = os.path.basename(config_path)
809         config_path = find_relative_file(config_path, task_path)
810         self.additional_files = {}
811
812         prox_files = options.get('prox_files', [])
813         if isinstance(prox_files, six.string_types):
814             prox_files = [prox_files]
815         for key_prox_file in prox_files:
816             base_prox_file = os.path.basename(key_prox_file)
817             key_prox_path = find_relative_file(key_prox_file, task_path)
818             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
819             self.additional_files[base_prox_file] = remote_prox_file
820
821         self._prox_config_data = self.generate_prox_config_file(config_path)
822         # copy config to queue so we can read it from traffic_runner process
823         self.config_queue.put(self._prox_config_data)
824         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
825
826     def build_config(self):
827         self.build_config_file()
828
829         options = self.scenario_helper.options
830
831         prox_args = options['prox_args']
832         LOG.info("Provision and start the %s", self.APP_NAME)
833         self._build_pipeline_kwargs()
834         self.pipeline_kwargs["args"] = " ".join(
835             " ".join([k, v if v else ""]) for k, v in prox_args.items())
836         self.pipeline_kwargs["cfg_file"] = self.remote_path
837
838         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
839         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
840         return prox_cmd
841
842
843 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
844 class ProxResourceHelper(ClientResourceHelper):
845
846     RESOURCE_WORD = 'prox'
847
848     PROX_MODE = ""
849
850     WAIT_TIME = 3
851
852     @staticmethod
853     def find_pci(pci, bound_pci):
854         # we have to substring match PCI bus address from the end
855         return any(b.endswith(pci) for b in bound_pci)
856
857     def __init__(self, setup_helper):
858         super(ProxResourceHelper, self).__init__(setup_helper)
859         self.mgmt_interface = self.vnfd_helper.mgmt_interface
860         self._user = self.mgmt_interface["user"]
861         self._ip = self.mgmt_interface["ip"]
862
863         self.done = False
864         self._vpci_to_if_name_map = None
865         self.additional_file = {}
866         self.remote_prox_file_name = None
867         self.lower = None
868         self.upper = None
869         self.step_delta = 1
870         self.step_time = 0.5
871         self._test_type = None
872
873     @property
874     def sut(self):
875         if not self.client:
876             self.client = self._connect()
877         return self.client
878
879     @property
880     def test_type(self):
881         if self._test_type is None:
882             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
883         return self._test_type
884
885     def run_traffic(self, traffic_profile):
886         self._queue.cancel_join_thread()
887         self.lower = 0.0
888         self.upper = 100.0
889
890         traffic_profile.init(self._queue)
891         # this frees up the run_traffic loop
892         self.client_started.value = 1
893
894         while not self._terminated.value:
895             # move it all to traffic_profile
896             self._run_traffic_once(traffic_profile)
897
898     def _run_traffic_once(self, traffic_profile):
899         traffic_profile.execute_traffic(self)
900         if traffic_profile.done:
901             self._queue.put({'done': True})
902             LOG.debug("tg_prox done")
903             self._terminated.value = 1
904
905     # For VNF use ResourceHelper method to collect KPIs directly.
906     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
907     def collect_collectd_kpi(self):
908         return self._collect_resource_kpi()
909
910     def collect_kpi(self):
911         result = super(ProxResourceHelper, self).collect_kpi()
912         # add in collectd kpis manually
913         if result:
914             result['collect_stats'] = self._collect_resource_kpi()
915         return result
916
917     def terminate(self):
918         # should not be called, use VNF terminate
919         raise NotImplementedError()
920
921     def up_post(self):
922         return self.sut  # force connection
923
924     def execute(self, cmd, *args, **kwargs):
925         func = getattr(self.sut, cmd, None)
926         if func:
927             return func(*args, **kwargs)
928
929     def _connect(self, client=None):
930         """Run and connect to prox on the remote system """
931         # De-allocating a large amount of hugepages takes some time. If a new
932         # PROX instance is started immediately after killing the previous one,
933         # it might not be able to allocate hugepages, because they are still
934         # being freed. Hence the -w switch.
935         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
936         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
937         # -f ./handle_none-4.cfg"
938         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
939         #  "; " \
940         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
941         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
942         # sudo " \
943         #    + "./build/Prox " + prox_args
944         # log.debug("Starting PROX with command [%s]", prox_cmd)
945         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
946         # self._ip, prox_cmd))
947         if client is None:
948             client = ProxSocketHelper()
949
950         # try connecting to Prox for 60s
951         for _ in range(RETRY_SECONDS):
952             time.sleep(RETRY_INTERVAL)
953             try:
954                 client.connect(self._ip, PROX_PORT)
955             except (socket.gaierror, socket.error):
956                 continue
957             else:
958                 return client
959
960         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
961         raise Exception(msg.format(self._ip, PROX_PORT))
962
963
964 class ProxDataHelper(object):
965
966     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
967         super(ProxDataHelper, self).__init__()
968         self.vnfd_helper = vnfd_helper
969         self.sut = sut
970         self.pkt_size = pkt_size
971         self.value = value
972         self.tolerated_loss = tolerated_loss
973         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
974         self.tsc_hz = None
975         self.measured_stats = None
976         self.latency = None
977         self._totals_and_pps = None
978         self.result_tuple = None
979
980     @property
981     def totals_and_pps(self):
982         if self._totals_and_pps is None:
983             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
984             pps = self.value / 100.0 * self.line_rate_to_pps()
985             self._totals_and_pps = rx_total, tx_total, pps
986         return self._totals_and_pps
987
988     @property
989     def rx_total(self):
990         return self.totals_and_pps[0]
991
992     @property
993     def tx_total(self):
994         return self.totals_and_pps[1]
995
996     @property
997     def pps(self):
998         return self.totals_and_pps[2]
999
1000     @property
1001     def samples(self):
1002         samples = {}
1003         for port_name, port_num in self.vnfd_helper.ports_iter():
1004             port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1005             samples[port_name] = {
1006                 "in_packets": port_rx_total,
1007                 "out_packets": port_tx_total,
1008             }
1009         return samples
1010
1011     def __enter__(self):
1012         self.check_interface_count()
1013         return self
1014
1015     def __exit__(self, exc_type, exc_val, exc_tb):
1016         self.make_tuple()
1017
1018     def make_tuple(self):
1019         if self.result_tuple:
1020             return
1021
1022         self.result_tuple = ProxTestDataTuple(
1023             self.tolerated_loss,
1024             self.tsc_hz,
1025             self.measured_stats['delta'].rx,
1026             self.measured_stats['delta'].tx,
1027             self.measured_stats['delta'].tsc,
1028             self.latency,
1029             self.rx_total,
1030             self.tx_total,
1031             self.pps,
1032         )
1033         self.result_tuple.log_data()
1034
1035     @contextmanager
1036     def measure_tot_stats(self):
1037         with self.sut.measure_tot_stats() as self.measured_stats:
1038             yield
1039
1040     def check_interface_count(self):
1041         # do this assert in init?  unless we expect interface count to
1042         # change from one run to another run...
1043         assert self.port_count in {1, 2, 4}, \
1044             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1045
1046     def capture_tsc_hz(self):
1047         self.tsc_hz = float(self.sut.hz())
1048
1049     def line_rate_to_pps(self):
1050         # FIXME Don't hardcode 10Gb/s
1051         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1052
1053
1054 class ProxProfileHelper(object):
1055
1056     __prox_profile_type__ = "Generic"
1057
1058     PROX_CORE_GEN_MODE = "gen"
1059     PROX_CORE_LAT_MODE = "lat"
1060
1061     @classmethod
1062     def get_cls(cls, helper_type):
1063         """Return class of specified type."""
1064         if not helper_type:
1065             return ProxProfileHelper
1066
1067         for profile_helper_class in utils.itersubclasses(cls):
1068             if helper_type == profile_helper_class.__prox_profile_type__:
1069                 return profile_helper_class
1070
1071         return ProxProfileHelper
1072
1073     @classmethod
1074     def make_profile_helper(cls, resource_helper):
1075         return cls.get_cls(resource_helper.test_type)(resource_helper)
1076
1077     def __init__(self, resource_helper):
1078         super(ProxProfileHelper, self).__init__()
1079         self.resource_helper = resource_helper
1080         self._cpu_topology = None
1081         self._test_cores = None
1082         self._latency_cores = None
1083
1084     @property
1085     def cpu_topology(self):
1086         if not self._cpu_topology:
1087             stdout = io.BytesIO()
1088             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1089             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1090         return self._cpu_topology
1091
1092     @property
1093     def test_cores(self):
1094         if not self._test_cores:
1095             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1096         return self._test_cores
1097
1098     @property
1099     def latency_cores(self):
1100         if not self._latency_cores:
1101             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1102         return self._latency_cores
1103
1104     @contextmanager
1105     def traffic_context(self, pkt_size, value):
1106         self.sut.stop_all()
1107         self.sut.reset_stats()
1108         try:
1109             self.sut.set_pkt_size(self.test_cores, pkt_size)
1110             self.sut.set_speed(self.test_cores, value)
1111             self.sut.start_all()
1112             yield
1113         finally:
1114             self.sut.stop_all()
1115
1116     def get_cores(self, mode):
1117         cores = []
1118
1119         for section_name, section in self.setup_helper.prox_config_data:
1120             if not section_name.startswith("core"):
1121                 continue
1122
1123             for key, value in section:
1124                 if key == "mode" and value == mode:
1125                     core_tuple = CoreSocketTuple(section_name)
1126                     core = core_tuple.find_in_topology(self.cpu_topology)
1127                     cores.append(core)
1128
1129         return cores
1130
1131     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1132         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1133
1134         with data_helper, self.traffic_context(pkt_size, value):
1135             with data_helper.measure_tot_stats():
1136                 time.sleep(duration)
1137                 # Getting statistics to calculate PPS at right speed....
1138                 data_helper.capture_tsc_hz()
1139                 data_helper.latency = self.get_latency()
1140
1141         return data_helper.result_tuple, data_helper.samples
1142
1143     def get_latency(self):
1144         """
1145         :return: return lat_min, lat_max, lat_avg
1146         :rtype: list
1147         """
1148         if self._latency_cores:
1149             return self.sut.lat_stats(self._latency_cores)
1150         return []
1151
1152     def terminate(self):
1153         pass
1154
1155     def __getattr__(self, item):
1156         return getattr(self.resource_helper, item)
1157
1158
1159 class ProxMplsProfileHelper(ProxProfileHelper):
1160
1161     __prox_profile_type__ = "MPLS tag/untag"
1162
1163     def __init__(self, resource_helper):
1164         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1165         self._cores_tuple = None
1166
1167     @property
1168     def mpls_cores(self):
1169         if not self._cores_tuple:
1170             self._cores_tuple = self.get_cores_mpls()
1171         return self._cores_tuple
1172
1173     @property
1174     def tagged_cores(self):
1175         return self.mpls_cores[0]
1176
1177     @property
1178     def plain_cores(self):
1179         return self.mpls_cores[1]
1180
1181     def get_cores_mpls(self):
1182         cores_tagged = []
1183         cores_plain = []
1184         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1185             if not section_name.startswith("core"):
1186                 continue
1187
1188             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1189                 continue
1190
1191             for item_key, item_value in section:
1192                 if item_key != 'name':
1193                     continue
1194
1195                 if item_value.startswith("tag"):
1196                     core_tuple = CoreSocketTuple(section_name)
1197                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1198                     cores_tagged.append(core_tag)
1199
1200                 elif item_value.startswith("udp"):
1201                     core_tuple = CoreSocketTuple(section_name)
1202                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1203                     cores_plain.append(core_udp)
1204
1205         return cores_tagged, cores_plain
1206
1207     @contextmanager
1208     def traffic_context(self, pkt_size, value):
1209         self.sut.stop_all()
1210         self.sut.reset_stats()
1211         try:
1212             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1213             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1214             self.sut.set_speed(self.tagged_cores, value)
1215             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1216             self.sut.set_speed(self.plain_cores, value * ratio)
1217             self.sut.start_all()
1218             yield
1219         finally:
1220             self.sut.stop_all()
1221
1222
1223 class ProxBngProfileHelper(ProxProfileHelper):
1224
1225     __prox_profile_type__ = "BNG gen"
1226
1227     def __init__(self, resource_helper):
1228         super(ProxBngProfileHelper, self).__init__(resource_helper)
1229         self._cores_tuple = None
1230
1231     @property
1232     def bng_cores(self):
1233         if not self._cores_tuple:
1234             self._cores_tuple = self.get_cores_gen_bng_qos()
1235         return self._cores_tuple
1236
1237     @property
1238     def cpe_cores(self):
1239         return self.bng_cores[0]
1240
1241     @property
1242     def inet_cores(self):
1243         return self.bng_cores[1]
1244
1245     @property
1246     def arp_cores(self):
1247         return self.bng_cores[2]
1248
1249     @property
1250     def arp_task_cores(self):
1251         return self.bng_cores[3]
1252
1253     @property
1254     def all_rx_cores(self):
1255         return self.latency_cores
1256
1257     def get_cores_gen_bng_qos(self):
1258         cpe_cores = []
1259         inet_cores = []
1260         arp_cores = []
1261         arp_tasks_core = [0]
1262         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1263             if not section_name.startswith("core"):
1264                 continue
1265
1266             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1267                 continue
1268
1269             for item_key, item_value in section:
1270                 if item_key != 'name':
1271                     continue
1272
1273                 if item_value.startswith("cpe"):
1274                     core_tuple = CoreSocketTuple(section_name)
1275                     cpe_core = core_tuple.find_in_topology(self.cpu_topology)
1276                     cpe_cores.append(cpe_core)
1277
1278                 elif item_value.startswith("inet"):
1279                     core_tuple = CoreSocketTuple(section_name)
1280                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1281                     inet_cores.append(inet_core)
1282
1283                 elif item_value.startswith("arp"):
1284                     core_tuple = CoreSocketTuple(section_name)
1285                     arp_core = core_tuple.find_in_topology(self.cpu_topology)
1286                     arp_cores.append(arp_core)
1287
1288                 # We check the tasks/core separately
1289                 if item_value.startswith("arp_task"):
1290                     core_tuple = CoreSocketTuple(section_name)
1291                     arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
1292                     arp_tasks_core.append(arp_task_core)
1293
1294         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1295
1296     @contextmanager
1297     def traffic_context(self, pkt_size, value):
1298         # Tester is sending packets at the required speed already after
1299         # setup_test(). Just get the current statistics, sleep the required
1300         # amount of time and calculate packet loss.
1301         inet_pkt_size = pkt_size
1302         cpe_pkt_size = pkt_size - 24
1303         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1304
1305         curr_up_speed = curr_down_speed = 0
1306         max_up_speed = max_down_speed = value
1307         if ratio < 1:
1308             max_down_speed = value * ratio
1309         else:
1310             max_up_speed = value / ratio
1311
1312         # Initialize cores
1313         self.sut.stop_all()
1314         time.sleep(0.5)
1315
1316         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1317         # wrong.
1318         self.sut.start(self.all_rx_cores)
1319         time.sleep(0.5)
1320         self.sut.stop(self.all_rx_cores)
1321         time.sleep(0.5)
1322         self.sut.reset_stats()
1323
1324         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1325         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1326
1327         self.sut.reset_values(self.cpe_cores)
1328         self.sut.reset_values(self.inet_cores)
1329
1330         # Set correct IP and UDP lengths in packet headers
1331         # CPE
1332         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1333         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1334         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1335         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1336
1337         # INET
1338         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1339         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1340         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1341         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1342         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1343         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1344
1345         # Sending ARP to initialize tables - need a few seconds of generation
1346         # to make sure all CPEs are initialized
1347         LOG.info("Initializing SUT: sending ARP packets")
1348         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1349         self.sut.set_speed(self.inet_cores, curr_up_speed)
1350         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1351         self.sut.start(self.arp_cores)
1352         time.sleep(4)
1353
1354         # Ramp up the transmission speed. First go to the common speed, then
1355         # increase steps for the faster one.
1356         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1357
1358         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1359
1360         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1361             # The min(..., ...) takes care of 1) floating point rounding errors
1362             # that could make curr_*_speed to be slightly greater than
1363             # max_*_speed and 2) max_*_speed not being an exact multiple of
1364             # self._step_delta.
1365             if curr_up_speed < max_up_speed:
1366                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1367             if curr_down_speed < max_down_speed:
1368                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1369
1370             self.sut.set_speed(self.inet_cores, curr_up_speed)
1371             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1372             time.sleep(self.step_time)
1373
1374         LOG.info("Target speeds reached. Starting real test.")
1375
1376         yield
1377
1378         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1379         LOG.info("Test ended. Flushing NIC buffers")
1380         self.sut.start(self.all_rx_cores)
1381         time.sleep(3)
1382         self.sut.stop(self.all_rx_cores)
1383
1384     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1385         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1386
1387         with data_helper, self.traffic_context(pkt_size, value):
1388             with data_helper.measure_tot_stats():
1389                 time.sleep(duration)
1390                 # Getting statistics to calculate PPS at right speed....
1391                 data_helper.capture_tsc_hz()
1392                 data_helper.latency = self.get_latency()
1393
1394         return data_helper.result_tuple, data_helper.samples