1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29 from multiprocessing import Queue
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common import utils
37 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
38 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
40 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
47 LOG = logging.getLogger(__name__)
48 LOG.setLevel(logging.DEBUG)
55 CONFIGURATION_OPTIONS = (
56 # dict key section key default value
57 ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58 ('testDuration', 'general', 'test_duration', 5.0),
59 ('testPrecision', 'general', 'test_precision', 1.0),
60 ('tests', 'general', 'tests', None),
61 ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
63 ('logFile', 'logging', 'file', 'dats.log'),
64 ('logDateFormat', 'logging', 'datefmt', None),
65 ('logLevel', 'logging', 'level', 'INFO'),
66 ('logOverwrite', 'logging', 'overwrite', 1),
68 ('testerIp', 'tester', 'ip', None),
69 ('testerUser', 'tester', 'user', 'root'),
70 ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71 ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72 ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73 ('testerSocketId', 'tester', 'socket_id', 0),
75 ('sutIp', 'sut', 'ip', None),
76 ('sutUser', 'sut', 'user', 'root'),
77 ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78 ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79 ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80 ('sutSocketId', 'sut', 'socket_id', 0),
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Core specification parsed from a PROX config section name.

    A spec such as ``core 1s0h`` is decomposed into the core id (1), the
    socket id (0 when the ``s<n>`` part is absent) and a hyperthread
    marker ('h' or the empty string).
    """

    # "core <id>[s<socket>][h]" -- socket and hyperthread parts are optional
    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        try:
            found = cls.CORE_RE.search(str(args[0]))
            if found:
                # replace the raw spec with its three regex groups
                args = found.groups()

            core, socket_spec, hyper = args[0], args[1], args[2]
            marker = 'h' if hyper else ''
            return super(CoreSocketTuple, cls).__new__(
                cls, int(core), try_int(socket_spec, 0), marker)
        except (AttributeError, TypeError, IndexError, ValueError):
            # anything that cannot be coerced into the three fields is invalid
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        """Return True when the spec carried the trailing 'h' marker."""
        return self.hyperthread == 'h'

    @property
    def index(self):
        # hyperthread sibling occupies slot 1, the physical thread slot 0
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Map this spec onto *cpu_topology* and return the processor number.

        :raises ValueError: when the socket/core/thread is not present.
        """
        try:
            threads = cpu_topology[self.socket_id][self.core_id]
            return sorted(threads.values())[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """The four counters returned by the PROX "tot stats" command.

    Accepts either the four values as separate positional arguments, or a
    single (non-string) iterable containing all four.
    """

    def __new__(cls, *args):
        try:
            # A single non-string iterable argument is unpacked into the
            # four fields.  isinstance() replaces the original
            # ``assert args[0] is not str(args[0])`` trick, which was a
            # control-flow assert and therefore vanished under `python -O`,
            # letting a string argument be splatted into characters.
            if not isinstance(args[0], str):
                args = tuple(args[0])
        except (IndexError, TypeError):
            # no args at all, or args[0] not iterable: pass through as-is
            pass
        return super(TotStatsTuple, cls).__new__(cls, *args)
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128 'delta_tx,delta_tsc,'
129 'latency,rx_total,tx_total,pps')):
133 return 1e2 * self.drop_total / float(self.tx_total)
134 except ZeroDivisionError:
139 # calculate the effective throughput in Mpps
140 return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
143 def can_be_lost(self):
144 return int(self.tx_total * self.tolerated / 1e2)
147 def drop_total(self):
148 return self.tx_total - self.rx_total
152 return self.drop_total <= self.can_be_lost
154 def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
156 pkt_loss = self.pkt_loss
158 if port_samples is None:
168 "Throughput": self.mpps,
169 "DropPackets": pkt_loss,
170 "CurrentDropPackets": pkt_loss,
171 "TxThroughput": self.pps / 1e6,
172 "RxThroughput": self.mpps,
176 samples.update(port_samples)
178 samples.update((key, value) for key, value in zip(latency_keys, self.latency))
181 def log_data(self, logger=None):
185 template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
186 logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
187 logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """A packet dump received from PROX: source port, length and payload."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        # raise AssertionError with a formatted message when the check fails
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet
        dump.  Optional start and end parameters can be specified to
        retrieve only a part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so
        end is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is
                not specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet
                are returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a
            byte in the packet payload.
        """
        offset = 0 if start is None else start
        last = self.data_len - 1 if end is None else end

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, offset, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, last, self.data_len, template)

        # Splice end is exclusive: slice one past the last desired byte.
        return self._payload[offset:last + 1]
256 class ProxSocketHelper(object):
258 def __init__(self, sock=None):
259 """ creates new prox instance """
260 super(ProxSocketHelper, self).__init__()
263 sock = socket.socket()
267 self.master_stats = None
269 def connect(self, ip, port):
270 """Connect to the prox instance on the remote system"""
271 self._sock.connect((ip, port))
273 def get_socket(self):
274 """ get the socket connected to the remote instance """
277 def _parse_socket_data(self, decoded_data, pkt_dump_only):
278 def get_newline_index():
279 return decoded_data.find('\n', index)
283 for newline_index in iter(get_newline_index, -1):
284 ret_str = decoded_data[index:newline_index]
287 mode, port_id, data_len = ret_str.split(',', 2)
289 mode, port_id, data_len = None, None, None
291 if mode != 'pktdump':
292 # Regular 1-line message. Stop reading from the socket.
293 LOG.debug("Regular response read")
296 LOG.debug("Packet dump header read: [%s]", ret_str)
298 # The line is a packet dump header. Parse it, read the
299 # packet payload, store the dump for later retrieval.
300 # Skip over the packet dump and continue processing: a
301 # 1-line response may follow the packet dump.
303 data_len = int(data_len)
304 data_start = newline_index + 1 # + 1 to skip over \n
305 data_end = data_start + data_len
306 sub_data = decoded_data[data_start:data_end]
307 pkt_payload = array.array('B', (ord(v) for v in sub_data))
308 pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
309 self._pkt_dumps.append(pkt_dump)
312 # Return boolean instead of string to signal
313 # successful reception of the packet dump.
314 LOG.debug("Packet dump stored, returning")
321 def get_data(self, pkt_dump_only=False, timeout=1):
322 """ read data from the socket """
324 # This method behaves slightly differently depending on whether it is
325 # called to read the response to a command (pkt_dump_only = 0) or if
326 # it is called specifically to read a packet dump (pkt_dump_only = 1).
328 # Packet dumps look like:
329 # pktdump,<port_id>,<data_len>\n
330 # <packet contents as byte array>\n
331 # This means the total packet dump message consists of 2 lines instead
334 # - Response for a command (pkt_dump_only = 0):
335 # 1) Read response from the socket until \n (end of message)
336 # 2a) If the response is a packet dump header (starts with "pktdump,"):
337 # - Read the packet payload and store the packet dump for later
339 # - Reset the state and restart from 1). Eventually state 2b) will
340 # be reached and the function will return.
341 # 2b) If the response is not a packet dump:
342 # - Return the received message as a string
344 # - Explicit request to read a packet dump (pkt_dump_only = 1):
345 # - Read the dump header and payload
346 # - Store the packet dump for later retrieval
347 # - Return True to signify a packet dump was successfully read
350 # recv() is blocking, so avoid calling it when no data is waiting.
351 ready = select.select([self._sock], [], [], timeout)
352 return bool(ready[0])
356 for status in iter(is_ready, False):
357 decoded_data = self._sock.recv(256).decode('utf-8')
358 ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
360 LOG.debug("Received data from socket: [%s]", ret_str)
361 return ret_str if status else ''
363 def put_command(self, to_send):
364 """ send data to the remote instance """
365 LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
367 # TODO: sendall will block, we need a timeout
368 self._sock.sendall(to_send.encode('utf-8'))
372 def get_packet_dump(self):
373 """ get the next packet dump """
375 return self._pkt_dumps.pop(0)
378 def stop_all_reset(self):
379 """ stop the remote instance and reset stats """
380 LOG.debug("Stop all and reset stats")
385 """ stop all cores on the remote instance """
386 LOG.debug("Stop all")
387 self.put_command("stop all\n")
390 def stop(self, cores, task=''):
391 """ stop specific cores on the remote instance """
392 LOG.debug("Stopping cores %s", cores)
393 self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
397 """ start all cores on the remote instance """
398 LOG.debug("Start all")
399 self.put_command("start all\n")
401 def start(self, cores):
402 """ start specific cores on the remote instance """
403 LOG.debug("Starting cores %s", cores)
404 self.put_command("start {}\n".format(join_non_strings(',', cores)))
407 def reset_stats(self):
408 """ reset the statistics on the remote instance """
409 LOG.debug("Reset stats")
410 self.put_command("reset stats\n")
413 def _run_template_over_cores(self, template, cores, *args):
415 self.put_command(template.format(core, *args))
417 def set_pkt_size(self, cores, pkt_size):
418 """ set the packet size to generate on the remote instance """
419 LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
421 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
424 def set_value(self, cores, offset, value, length):
425 """ set value on the remote instance """
426 msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
427 LOG.debug(msg, cores, value, length, offset)
428 template = "set value {} 0 {} {} {}\n"
429 self._run_template_over_cores(template, cores, offset, value, length)
431 def reset_values(self, cores):
432 """ reset values on the remote instance """
433 LOG.debug("Set value for core(s) %s", cores)
434 self._run_template_over_cores("reset values {} 0\n", cores)
436 def set_speed(self, cores, speed, tasks=None):
437 """ set speed on the remote instance """
439 tasks = [0] * len(cores)
440 elif len(tasks) != len(cores):
441 LOG.error("set_speed: cores and tasks must have the same len")
442 LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
443 for (core, task) in list(zip(cores, tasks)):
444 self.put_command("speed {} {} {}\n".format(core, task, speed))
446 def slope_speed(self, cores_speed, duration, n_steps=0):
447 """will start to increase speed from 0 to N where N is taken from
448 a['speed'] for each a in cores_speed"""
449 # by default, each step will take 0.5 sec
451 n_steps = duration * 2
453 private_core_data = []
454 step_duration = float(duration) / n_steps
455 for core_data in cores_speed:
456 target = float(core_data['speed'])
457 private_core_data.append({
458 'cores': core_data['cores'],
460 'delta': target / n_steps,
465 deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
466 for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
467 time.sleep(step_duration)
468 for core_data in private_core_data:
469 core_data['current'] = core_data[key1] + core_data[key2]
470 self.set_speed(core_data['cores'], core_data['current'])
472 def set_pps(self, cores, pps, pkt_size):
473 """ set packets per second for specific cores on the remote instance """
474 msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
475 LOG.debug(msg, cores, pps, pkt_size)
477 # speed in percent of line-rate
478 speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
479 self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
481 def lat_stats(self, cores, task=0):
482 """Get the latency statistics from the remote system"""
483 # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
488 self.put_command("lat stats {} {} \n".format(core, task))
489 ret = self.get_data()
492 lat_min[core], lat_max[core], lat_avg[core] = \
493 tuple(int(n) for n in ret.split(",")[:3])
495 except (AttributeError, ValueError, TypeError):
498 return lat_min, lat_max, lat_avg
500 def get_all_tot_stats(self):
501 self.put_command("tot stats\n")
502 all_stats_str = self.get_data().split(",")
503 if len(all_stats_str) != 4:
506 all_stats = TotStatsTuple(int(v) for v in all_stats_str)
507 self.master_stats = all_stats
511 return self.get_all_tot_stats()[3]
515 def rx_stats(self, cores, task=0):
516 return self.core_stats(cores, task)
518 def core_stats(self, cores, task=0):
519 """Get the receive statistics from the remote system"""
520 rx = tx = drop = tsc = 0
522 self.put_command("core stats {} {}\n".format(core, task))
523 ret = self.get_data().split(",")
528 return rx, tx, drop, tsc
530 def port_stats(self, ports):
531 """get counter values from a specific port"""
532 tot_result = [0] * 12
534 self.put_command("port_stats {}\n".format(port))
535 ret = [try_int(s, 0) for s in self.get_data().split(",")]
536 tot_result = [sum(x) for x in zip(tot_result, ret)]
540 def measure_tot_stats(self):
541 start = self.get_all_tot_stats()
542 container = {'start_tot': start}
546 container['end_tot'] = end = self.get_all_tot_stats()
548 container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
551 """Get the total statistics from the remote system"""
552 stats = self.get_all_tot_stats()
555 def tot_ierrors(self):
556 """Get the total ierrors from the remote system"""
557 self.put_command("tot ierrors tot\n")
558 recv = self.get_data().split(',')
559 tot_ierrors = int(recv[0])
561 return tot_ierrors, tsc
563 def set_count(self, count, cores):
564 """Set the number of packets to send on the specified core"""
565 self._run_template_over_cores("count {} 0 {}\n", cores, count)
567 def dump_rx(self, core_id, task_id=0, count=1):
568 """Activate dump on rx on the specified core"""
569 LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
570 self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
571 time.sleep(1.5) # Give PROX time to set up packet dumping
579 """ stop all cores on the remote instance """
580 LOG.debug("Quit prox")
581 self.put_command("quit\n")
584 def force_quit(self):
585 """ stop all cores on the remote instance """
586 LOG.debug("Force Quit prox")
587 self.put_command("quit_force\n")
591 _LOCAL_OBJECT = object()
594 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
595 # the actual app is lowercase
597 # not used for Prox but added for consistency
600 LUA_PARAMETER_NAME = ""
601 LUA_PARAMETER_PEER = {
606 CONFIG_QUEUE_TIMEOUT = 120
608 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
609 self.remote_path = None
610 super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
611 self.remote_prox_file_name = None
612 self._prox_config_data = None
613 self.additional_files = {}
614 self.config_queue = Queue()
615 # allow_exit_without_flush
616 self.config_queue.cancel_join_thread()
617 self._global_section = None
620 def prox_config_data(self):
621 if self._prox_config_data is None:
622 # this will block, but it needs too
623 self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
624 return self._prox_config_data
627 def global_section(self):
628 if self._global_section is None and self.prox_config_data:
629 self._global_section = self.find_section("global")
630 return self._global_section
632 def find_section(self, name, default=_LOCAL_OBJECT):
633 result = next((value for key, value in self.prox_config_data if key == name), default)
634 if result is _LOCAL_OBJECT:
635 raise KeyError('{} not found in Prox config'.format(name))
638 def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
639 section = self.find_section(section_name, [])
640 result = next((value for key, value in section if key == section_key), default)
641 if result is _LOCAL_OBJECT:
642 template = '{} not found in {} section of Prox config'
643 raise KeyError(template.format(section_key, section_name))
646 def _build_pipeline_kwargs(self):
647 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
648 self.pipeline_kwargs = {
649 'tool_path': tool_path,
650 'tool_dir': os.path.dirname(tool_path),
653 def copy_to_target(self, config_file_path, prox_file):
654 remote_path = os.path.join("/tmp", prox_file)
655 self.ssh_helper.put(config_file_path, remote_path)
659 def _get_tx_port(section, sections):
661 for item in sections[section]:
662 if item[0] == "tx port":
663 iface_port = re.findall(r'\d+', item[1])
664 # do we want the last one?
665 # if yes, then can we reverse?
666 return int(iface_port[0])
669 def _replace_quoted_with_value(quoted, value, count=1):
670 new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
673 def _insert_additional_file(self, value):
674 file_str = value.split('"')
675 base_name = os.path.basename(file_str[1])
676 file_str[1] = self.additional_files[base_name]
677 return '"'.join(file_str)
679 def generate_prox_config_file(self, config_path):
681 prox_config = ConfigParser(config_path, sections)
684 # Ensure MAC is set "hardware"
685 all_ports = self.vnfd_helper.port_pairs.all_ports
686 # use dpdk port number
687 for port_name in all_ports:
688 port_num = self.vnfd_helper.port_num(port_name)
689 port_section_name = "port {}".format(port_num)
690 for section_name, section in sections:
691 if port_section_name != section_name:
694 for index, section_data in enumerate(section):
695 if section_data[0] == "mac":
696 section_data[1] = "hardware"
699 for _, section in sections:
700 # for index, (item_key, item_val) in enumerate(section):
701 for index, section_data in enumerate(section):
702 item_key, item_val = section_data
703 if item_val.startswith("@@dst_mac"):
704 tx_port_iter = re.finditer(r'\d+', item_val)
705 tx_port_no = int(next(tx_port_iter).group(0))
706 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
707 mac = intf["virtual-interface"]["dst_mac"]
708 section_data[1] = mac.replace(":", " ", 6)
710 if item_key == "dst mac" and item_val.startswith("@@"):
711 tx_port_iter = re.finditer(r'\d+', item_val)
712 tx_port_no = int(next(tx_port_iter).group(0))
713 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
714 mac = intf["virtual-interface"]["dst_mac"]
715 section_data[1] = mac
717 # if addition file specified in prox config
718 if not self.additional_files:
721 for section_name, section in sections:
722 for index, section_data in enumerate(section):
724 if section_data[0].startswith("dofile"):
725 section_data[0] = self._insert_additional_file(section_data[0])
727 if section_data[1].startswith("dofile"):
728 section_data[1] = self._insert_additional_file(section_data[1])
735 def write_prox_lua(lua_config):
737 Write an .ini-format config file for PROX (parameters.lua)
738 PROX does not allow a space before/after the =, so we need
742 for key in lua_config:
743 value = '"' + lua_config[key] + '"'
744 if key == "__name__":
746 if value is not None and value != '@':
747 key = "=".join((key, str(value).replace('\n', '\n\t')))
750 key = str(key).replace('\n', '\n\t')
752 return os.linesep.join(out)
755 def write_prox_config(prox_config):
757 Write an .ini-format config file for PROX
758 PROX does not allow a space before/after the =, so we need
762 for i, (section_name, section) in enumerate(prox_config):
763 out.append("[{}]".format(section_name))
764 for index, item in enumerate(section):
766 if key == "__name__":
768 if value is not None and value != '@':
769 key = "=".join((key, str(value).replace('\n', '\n\t')))
772 key = str(key).replace('\n', '\n\t')
774 return os.linesep.join(out)
776 def put_string_to_file(self, s, remote_path):
777 file_obj = cStringIO(s)
778 self.ssh_helper.put_file_obj(file_obj, remote_path)
781 def generate_prox_lua_file(self):
783 all_ports = self.vnfd_helper.port_pairs.all_ports
784 for port_name in all_ports:
785 port_num = self.vnfd_helper.port_num(port_name)
786 intf = self.vnfd_helper.find_interface(name=port_name)
787 vintf = intf['virtual-interface']
788 p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
789 p["src_mac{0}".format(port_num)] = vintf["local_mac"]
793 def upload_prox_lua(self, config_file, lua_data):
794 # prox can't handle spaces around ' = ' so use custom method
795 out = StringIO(self.write_prox_lua(lua_data))
797 remote_path = os.path.join("/tmp", config_file)
798 self.ssh_helper.put_file_obj(out, remote_path)
802 def upload_prox_config(self, config_file, prox_config_data):
803 # prox can't handle spaces around ' = ' so use custom method
804 out = StringIO(self.write_prox_config(prox_config_data))
806 remote_path = os.path.join("/tmp", config_file)
807 self.ssh_helper.put_file_obj(out, remote_path)
811 def build_config_file(self):
812 task_path = self.scenario_helper.task_path
813 options = self.scenario_helper.options
814 config_path = options['prox_config']
815 config_file = os.path.basename(config_path)
816 config_path = find_relative_file(config_path, task_path)
817 self.additional_files = {}
820 if options['prox_generate_parameter']:
822 self.lua = self.generate_prox_lua_file()
823 if len(self.lua) > 0:
824 self.upload_prox_lua("parameters.lua", self.lua)
828 prox_files = options.get('prox_files', [])
829 if isinstance(prox_files, six.string_types):
830 prox_files = [prox_files]
831 for key_prox_file in prox_files:
832 base_prox_file = os.path.basename(key_prox_file)
833 key_prox_path = find_relative_file(key_prox_file, task_path)
834 remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
835 self.additional_files[base_prox_file] = remote_prox_file
837 self._prox_config_data = self.generate_prox_config_file(config_path)
838 # copy config to queue so we can read it from traffic_runner process
839 self.config_queue.put(self._prox_config_data)
840 self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
842 def build_config(self):
843 self.build_config_file()
845 options = self.scenario_helper.options
847 prox_args = options['prox_args']
848 LOG.info("Provision and start the %s", self.APP_NAME)
849 self._build_pipeline_kwargs()
850 self.pipeline_kwargs["args"] = " ".join(
851 " ".join([k, v if v else ""]) for k, v in prox_args.items())
852 self.pipeline_kwargs["cfg_file"] = self.remote_path
854 cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
855 prox_cmd = cmd_template.format(**self.pipeline_kwargs)
859 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
860 class ProxResourceHelper(ClientResourceHelper):
862 RESOURCE_WORD = 'prox'
869 def find_pci(pci, bound_pci):
870 # we have to substring match PCI bus address from the end
871 return any(b.endswith(pci) for b in bound_pci)
873 def __init__(self, setup_helper):
874 super(ProxResourceHelper, self).__init__(setup_helper)
875 self.mgmt_interface = self.vnfd_helper.mgmt_interface
876 self._user = self.mgmt_interface["user"]
877 self._ip = self.mgmt_interface["ip"]
880 self._vpci_to_if_name_map = None
881 self.additional_file = {}
882 self.remote_prox_file_name = None
887 self._test_type = None
892 self.client = self._connect()
897 if self._test_type is None:
898 self._test_type = self.setup_helper.find_in_section('global', 'name', None)
899 return self._test_type
901 def run_traffic(self, traffic_profile):
902 self._queue.cancel_join_thread()
906 traffic_profile.init(self._queue)
907 # this frees up the run_traffic loop
908 self.client_started.value = 1
910 while not self._terminated.value:
911 # move it all to traffic_profile
912 self._run_traffic_once(traffic_profile)
914 def _run_traffic_once(self, traffic_profile):
915 traffic_profile.execute_traffic(self)
916 if traffic_profile.done:
917 self._queue.put({'done': True})
918 LOG.debug("tg_prox done")
919 self._terminated.value = 1
921 # For VNF use ResourceHelper method to collect KPIs directly.
922 # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
923 def collect_collectd_kpi(self):
924 return self._collect_resource_kpi()
926 def collect_kpi(self):
927 result = super(ProxResourceHelper, self).collect_kpi()
928 # add in collectd kpis manually
930 result['collect_stats'] = self._collect_resource_kpi()
934 # should not be called, use VNF terminate
935 raise NotImplementedError()
938 return self.sut # force connection
940 def execute(self, cmd, *args, **kwargs):
941 func = getattr(self.sut, cmd, None)
943 return func(*args, **kwargs)
945 def _connect(self, client=None):
946 """Run and connect to prox on the remote system """
947 # De-allocating a large amount of hugepages takes some time. If a new
948 # PROX instance is started immediately after killing the previous one,
949 # it might not be able to allocate hugepages, because they are still
950 # being freed. Hence the -w switch.
951 # self.connection.execute("sudo killall -w Prox 2>/dev/null")
952 # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
953 # -f ./handle_none-4.cfg"
954 # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
956 # + "export RTE_TARGET=" + self._dpdk_target + ";" \
957 # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
959 # + "./build/Prox " + prox_args
960 # log.debug("Starting PROX with command [%s]", prox_cmd)
961 # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
962 # self._ip, prox_cmd))
964 client = ProxSocketHelper()
966 # try connecting to Prox for 60s
967 for _ in range(RETRY_SECONDS):
968 time.sleep(RETRY_INTERVAL)
970 client.connect(self._ip, PROX_PORT)
971 except (socket.gaierror, socket.error):
976 msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
977 raise Exception(msg.format(self._ip, PROX_PORT))
980 class ProxDataHelper(object):
982 def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
983 super(ProxDataHelper, self).__init__()
984 self.vnfd_helper = vnfd_helper
986 self.pkt_size = pkt_size
988 self.tolerated_loss = tolerated_loss
989 self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
991 self.measured_stats = None
993 self._totals_and_pps = None
994 self.result_tuple = None
997 def totals_and_pps(self):
998 if self._totals_and_pps is None:
999 rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
1000 pps = self.value / 100.0 * self.line_rate_to_pps()
1001 self._totals_and_pps = rx_total, tx_total, pps
1002 return self._totals_and_pps
1006 return self.totals_and_pps[0]
1010 return self.totals_and_pps[1]
1014 return self.totals_and_pps[2]
1019 for port_name, port_num in self.vnfd_helper.ports_iter():
1020 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1021 samples[port_name] = {
1022 "in_packets": port_rx_total,
1023 "out_packets": port_tx_total,
1027 def __enter__(self):
1028 self.check_interface_count()
1031 def __exit__(self, exc_type, exc_val, exc_tb):
1034 def make_tuple(self):
1035 if self.result_tuple:
1038 self.result_tuple = ProxTestDataTuple(
1039 self.tolerated_loss,
1041 self.measured_stats['delta'].rx,
1042 self.measured_stats['delta'].tx,
1043 self.measured_stats['delta'].tsc,
1049 self.result_tuple.log_data()
1052 def measure_tot_stats(self):
1053 with self.sut.measure_tot_stats() as self.measured_stats:
1056 def check_interface_count(self):
1057 # do this assert in init? unless we expect interface count to
1058 # change from one run to another run...
1059 assert self.port_count in {1, 2, 4}, \
1060 "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1062 def capture_tsc_hz(self):
1063 self.tsc_hz = float(self.sut.hz())
1065 def line_rate_to_pps(self):
1066 # FIXME Don't hardcode 10Gb/s
1067 return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
class ProxProfileHelper(object):
    """Generic profile helper for PROX-based traffic tests.

    Concrete profile helpers subclass this and set
    ``__prox_profile_type__`` to the test-type string they handle.
    Unknown attributes are delegated to the wrapped resource helper (see
    ``__getattr__``), so names like ``self.sut``, ``self.ssh_helper`` and
    ``self.setup_helper`` resolve through ``self.resource_helper``.
    """

    __prox_profile_type__ = "Generic"

    # Core modes as they appear in the PROX configuration file.
    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"

    @classmethod
    def get_cls(cls, helper_type):
        """Return class of specified type.

        Falls back to the generic ProxProfileHelper when *helper_type* is
        None or no subclass declares that profile type.
        """
        if helper_type is None:
            return ProxProfileHelper

        for profile_helper_class in utils.itersubclasses(cls):
            if helper_type == profile_helper_class.__prox_profile_type__:
                return profile_helper_class

        return ProxProfileHelper

    @classmethod
    def make_profile_helper(cls, resource_helper):
        """Instantiate the helper matching ``resource_helper.test_type``."""
        return cls.get_cls(resource_helper.test_type)(resource_helper)

    def __init__(self, resource_helper):
        super(ProxProfileHelper, self).__init__()
        self.resource_helper = resource_helper
        # Lazily-computed caches; see the properties below.
        self._cpu_topology = None
        self._test_cores = None
        self._latency_cores = None

    @property
    def cpu_topology(self):
        """SocketTopology parsed (once) from the SUT's /proc/cpuinfo."""
        if not self._cpu_topology:
            stdout = io.BytesIO()
            self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
            self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
        return self._cpu_topology

    @property
    def test_cores(self):
        """Cores configured in PROX 'gen' (traffic generation) mode."""
        if not self._test_cores:
            self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
        return self._test_cores

    @property
    def latency_cores(self):
        """Cores configured in PROX 'lat' (latency measurement) mode."""
        if not self._latency_cores:
            self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
        return self._latency_cores

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Start the generator cores at *value* % speed with *pkt_size*
        packets, yield for the measurement, and always stop on exit.
        """
        self.sut.reset_stats()
        try:
            self.sut.set_pkt_size(self.test_cores, pkt_size)
            self.sut.set_speed(self.test_cores, value)
            self.sut.start_all()
            yield
        finally:
            self.sut.stop_all()

    def get_cores(self, mode):
        """Return topology cores whose PROX config section mode is *mode*."""
        cores = []
        for section_name, section in self.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            for key, value in section:
                if key == "mode" and value == mode:
                    core_tuple = CoreSocketTuple(section_name)
                    core = core_tuple.find_in_topology(self.cpu_topology)
                    cores.append(core)

        return cores

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one traffic iteration; return (result_tuple, samples)."""
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples

    def get_latency(self):
        """
        :return: return lat_min, lat_max, lat_avg
        :rtype: list
        """
        if self._latency_cores:
            return self.sut.lat_stats(self._latency_cores)
        # Fix: previously fell through returning None; callers expect a list.
        return []

    def terminate(self):
        # Nothing to clean up for the generic profile.
        pass

    def __getattr__(self, item):
        # Delegate unknown attributes to the wrapped resource helper.
        return getattr(self.resource_helper, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper for the MPLS tag/untag test.

    Generator cores are split by the prefix of their "name" entry:
    "tag..." cores send MPLS-tagged traffic, "udp..." cores send plain
    (untagged) traffic.
    """

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        # Cached (tagged, plain) core tuple; see mpls_cores.
        self._cores_tuple = None

    @property
    def mpls_cores(self):
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    @property
    def tagged_cores(self):
        return self.mpls_cores[0]

    @property
    def plain_cores(self):
        return self.mpls_cores[1]

    def get_cores_mpls(self):
        """Classify generator cores into (tagged, plain) by name prefix."""
        cores_tagged = []
        cores_plain = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not in generator mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cores_tagged.append(core_tag)

                elif item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.find_in_topology(self.cpu_topology)
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Start tagged/plain generators, yield for the measurement, and
        always stop them on exit.

        Plain cores send 4-byte-smaller packets (no MPLS tag); their speed
        is scaled so both directions consume the same line bandwidth.
        """
        self.sut.reset_stats()
        try:
            self.sut.set_pkt_size(self.tagged_cores, pkt_size)
            self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            self.sut.set_speed(self.tagged_cores, value)
            # Scale by on-wire size (pkt + 20 B framing) to equalize bandwidth.
            ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            self.sut.set_speed(self.plain_cores, value * ratio)
            self.sut.start_all()
            yield
        finally:
            self.sut.stop_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper for the BNG (Broadband Network Gateway) gen test.

    Generates QinQ traffic on "cpe" cores and MPLS/GRE traffic on "inet"
    cores, plus ARP traffic to populate the SUT tables before measuring.
    """

    __prox_profile_type__ = "BNG gen"

    def __init__(self, resource_helper):
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        # Cached (cpe, inet, arp, arp_task) core tuple; see bng_cores.
        self._cores_tuple = None

    @property
    def bng_cores(self):
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        return self.bng_cores[0]

    @property
    def inet_cores(self):
        return self.bng_cores[1]

    @property
    def arp_cores(self):
        return self.bng_cores[2]

    @property
    def arp_task_cores(self):
        return self.bng_cores[3]

    @property
    def all_rx_cores(self):
        # Only the latency cores receive traffic in this profile.
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        """Classify generator cores by the prefix of their "name" entry.

        :return: tuple (cpe_cores, inet_cores, arp_cores, arp_tasks_core)
        """
        cpe_cores = []
        inet_cores = []
        arp_cores = []
        arp_tasks_core = [0]
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not in generator mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_core = core_tuple.find_in_topology(self.cpu_topology)
                    cpe_cores.append(cpe_core)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.find_in_topology(self.cpu_topology)
                    inet_cores.append(inet_core)

                # NOTE: "arp_task..." names also match the "arp" prefix, so
                # they land in arp_cores above as well.
                elif item_value.startswith("arp"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_cores.append(arp_core)

                # We check the tasks/core separately
                if item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_tasks_core.append(arp_task_core)

        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Ramp the cpe/inet generators up to *value* %, yield for the
        measurement, then stop the generators and flush the NIC buffers.
        """
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE:
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET:
        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)
        time.sleep(4)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # self.step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        yield

        self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        time.sleep(3)
        self.sut.stop(self.all_rx_cores)

    # run_test is inherited unchanged from ProxProfileHelper; the override
    # previously defined here was byte-identical to the base implementation.
class ProxVpeProfileHelper(ProxProfileHelper):
    """Profile helper for the vPE (virtual Provider Edge) gen test.

    Generator cores and TX ports are split by the prefix of their "name"
    entry: "cpe..." for QinQ subscriber traffic, "inet..." for MPLS core
    traffic.
    """

    __prox_profile_type__ = "vPE gen"

    def __init__(self, resource_helper):
        super(ProxVpeProfileHelper, self).__init__(resource_helper)
        # Cached (cpe, inet) core and port tuples; see vpe_cores/vpe_ports.
        self._cores_tuple = None
        self._ports_tuple = None

    @property
    def vpe_cores(self):
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_vpe()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        return self.vpe_cores[0]

    @property
    def inet_cores(self):
        return self.vpe_cores[1]

    @property
    def all_rx_cores(self):
        # Only the latency cores receive traffic in this profile.
        return self.latency_cores

    @property
    def vpe_ports(self):
        if not self._ports_tuple:
            self._ports_tuple = self.get_ports_gen_vpe()
        return self._ports_tuple

    @property
    def cpe_ports(self):
        return self.vpe_ports[0]

    @property
    def inet_ports(self):
        return self.vpe_ports[1]

    def get_cores_gen_vpe(self):
        """Classify generator cores into (cpe_cores, inet_cores)."""
        cpe_cores = []
        inet_cores = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not in generator mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_core = core_tuple.find_in_topology(self.cpu_topology)
                    cpe_cores.append(cpe_core)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.find_in_topology(self.cpu_topology)
                    inet_cores.append(inet_core)

        return cpe_cores, inet_cores

    def get_ports_gen_vpe(self):
        """Classify TX ports into (cpe_ports, inet_ports) by name prefix.

        Fix: the section is now scanned in a single pass that classifies
        only 'name' entries; previously a redundant first loop iterated the
        section without doing anything and a second loop matched the values
        of every key, not just 'name'.
        """
        cpe_ports = []
        inet_ports = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("port"):
                continue

            # The port index comes from the digits in the section name.
            tx_port_iter = re.finditer(r'\d+', section_name)
            tx_port_no = int(next(tx_port_iter).group(0))

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    cpe_ports.append(tx_port_no)

                elif item_value.startswith("inet"):
                    inet_ports.append(tx_port_no)

        return cpe_ports, inet_ports

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Ramp the cpe/inet generators up to *value* %, yield for the
        measurement, then stop the generators and flush the NIC buffers.
        """
        # Calculate the target upload and download speed. The upload and
        # download packets have different packet sizes, so in order to get
        # equal bandwidth usage, the ratio of the speeds has to match the ratio
        # of the packet sizes.
        cpe_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 4
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.cpe_ports) != len(self.cpe_cores):
            max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)

        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # self.step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        yield

        self.sut.stop(self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        time.sleep(3)
        self.sut.stop(self.all_rx_cores)

    # run_test is inherited unchanged from ProxProfileHelper; the override
    # previously defined here was byte-identical to the base implementation.