X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fnetwork_services%2Fvnf_generic%2Fvnf%2Fprox_helpers.py;h=ac5abfbcb547b5e9509054d751a45f77b80f62a1;hb=5bc60308b6ec764bd1908138694ea8115b3adb35;hp=dfed45aa4d7af4bb4e1bff695d6c0d589cf9ea4e;hpb=4a5bc16d841221e8ac7853b3044e50af0c8143d2;p=yardstick.git diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index dfed45aa4..ac5abfbcb 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -14,28 +14,37 @@ from __future__ import absolute_import import array -import operator +import io import logging +import operator import os import re import select import socket -from collections import OrderedDict, namedtuple import time +from collections import OrderedDict, namedtuple from contextlib import contextmanager from itertools import repeat, chain +from multiprocessing import Queue +import six +from six.moves import cStringIO from six.moves import zip, StringIO from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file -from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings -from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser +from yardstick.common import utils +from yardstick.common.utils import SocketTopology, join_non_strings, try_int +from yardstick.network_services.helpers.iniparser import ConfigParser from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper PROX_PORT = 8474 +SECTION_NAME = 0 +SECTION_CONTENTS = 1 + LOG = logging.getLogger(__name__) +LOG.setLevel(logging.DEBUG) TEN_GIGABIT = 1e10 BITS_PER_BYTE = 8 @@ -72,8 +81,7 @@ CONFIGURATION_OPTIONS = ( class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')): - - CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?") + CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$") def __new__(cls, *args): try: @@ -81,7 +89,7 @@ class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread') if matches: args = matches.groups() - return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), int(args[1]), + return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0), 'h' if args[2] else '') except (AttributeError, TypeError, IndexError, ValueError): @@ -105,7 +113,6 @@ class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread') class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')): - def __new__(cls, *args): try: assert args[0] is not str(args[0]) @@ -119,7 +126,6 @@ class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')): class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,' 'delta_tx,delta_tsc,' 'latency,rx_total,tx_total,pps')): - @property def pkt_loss(self): try: @@ -144,10 +150,13 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ def success(self): return self.drop_total <= self.can_be_lost - def get_samples(self, pkt_size, pkt_loss=None): + def get_samples(self, pkt_size, pkt_loss=None, port_samples=None): if pkt_loss is None: pkt_loss = self.pkt_loss + if port_samples is None: + port_samples = {} + latency_keys = [ "LatencyMin", "LatencyMax", @@ -162,6 +171,8 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ "RxThroughput": self.mpps, "PktSize": pkt_size, } + if port_samples: + samples.update(port_samples) samples.update((key, value) for key, value in zip(latency_keys, self.latency)) return samples @@ -176,7 +187,6 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ class PacketDump(object): - @staticmethod def assert_func(func, value1, value2, template=None): assert func(value1, value2), template.format(value1, value2) @@ -253,6 +263,7 @@ class ProxSocketHelper(object): self._sock = sock self._pkt_dumps = [] + self.master_stats = None def connect(self, ip, port): """Connect to the prox instance on the remote system""" @@ -308,6 +319,7 @@ class ProxSocketHelper(object): def get_data(self, pkt_dump_only=False, timeout=1): """ read data from the socket """ + # This method behaves slightly differently depending on whether it is # called to read the response to a command (pkt_dump_only = 0) or if # it is called specifically to read a packet dump (pkt_dump_only = 1). @@ -341,7 +353,6 @@ class ProxSocketHelper(object): status = False ret_str = "" for status in iter(is_ready, False): - LOG.debug("Reading from socket") decoded_data = self._sock.recv(256).decode('utf-8') ret_str = self._parse_socket_data(decoded_data, pkt_dump_only) @@ -351,7 +362,11 @@ class ProxSocketHelper(object): def put_command(self, to_send): """ send data to the remote instance """ LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n')) - self._sock.sendall(to_send.encode('utf-8')) + try: + # TODO: sendall will block, we need a timeout + self._sock.sendall(to_send.encode('utf-8')) + except: + pass def get_packet_dump(self): """ get the next packet dump """ @@ -417,10 +432,15 @@ class ProxSocketHelper(object): LOG.debug("Set value for core(s) %s", cores) self._run_template_over_cores("reset values {} 0\n", cores) - def set_speed(self, cores, speed): + def set_speed(self, cores, speed, tasks=None): """ set speed on the remote instance """ - LOG.debug("Set speed for core(s) %s to %g", cores, speed) - self._run_template_over_cores("speed {} 0 {}\n", cores, speed) + if tasks is None: + tasks = [0] * len(cores) + elif len(tasks) != len(cores): + LOG.error("set_speed: cores and tasks must have the same len") + LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed) + for (core, task) in list(zip(cores, tasks)): + self.put_command("speed {} {} {}\n".format(core, task, speed)) def slope_speed(self, cores_speed, duration, n_steps=0): """will start to increase speed from 0 to N where N is taken from @@ -478,11 +498,16 @@ class ProxSocketHelper(object): def get_all_tot_stats(self): self.put_command("tot stats\n") - all_stats = TotStatsTuple(int(v) for v in self.get_data().split(",")) + all_stats_str = self.get_data().split(",") + if len(all_stats_str) != 4: + all_stats = [0] * 4 + return all_stats + all_stats = TotStatsTuple(int(v) for v in all_stats_str) + self.master_stats = all_stats return all_stats def hz(self): - return self.get_all_tot_stats().hz + return self.get_all_tot_stats()[3] # Deprecated # TODO: remove @@ -503,11 +528,11 @@ class ProxSocketHelper(object): def port_stats(self, ports): """get counter values from a specific port""" - tot_result = list(repeat(0, 12)) + tot_result = [0] * 12 for port in ports: self.put_command("port_stats {}\n".format(port)) - for index, n in enumerate(self.get_data().split(',')): - tot_result[index] += int(n) + ret = [try_int(s, 0) for s in self.get_data().split(",")] + tot_result = [sum(x) for x in zip(tot_result, ret)] return tot_result @contextmanager @@ -542,7 +567,7 @@ class ProxSocketHelper(object): """Activate dump on rx on the specified core""" LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count) self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count)) - time.sleep(1.5) # Give PROX time to set up packet dumping + time.sleep(1.5) # Give PROX time to set up packet dumping def quit(self): self.stop_all() @@ -562,54 +587,14 @@ class ProxSocketHelper(object): time.sleep(3) -class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): - - def __init__(self, vnfd_helper, ssh_helper, scenario_helper): - super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) - self.dpdk_root = "/root/dpdk-17.02" - - def setup_vnf_environment(self): - super(ProxDpdkVnfSetupEnvHelper, self).setup_vnf_environment() - - # debug dump after binding - self.ssh_helper.execute("sudo {} -s".format(self.dpdk_nic_bind)) - - def rebind_drivers(self, force=True): - if force: - force = '--force ' - else: - force = '' - cmd_template = "{} {}-b {} {}" - if not self.used_drivers: - self._find_used_drivers() - for vpci, (_, driver) in self.used_drivers.items(): - self.ssh_helper.execute(cmd_template.format(self.dpdk_nic_bind, force, driver, vpci)) - - def _setup_dpdk(self): - self._setup_hugepages() - - self.ssh_helper.execute("pkill prox") - self.ssh_helper.execute("sudo modprobe uio") - - # for baremetal - self.ssh_helper.execute("sudo modprobe msr") - - # why remove?, just keep it loaded - # self.connection.execute("sudo rmmod igb_uio") - - igb_uio_path = os.path.join(self.dpdk_root, "x86_64-native-linuxapp-gcc/kmod/igb_uio.ko") - self.ssh_helper.execute("sudo insmod {}".format(igb_uio_path)) - - # quick hack to allow non-root copy - self.ssh_helper.execute("sudo chmod 0777 {}".format(self.ssh_helper.bin_path)) +_LOCAL_OBJECT = object() -class ProxResourceHelper(ClientResourceHelper): - - PROX_CORE_GEN_MODE = "gen" - PROX_CORE_LAT_MODE = "lat" - - PROX_MODE = "" +class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): + # the actual app is lowercase + APP_NAME = 'prox' + # not used for Prox but added for consistency + VNF_TYPE = "PROX" LUA_PARAMETER_NAME = "" LUA_PARAMETER_PEER = { @@ -617,12 +602,57 @@ class ProxResourceHelper(ClientResourceHelper): "sut": "gen", } - WAIT_TIME = 3 + CONFIG_QUEUE_TIMEOUT = 120 - @staticmethod - def _replace_quoted_with_value(quoted, value, count=1): - new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count) - return new_string + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + self.remote_path = None + super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) + self.remote_prox_file_name = None + self._prox_config_data = None + self.additional_files = {} + self.config_queue = Queue() + # allow_exit_without_flush + self.config_queue.cancel_join_thread() + self._global_section = None + + @property + def prox_config_data(self): + if self._prox_config_data is None: + # this will block, but it needs too + self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT) + return self._prox_config_data + + @property + def global_section(self): + if self._global_section is None and self.prox_config_data: + self._global_section = self.find_section("global") + return self._global_section + + def find_section(self, name, default=_LOCAL_OBJECT): + result = next((value for key, value in self.prox_config_data if key == name), default) + if result is _LOCAL_OBJECT: + raise KeyError('{} not found in Prox config'.format(name)) + return result + + def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT): + section = self.find_section(section_name, []) + result = next((value for key, value in section if key == section_key), default) + if result is _LOCAL_OBJECT: + template = '{} not found in {} section of Prox config' + raise KeyError(template.format(section_key, section_name)) + return result + + def _build_pipeline_kwargs(self): + tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) + self.pipeline_kwargs = { + 'tool_path': tool_path, + 'tool_dir': os.path.dirname(tool_path), + } + + def copy_to_target(self, config_file_path, prox_file): + remote_path = os.path.join("/tmp", prox_file) + self.ssh_helper.put(config_file_path, remote_path) + return remote_path @staticmethod def _get_tx_port(section, sections): @@ -635,14 +665,90 @@ class ProxResourceHelper(ClientResourceHelper): return int(iface_port[0]) @staticmethod - def line_rate_to_pps(pkt_size, n_ports): - # FIXME Don't hardcode 10Gb/s - return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20) + def _replace_quoted_with_value(quoted, value, count=1): + new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count) + return new_string + + def _insert_additional_file(self, value): + file_str = value.split('"') + base_name = os.path.basename(file_str[1]) + file_str[1] = self.additional_files[base_name] + return '"'.join(file_str) + + def generate_prox_config_file(self, config_path): + sections = [] + prox_config = ConfigParser(config_path, sections) + prox_config.parse() + + # Ensure MAC is set "hardware" + all_ports = self.vnfd_helper.port_pairs.all_ports + # use dpdk port number + for port_name in all_ports: + port_num = self.vnfd_helper.port_num(port_name) + port_section_name = "port {}".format(port_num) + for section_name, section in sections: + if port_section_name != section_name: + continue + + for index, section_data in enumerate(section): + if section_data[0] == "mac": + section_data[1] = "hardware" + + # search for dst mac + for _, section in sections: + # for index, (item_key, item_val) in enumerate(section): + for index, section_data in enumerate(section): + item_key, item_val = section_data + if item_val.startswith("@@dst_mac"): + tx_port_iter = re.finditer(r'\d+', item_val) + tx_port_no = int(next(tx_port_iter).group(0)) + intf = self.vnfd_helper.find_interface_by_port(tx_port_no) + mac = intf["virtual-interface"]["dst_mac"] + section_data[1] = mac.replace(":", " ", 6) + + if item_key == "dst mac" and item_val.startswith("@@"): + tx_port_iter = re.finditer(r'\d+', item_val) + tx_port_no = int(next(tx_port_iter).group(0)) + intf = self.vnfd_helper.find_interface_by_port(tx_port_no) + mac = intf["virtual-interface"]["dst_mac"] + section_data[1] = mac + + # if addition file specified in prox config + if not self.additional_files: + return sections + + for section_name, section in sections: + for index, section_data in enumerate(section): + try: + if section_data[0].startswith("dofile"): + section_data[0] = self._insert_additional_file(section_data[0]) + + if section_data[1].startswith("dofile"): + section_data[1] = self._insert_additional_file(section_data[1]) + except: + pass + + return sections @staticmethod - def find_pci(pci, bound_pci): - # we have to substring match PCI bus address from the end - return any(b.endswith(pci) for b in bound_pci) + def write_prox_lua(lua_config): + """ + Write an .ini-format config file for PROX (parameters.lua) + PROX does not allow a space before/after the =, so we need + a custom method + """ + out = [] + for key in lua_config: + value = '"' + lua_config[key] + '"' + if key == "__name__": + continue + if value is not None and value != '@': + key = "=".join((key, str(value).replace('\n', '\n\t'))) + out.append(key) + else: + key = str(key).replace('\n', '\n\t') + out.append(key) + return os.linesep.join(out) @staticmethod def write_prox_config(prox_config): @@ -652,16 +758,117 @@ class ProxResourceHelper(ClientResourceHelper): a custom method """ out = [] - for section_name, section_value in prox_config.items(): + for i, (section_name, section) in enumerate(prox_config): out.append("[{}]".format(section_name)) - for key, value in section_value: + for index, item in enumerate(section): + key, value = item if key == "__name__": continue - if value is not None: + if value is not None and value != '@': key = "=".join((key, str(value).replace('\n', '\n\t'))) - out.append(key) + out.append(key) + else: + key = str(key).replace('\n', '\n\t') + out.append(key) return os.linesep.join(out) + def put_string_to_file(self, s, remote_path): + file_obj = cStringIO(s) + self.ssh_helper.put_file_obj(file_obj, remote_path) + return remote_path + + def generate_prox_lua_file(self): + p = OrderedDict() + all_ports = self.vnfd_helper.port_pairs.all_ports + for port_name in all_ports: + port_num = self.vnfd_helper.port_num(port_name) + intf = self.vnfd_helper.find_interface(name=port_name) + vintf = intf['virtual-interface'] + p["tester_mac{0}".format(port_num)] = vintf["dst_mac"] + p["src_mac{0}".format(port_num)] = vintf["local_mac"] + + return p + + def upload_prox_lua(self, config_file, lua_data): + # prox can't handle spaces around ' = ' so use custom method + out = StringIO(self.write_prox_lua(lua_data)) + out.seek(0) + remote_path = os.path.join("/tmp", config_file) + self.ssh_helper.put_file_obj(out, remote_path) + + return remote_path + + def upload_prox_config(self, config_file, prox_config_data): + # prox can't handle spaces around ' = ' so use custom method + out = StringIO(self.write_prox_config(prox_config_data)) + out.seek(0) + remote_path = os.path.join("/tmp", config_file) + self.ssh_helper.put_file_obj(out, remote_path) + + return remote_path + + def build_config_file(self): + task_path = self.scenario_helper.task_path + options = self.scenario_helper.options + config_path = options['prox_config'] + config_file = os.path.basename(config_path) + config_path = find_relative_file(config_path, task_path) + self.additional_files = {} + + try: + if options['prox_generate_parameter']: + self.lua = [] + self.lua = self.generate_prox_lua_file() + if len(self.lua) > 0: + self.upload_prox_lua("parameters.lua", self.lua) + except: + pass + + prox_files = options.get('prox_files', []) + if isinstance(prox_files, six.string_types): + prox_files = [prox_files] + for key_prox_file in prox_files: + base_prox_file = os.path.basename(key_prox_file) + key_prox_path = find_relative_file(key_prox_file, task_path) + remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file) + self.additional_files[base_prox_file] = remote_prox_file + + self._prox_config_data = self.generate_prox_config_file(config_path) + # copy config to queue so we can read it from traffic_runner process + self.config_queue.put(self._prox_config_data) + self.remote_path = self.upload_prox_config(config_file, self._prox_config_data) + + def build_config(self): + self.build_config_file() + + options = self.scenario_helper.options + + prox_args = options['prox_args'] + LOG.info("Provision and start the %s", self.APP_NAME) + self._build_pipeline_kwargs() + self.pipeline_kwargs["args"] = " ".join( + " ".join([k, v if v else ""]) for k, v in prox_args.items()) + self.pipeline_kwargs["cfg_file"] = self.remote_path + + cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '" + prox_cmd = cmd_template.format(**self.pipeline_kwargs) + return prox_cmd + + +# this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi +class ProxResourceHelper(ClientResourceHelper): + + RESOURCE_WORD = 'prox' + + PROX_MODE = "" + + WAIT_TIME = 3 + + @staticmethod + def find_pci(pci, bound_pci): + # we have to substring match PCI bus address from the end + return any(b.endswith(pci) for b in bound_pci) + def __init__(self, setup_helper): super(ProxResourceHelper, self).__init__(setup_helper) self.mgmt_interface = self.vnfd_helper.mgmt_interface @@ -669,51 +876,29 @@ class ProxResourceHelper(ClientResourceHelper): self._ip = self.mgmt_interface["ip"] self.done = False - self._cpu_topology = None self._vpci_to_if_name_map = None - self.additional_file = False + self.additional_file = {} self.remote_prox_file_name = None - self.prox_config_dict = None self.lower = None self.upper = None - self._test_cores = None - self._latency_cores = None + self.step_delta = 1 + self.step_time = 0.5 + self._test_type = None @property def sut(self): if not self.client: - self.client = ProxSocketHelper() + self.client = self._connect() return self.client @property - def cpu_topology(self): - if not self._cpu_topology: - stdout = self.ssh_helper.execute("cat /proc/cpuinfo")[1] - self._cpu_topology = SocketTopology.parse_cpuinfo(stdout) - return self._cpu_topology - - @property - def vpci_to_if_name_map(self): - if self._vpci_to_if_name_map is None: - self._vpci_to_if_name_map = { - interface["virtual-interface"]["vpci"]: interface["name"] - for interface in self.vnfd_helper.interfaces - } - return self._vpci_to_if_name_map - - @property - def test_cores(self): - if not self._test_cores: - self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE) - return self._test_cores - - @property - def latency_cores(self): - if not self._latency_cores: - self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE) - return self._latency_cores + def test_type(self): + if self._test_type is None: + self._test_type = self.setup_helper.find_in_section('global', 'name', None) + return self._test_type def run_traffic(self, traffic_profile): + self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 @@ -726,44 +911,27 @@ class ProxResourceHelper(ClientResourceHelper): self._run_traffic_once(traffic_profile) def _run_traffic_once(self, traffic_profile): - traffic_profile.execute(self) + traffic_profile.execute_traffic(self) if traffic_profile.done: self._queue.put({'done': True}) LOG.debug("tg_prox done") self._terminated.value = 1 - def start_collect(self): - pass - - def terminate(self): - super(ProxResourceHelper, self).terminate() - self.ssh_helper.execute('sudo pkill prox') - self.setup_helper.rebind_drivers() - - def get_process_args(self): - task_path = self.scenario_helper.task_path - options = self.scenario_helper.options - - prox_args = options['prox_args'] - prox_path = options['prox_path'] - config_path = options['prox_config'] - - config_file = os.path.basename(config_path) - config_path = find_relative_file(config_path, task_path) - - try: - prox_file_config_path = options['prox_files'] - prox_file_file = os.path.basename(prox_file_config_path) - prox_file_config_path = find_relative_file(prox_file_config_path, task_path) - self.remote_prox_file_name = self.copy_to_target(prox_file_config_path, prox_file_file) - self.additional_file = True - except: - self.additional_file = False + # For VNF use ResourceHelper method to collect KPIs directly. + # for TG leave the superclass ClientResourceHelper collect_kpi_method intact + def collect_collectd_kpi(self): + return self._collect_resource_kpi() - self.prox_config_dict = self.generate_prox_config_file(config_path) + def collect_kpi(self): + result = super(ProxResourceHelper, self).collect_kpi() + # add in collectd kpis manually + if result: + result['collect_stats'] = self._collect_resource_kpi() + return result - remote_path = self.upload_prox_config(config_file, self.prox_config_dict) - return prox_args, prox_path, remote_path + def terminate(self): + # should not be called, use VNF terminate + raise NotImplementedError() def up_post(self): return self.sut # force connection @@ -773,148 +941,219 @@ class ProxResourceHelper(ClientResourceHelper): if func: return func(*args, **kwargs) - def copy_to_target(self, config_file_path, prox_file): - remote_path = os.path.join("/tmp", prox_file) - self.ssh_helper.put(config_file_path, remote_path) - return remote_path + def _connect(self, client=None): + """Run and connect to prox on the remote system """ + # De-allocating a large amount of hugepages takes some time. If a new + # PROX instance is started immediately after killing the previous one, + # it might not be able to allocate hugepages, because they are still + # being freed. Hence the -w switch. + # self.connection.execute("sudo killall -w Prox 2>/dev/null") + # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t + # -f ./handle_none-4.cfg" + # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir + + # "; " \ + # + "export RTE_TARGET=" + self._dpdk_target + ";" \ + # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50; + # sudo " \ + # + "./build/Prox " + prox_args + # log.debug("Starting PROX with command [%s]", prox_cmd) + # thread.start_new_thread(self.ssh_check_quit, (self, self._user, + # self._ip, prox_cmd)) + if client is None: + client = ProxSocketHelper() - def upload_prox_config(self, config_file, prox_config_dict): - # prox can't handle spaces around ' = ' so use custom method - out = StringIO(self.write_prox_config(prox_config_dict)) - out.seek(0) - remote_path = os.path.join("/tmp", config_file) - self.ssh_helper.put_file_obj(out, remote_path) + # try connecting to Prox for 60s + for _ in range(RETRY_SECONDS): + time.sleep(RETRY_INTERVAL) + try: + client.connect(self._ip, PROX_PORT) + except (socket.gaierror, socket.error): + continue + else: + return client - return remote_path + msg = "Failed to connect to prox, please check if system {} accepts connections on port {}" + raise Exception(msg.format(self._ip, PROX_PORT)) + + +class ProxDataHelper(object): + + def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss): + super(ProxDataHelper, self).__init__() + self.vnfd_helper = vnfd_helper + self.sut = sut + self.pkt_size = pkt_size + self.value = value + self.tolerated_loss = tolerated_loss + self.port_count = len(self.vnfd_helper.port_pairs.all_ports) + self.tsc_hz = None + self.measured_stats = None + self.latency = None + self._totals_and_pps = None + self.result_tuple = None + + @property + def totals_and_pps(self): + if self._totals_and_pps is None: + rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8] + pps = self.value / 100.0 * self.line_rate_to_pps() + self._totals_and_pps = rx_total, tx_total, pps + return self._totals_and_pps + + @property + def rx_total(self): + return self.totals_and_pps[0] + + @property + def tx_total(self): + return self.totals_and_pps[1] + + @property + def pps(self): + return self.totals_and_pps[2] + + @property + def samples(self): + samples = {} + for port_name, port_num in self.vnfd_helper.ports_iter(): + port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8] + samples[port_name] = { + "in_packets": port_rx_total, + "out_packets": port_tx_total, + } + return samples + + def __enter__(self): + self.check_interface_count() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.make_tuple() + + def make_tuple(self): + if self.result_tuple: + return + + self.result_tuple = ProxTestDataTuple( + self.tolerated_loss, + self.tsc_hz, + self.measured_stats['delta'].rx, + self.measured_stats['delta'].tx, + self.measured_stats['delta'].tsc, + self.latency, + self.rx_total, + self.tx_total, + self.pps, + ) + self.result_tuple.log_data() @contextmanager - def traffic_context(self, pkt_size, value): - self.sut.stop_all() - self.sut.reset_stats() - self.sut.set_pkt_size(self.test_cores, pkt_size) - self.sut.set_speed(self.test_cores, value) - self.sut.start_all() - try: + def measure_tot_stats(self): + with self.sut.measure_tot_stats() as self.measured_stats: yield - finally: - self.sut.stop_all() - def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): + def check_interface_count(self): # do this assert in init? unless we expect interface count to # change from one run to another run... - interfaces = self.vnfd_helper.interfaces - interface_count = len(interfaces) - assert interface_count in {2, 4}, \ - "Invalid no of ports, 2 or 4 ports only supported at this time" - - with self.traffic_context(pkt_size, value): - # Getting statistics to calculate PPS at right speed.... - tsc_hz = float(self.sut.hz()) - time.sleep(2) - with self.sut.measure_tot_stats() as data: - time.sleep(duration) + assert self.port_count in {1, 2, 4}, \ + "Invalid number of ports: 1, 2 or 4 ports only supported at this time" - # Get stats before stopping the cores. Stopping cores takes some time - # and might skew results otherwise. - latency = self.get_latency() + def capture_tsc_hz(self): + self.tsc_hz = float(self.sut.hz()) - deltas = data['delta'] - rx_total, tx_total = self.sut.port_stats(range(interface_count))[6:8] - pps = value / 100.0 * self.line_rate_to_pps(pkt_size, interface_count) + def line_rate_to_pps(self): + # FIXME Don't hardcode 10Gb/s + return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20) - result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx, - deltas.tsc, latency, rx_total, tx_total, pps) - result.log_data() - return result +class ProxProfileHelper(object): - def get_cores(self, mode): - cores = [] - for section_name, section_data in self.prox_config_dict.items(): - if section_name.startswith("core"): - for index, item in enumerate(section_data): - if item[0] == "mode" and item[1] == mode: - core = CoreSocketTuple(section_name).find_in_topology(self.cpu_topology) - cores.append(core) - return cores + __prox_profile_type__ = "Generic" - def upload_prox_lua(self, config_dir, prox_config_dict): - # we could have multiple lua directives - lau_dict = prox_config_dict.get('lua', {}) - find_iter = (re.findall('\("([^"]+)"\)', k) for k in lau_dict) - lua_file = next((found[0] for found in find_iter if found), None) - if not lua_file: - return "" + PROX_CORE_GEN_MODE = "gen" + PROX_CORE_LAT_MODE = "lat" - out = self.generate_prox_lua_file() - remote_path = os.path.join(config_dir, lua_file) - return self.put_string_to_file(out, remote_path) + @classmethod + def get_cls(cls, helper_type): + """Return class of specified type.""" + if not helper_type: + return ProxProfileHelper - def put_string_to_file(self, s, remote_path): - self.ssh_helper.run("cat > '{}'".format(remote_path), stdin=s) - return remote_path + for profile_helper_class in utils.itersubclasses(cls): + if helper_type == profile_helper_class.__prox_profile_type__: + return profile_helper_class - def generate_prox_lua_file(self): - p = OrderedDict() - ext_intf = self.vnfd_helper.interfaces - lua_param = self.LUA_PARAMETER_NAME - for intf in ext_intf: - peer = self.LUA_PARAMETER_PEER[lua_param] - port_num = intf["virtual-interface"]["dpdk_port_num"] - local_ip = intf["local_ip"] - dst_ip = intf["dst_ip"] - local_ip_hex = ip_to_hex(local_ip, separator=' ') - dst_ip_hex = ip_to_hex(dst_ip, separator=' ') - p.update([ - ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex), - ("{}_ip_port_{}".format(lua_param, port_num), local_ip), - ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex), - ("{}_ip_port_{}".format(peer, port_num), dst_ip), - ]) - lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items())) - return lua + return ProxProfileHelper - def generate_prox_config_file(self, config_path): - sections = {} - prox_config = ConfigParser(config_path, sections) - prox_config.parse() + @classmethod + def make_profile_helper(cls, resource_helper): + return cls.get_cls(resource_helper.test_type)(resource_helper) - # Ensure MAC is set "hardware" - ext_intf = self.vnfd_helper.interfaces - for intf in ext_intf: - port_num = intf["virtual-interface"]["dpdk_port_num"] - section_name = "port {}".format(port_num) - for index, section_data in enumerate(sections.get(section_name, [])): - if section_data[0] == "mac": - sections[section_name][index][1] = "hardware" - - # search for dest mac - for section_name, section_data in sections.items(): - for index, section_attr in enumerate(section_data): - if section_attr[0] != "dst mac": - continue + def __init__(self, resource_helper): + super(ProxProfileHelper, self).__init__() + self.resource_helper = resource_helper + self._cpu_topology = None + self._test_cores = None + self._latency_cores = None - tx_port_no = self._get_tx_port(section_name, sections) - if tx_port_no == -1: - raise Exception("Failed ..destination MAC undefined") + @property + def cpu_topology(self): + if not self._cpu_topology: + stdout = io.BytesIO() + self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout) + self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8')) + return self._cpu_topology - dst_mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"] - section_attr[1] = dst_mac + @property + def test_cores(self): + if not self._test_cores: + self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE) + return self._test_cores - # if addition file specified in prox config - if self.additional_file: - remote_name = self.remote_prox_file_name - for section_data in sections.values(): - for index, section_attr in enumerate(section_data): - try: - if section_attr[1].startswith("dofile"): - new_string = self._replace_quoted_with_value(section_attr[1], - remote_name) - section_attr[1] = new_string - except: - pass + @property + def latency_cores(self): + if not self._latency_cores: + self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE) + return self._latency_cores - return sections + @contextmanager + def traffic_context(self, pkt_size, value): + self.sut.stop_all() + self.sut.reset_stats() + try: + self.sut.set_pkt_size(self.test_cores, pkt_size) + self.sut.set_speed(self.test_cores, value) + self.sut.start_all() + yield + finally: + self.sut.stop_all() + + def get_cores(self, mode): + cores = [] + + for section_name, section in self.setup_helper.prox_config_data: + if not section_name.startswith("core"): + continue + + for key, value in section: + if key == "mode" and value == mode: + core_tuple = CoreSocketTuple(section_name) + core = core_tuple.find_in_topology(self.cpu_topology) + cores.append(core) + + return cores + + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + + with data_helper, self.traffic_context(pkt_size, value): + with data_helper.measure_tot_stats(): + time.sleep(duration) + # Getting statistics to calculate PPS at right speed.... + data_helper.capture_tsc_hz() + data_helper.latency = self.get_latency() + + return data_helper.result_tuple, data_helper.samples def get_latency(self): """ @@ -925,39 +1164,434 @@ class ProxResourceHelper(ClientResourceHelper): return self.sut.lat_stats(self._latency_cores) return [] - def _get_logical_if_name(self, vpci): - return self._vpci_to_if_name_map[vpci] + def terminate(self): + pass - def _connect(self, client=None): - """Run and connect to prox on the remote system """ - # De-allocating a large amount of hugepages takes some time. If a new - # PROX instance is started immediately after killing the previous one, - # it might not be able to allocate hugepages, because they are still - # being freed. Hence the -w switch. - # self.connection.execute("sudo killall -w Prox 2>/dev/null") - # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t - # -f ./handle_none-4.cfg" - # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir + - # "; " \ - # + "export RTE_TARGET=" + self._dpdk_target + ";" \ - # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50; - # sudo " \ - # + "./build/Prox " + prox_args - # log.debug("Starting PROX with command [%s]", prox_cmd) - # thread.start_new_thread(self.ssh_check_quit, (self, self._user, - # self._ip, prox_cmd)) - if client is None: - client = ProxSocketHelper() + def __getattr__(self, item): + return getattr(self.resource_helper, item) - # try connecting to Prox for 60s - for _ in range(RETRY_SECONDS): - time.sleep(RETRY_INTERVAL) - try: - client.connect(self._ip, PROX_PORT) - except (socket.gaierror, socket.error): + +class ProxMplsProfileHelper(ProxProfileHelper): + + __prox_profile_type__ = "MPLS tag/untag" + + def __init__(self, resource_helper): + super(ProxMplsProfileHelper, self).__init__(resource_helper) + self._cores_tuple = None + + @property + def mpls_cores(self): + if not self._cores_tuple: + self._cores_tuple = self.get_cores_mpls() + return self._cores_tuple + + @property + def tagged_cores(self): + return self.mpls_cores[0] + + @property + def plain_cores(self): + return self.mpls_cores[1] + + def get_cores_mpls(self): + cores_tagged = [] + cores_plain = [] + for section_name, section in self.resource_helper.setup_helper.prox_config_data: + if not section_name.startswith("core"): continue - else: - return client - msg = "Failed to connect to prox, please check if system {} accepts connections on port {}" - raise Exception(msg.format(self._ip, PROX_PORT)) + if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section): + continue + + for item_key, item_value in section: + if item_key != 'name': + continue + + if item_value.startswith("tag"): + core_tuple = CoreSocketTuple(section_name) + core_tag = core_tuple.find_in_topology(self.cpu_topology) + cores_tagged.append(core_tag) + + elif item_value.startswith("udp"): + core_tuple = CoreSocketTuple(section_name) + core_udp = core_tuple.find_in_topology(self.cpu_topology) + cores_plain.append(core_udp) + + return cores_tagged, cores_plain + + @contextmanager + def traffic_context(self, pkt_size, value): + self.sut.stop_all() + self.sut.reset_stats() + try: + self.sut.set_pkt_size(self.tagged_cores, pkt_size) + self.sut.set_pkt_size(self.plain_cores, pkt_size - 4) + self.sut.set_speed(self.tagged_cores, value) + ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20) + self.sut.set_speed(self.plain_cores, value * ratio) + self.sut.start_all() + yield + finally: + self.sut.stop_all() + + +class ProxBngProfileHelper(ProxProfileHelper): + + __prox_profile_type__ = "BNG gen" + + def __init__(self, resource_helper): + super(ProxBngProfileHelper, self).__init__(resource_helper) + self._cores_tuple = None + + @property + def bng_cores(self): + if not self._cores_tuple: + self._cores_tuple = self.get_cores_gen_bng_qos() + return self._cores_tuple + + @property + def cpe_cores(self): + return self.bng_cores[0] + + @property + def inet_cores(self): + return self.bng_cores[1] + + @property + def arp_cores(self): + return self.bng_cores[2] + + @property + def arp_task_cores(self): + return self.bng_cores[3] + + @property + def all_rx_cores(self): + return self.latency_cores + + def get_cores_gen_bng_qos(self): + cpe_cores = [] + inet_cores = [] + arp_cores = [] + arp_tasks_core = [0] + for section_name, section in self.resource_helper.setup_helper.prox_config_data: + if not section_name.startswith("core"): + continue + + if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section): + continue + + for item_key, item_value in section: + if item_key != 'name': + continue + + if item_value.startswith("cpe"): + core_tuple = CoreSocketTuple(section_name) + cpe_core = core_tuple.find_in_topology(self.cpu_topology) + cpe_cores.append(cpe_core) + + elif item_value.startswith("inet"): + core_tuple = CoreSocketTuple(section_name) + inet_core = core_tuple.find_in_topology(self.cpu_topology) + inet_cores.append(inet_core) + + elif item_value.startswith("arp"): + core_tuple = CoreSocketTuple(section_name) + arp_core = core_tuple.find_in_topology(self.cpu_topology) + arp_cores.append(arp_core) + + # We check the tasks/core separately + if item_value.startswith("arp_task"): + core_tuple = CoreSocketTuple(section_name) + arp_task_core = core_tuple.find_in_topology(self.cpu_topology) + arp_tasks_core.append(arp_task_core) + + return cpe_cores, inet_cores, arp_cores, arp_tasks_core + + @contextmanager + def traffic_context(self, pkt_size, value): + # Tester is sending packets at the required speed already after + # setup_test(). Just get the current statistics, sleep the required + # amount of time and calculate packet loss. + inet_pkt_size = pkt_size + cpe_pkt_size = pkt_size - 24 + ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20) + + curr_up_speed = curr_down_speed = 0 + max_up_speed = max_down_speed = value + if ratio < 1: + max_down_speed = value * ratio + else: + max_up_speed = value / ratio + + # Initialize cores + self.sut.stop_all() + time.sleep(0.5) + + # Flush any packets in the NIC RX buffers, otherwise the stats will be + # wrong. + self.sut.start(self.all_rx_cores) + time.sleep(0.5) + self.sut.stop(self.all_rx_cores) + time.sleep(0.5) + self.sut.reset_stats() + + self.sut.set_pkt_size(self.inet_cores, inet_pkt_size) + self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size) + + self.sut.reset_values(self.cpe_cores) + self.sut.reset_values(self.inet_cores) + + # Set correct IP and UDP lengths in packet headers + # CPE + # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4) + self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2) + # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4) + self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2) + + # INET + # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4) + self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2) + # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4) + self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2) + # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4) + self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2) + + # Sending ARP to initialize tables - need a few seconds of generation + # to make sure all CPEs are initialized + LOG.info("Initializing SUT: sending ARP packets") + self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores) + self.sut.set_speed(self.inet_cores, curr_up_speed) + self.sut.set_speed(self.cpe_cores, curr_down_speed) + self.sut.start(self.arp_cores) + time.sleep(4) + + # Ramp up the transmission speed. First go to the common speed, then + # increase steps for the faster one. + self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores) + + LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed) + + while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed): + # The min(..., ...) takes care of 1) floating point rounding errors + # that could make curr_*_speed to be slightly greater than + # max_*_speed and 2) max_*_speed not being an exact multiple of + # self._step_delta. + if curr_up_speed < max_up_speed: + curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed) + if curr_down_speed < max_down_speed: + curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed) + + self.sut.set_speed(self.inet_cores, curr_up_speed) + self.sut.set_speed(self.cpe_cores, curr_down_speed) + time.sleep(self.step_time) + + LOG.info("Target speeds reached. Starting real test.") + + yield + + self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores) + LOG.info("Test ended. Flushing NIC buffers") + self.sut.start(self.all_rx_cores) + time.sleep(3) + self.sut.stop(self.all_rx_cores) + + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + + with data_helper, self.traffic_context(pkt_size, value): + with data_helper.measure_tot_stats(): + time.sleep(duration) + # Getting statistics to calculate PPS at right speed.... + data_helper.capture_tsc_hz() + data_helper.latency = self.get_latency() + + return data_helper.result_tuple, data_helper.samples + + +class ProxVpeProfileHelper(ProxProfileHelper): + + __prox_profile_type__ = "vPE gen" + + def __init__(self, resource_helper): + super(ProxVpeProfileHelper, self).__init__(resource_helper) + self._cores_tuple = None + self._ports_tuple = None + + @property + def vpe_cores(self): + if not self._cores_tuple: + self._cores_tuple = self.get_cores_gen_vpe() + return self._cores_tuple + + @property + def cpe_cores(self): + return self.vpe_cores[0] + + @property + def inet_cores(self): + return self.vpe_cores[1] + + @property + def all_rx_cores(self): + return self.latency_cores + + @property + def vpe_ports(self): + if not self._ports_tuple: + self._ports_tuple = self.get_ports_gen_vpe() + return self._ports_tuple + + @property + def cpe_ports(self): + return self.vpe_ports[0] + + @property + def inet_ports(self): + return self.vpe_ports[1] + + def get_cores_gen_vpe(self): + cpe_cores = [] + inet_cores = [] + for section_name, section in self.resource_helper.setup_helper.prox_config_data: + if not section_name.startswith("core"): + continue + + if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section): + continue + + for item_key, item_value in section: + if item_key != 'name': + continue + + if item_value.startswith("cpe"): + core_tuple = CoreSocketTuple(section_name) + core_tag = core_tuple.find_in_topology(self.cpu_topology) + cpe_cores.append(core_tag) + + elif item_value.startswith("inet"): + core_tuple = CoreSocketTuple(section_name) + inet_core = core_tuple.find_in_topology(self.cpu_topology) + inet_cores.append(inet_core) + + return cpe_cores, inet_cores + + def get_ports_gen_vpe(self): + cpe_ports = [] + inet_ports = [] + + for section_name, section in self.resource_helper.setup_helper.prox_config_data: + if not section_name.startswith("port"): + continue + tx_port_iter = re.finditer(r'\d+', section_name) + tx_port_no = int(next(tx_port_iter).group(0)) + + for item_key, item_value in section: + if item_key != 'name': + continue + + for item_key, item_value in section: + if item_value.startswith("cpe"): + cpe_ports.append(tx_port_no) + + elif item_value.startswith("inet"): + inet_ports.append(tx_port_no) + + return cpe_ports, inet_ports + + @contextmanager + def traffic_context(self, pkt_size, value): + # Calculate the target upload and download speed. The upload and + # download packets have different packet sizes, so in order to get + # equal bandwidth usage, the ratio of the speeds has to match the ratio + # of the packet sizes. + cpe_pkt_size = pkt_size + inet_pkt_size = pkt_size - 4 + ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20) + + curr_up_speed = curr_down_speed = 0 + max_up_speed = max_down_speed = value + if ratio < 1: + max_down_speed = value * ratio + else: + max_up_speed = value / ratio + + # Adjust speed when multiple cores per port are used to generate traffic + if len(self.cpe_ports) != len(self.cpe_cores): + max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores) + if len(self.inet_ports) != len(self.inet_cores): + max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores) + + # Initialize cores + self.sut.stop_all() + time.sleep(2) + + # Flush any packets in the NIC RX buffers, otherwise the stats will be + # wrong. + self.sut.start(self.all_rx_cores) + time.sleep(2) + self.sut.stop(self.all_rx_cores) + time.sleep(2) + self.sut.reset_stats() + + self.sut.set_pkt_size(self.inet_cores, inet_pkt_size) + self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size) + + self.sut.reset_values(self.cpe_cores) + self.sut.reset_values(self.inet_cores) + + # Set correct IP and UDP lengths in packet headers + # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4) + self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2) + # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4) + self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2) + + # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4) + self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2) + # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4) + self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2) + + self.sut.set_speed(self.inet_cores, curr_up_speed) + self.sut.set_speed(self.cpe_cores, curr_down_speed) + + # Ramp up the transmission speed. First go to the common speed, then + # increase steps for the faster one. + self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores) + + LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed) + + while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed): + # The min(..., ...) takes care of 1) floating point rounding errors + # that could make curr_*_speed to be slightly greater than + # max_*_speed and 2) max_*_speed not being an exact multiple of + # self._step_delta. + if curr_up_speed < max_up_speed: + curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed) + if curr_down_speed < max_down_speed: + curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed) + + self.sut.set_speed(self.inet_cores, curr_up_speed) + self.sut.set_speed(self.cpe_cores, curr_down_speed) + time.sleep(self.step_time) + + LOG.info("Target speeds reached. Starting real test.") + + yield + + self.sut.stop(self.cpe_cores + self.inet_cores) + LOG.info("Test ended. Flushing NIC buffers") + self.sut.start(self.all_rx_cores) + time.sleep(3) + self.sut.stop(self.all_rx_cores) + + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + + with data_helper, self.traffic_context(pkt_size, value): + with data_helper.measure_tot_stats(): + time.sleep(duration) + # Getting statistics to calculate PPS at right speed.... + data_helper.capture_tsc_hz() + data_helper.latency = self.get_latency() + + return data_helper.result_tuple, data_helper.samples