import array
import operator
import logging
+import io
import os
import re
import select
import socket
+
from collections import OrderedDict, namedtuple
import time
from contextlib import contextmanager
from itertools import repeat, chain
+from multiprocessing import Queue
+import six
from six.moves import zip, StringIO
+from six.moves import cStringIO
from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
-from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings
+from yardstick.common import utils
+from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
PROX_PORT = 8474
+SECTION_NAME = 0
+SECTION_CONTENTS = 1
+
LOG = logging.getLogger(__name__)
+LOG.setLevel(logging.DEBUG)
TEN_GIGABIT = 1e10
BITS_PER_BYTE = 8
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
-
- CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?")
+ CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
def __new__(cls, *args):
try:
if matches:
args = matches.groups()
- return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), int(args[1]),
+ return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
'h' if args[2] else '')
except (AttributeError, TypeError, IndexError, ValueError):
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
-
def __new__(cls, *args):
try:
assert args[0] is not str(args[0])
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
'delta_tx,delta_tsc,'
'latency,rx_total,tx_total,pps')):
-
@property
def pkt_loss(self):
try:
def success(self):
return self.drop_total <= self.can_be_lost
- def get_samples(self, pkt_size, pkt_loss=None):
+ def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
if pkt_loss is None:
pkt_loss = self.pkt_loss
+ if port_samples is None:
+ port_samples = {}
+
latency_keys = [
"LatencyMin",
"LatencyMax",
"RxThroughput": self.mpps,
"PktSize": pkt_size,
}
+ if port_samples:
+ samples.update(port_samples)
samples.update((key, value) for key, value in zip(latency_keys, self.latency))
return samples
class PacketDump(object):
-
@staticmethod
def assert_func(func, value1, value2, template=None):
assert func(value1, value2), template.format(value1, value2)
self._sock = sock
self._pkt_dumps = []
+ self.master_stats = None
def connect(self, ip, port):
"""Connect to the prox instance on the remote system"""
def get_data(self, pkt_dump_only=False, timeout=1):
""" read data from the socket """
+
# This method behaves slightly differently depending on whether it is
# called to read the response to a command (pkt_dump_only = 0) or if
# it is called specifically to read a packet dump (pkt_dump_only = 1).
status = False
ret_str = ""
for status in iter(is_ready, False):
- LOG.debug("Reading from socket")
decoded_data = self._sock.recv(256).decode('utf-8')
ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
def put_command(self, to_send):
""" send data to the remote instance """
LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
- self._sock.sendall(to_send.encode('utf-8'))
+ try:
+ # TODO: sendall will block, we need a timeout
+ self._sock.sendall(to_send.encode('utf-8'))
+ except:
+ pass
def get_packet_dump(self):
""" get the next packet dump """
LOG.debug("Set value for core(s) %s", cores)
self._run_template_over_cores("reset values {} 0\n", cores)
- def set_speed(self, cores, speed):
+ def set_speed(self, cores, speed, tasks=None):
""" set speed on the remote instance """
- LOG.debug("Set speed for core(s) %s to %g", cores, speed)
- self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
+ if tasks is None:
+ tasks = [0] * len(cores)
+ elif len(tasks) != len(cores):
+ LOG.error("set_speed: cores and tasks must have the same len")
+ LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
+ for (core, task) in list(zip(cores, tasks)):
+ self.put_command("speed {} {} {}\n".format(core, task, speed))
def slope_speed(self, cores_speed, duration, n_steps=0):
"""will start to increase speed from 0 to N where N is taken from
def get_all_tot_stats(self):
self.put_command("tot stats\n")
- all_stats = TotStatsTuple(int(v) for v in self.get_data().split(","))
+ all_stats_str = self.get_data().split(",")
+ if len(all_stats_str) != 4:
+ all_stats = [0] * 4
+ return all_stats
+ all_stats = TotStatsTuple(int(v) for v in all_stats_str)
+ self.master_stats = all_stats
return all_stats
def hz(self):
- return self.get_all_tot_stats().hz
+ return self.get_all_tot_stats()[3]
# Deprecated
# TODO: remove
def port_stats(self, ports):
"""get counter values from a specific port"""
- tot_result = list(repeat(0, 12))
+ tot_result = [0] * 12
for port in ports:
self.put_command("port_stats {}\n".format(port))
- for index, n in enumerate(self.get_data().split(',')):
- tot_result[index] += int(n)
+ ret = [try_int(s, 0) for s in self.get_data().split(",")]
+ tot_result = [sum(x) for x in zip(tot_result, ret)]
return tot_result
@contextmanager
"""Activate dump on rx on the specified core"""
LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
- time.sleep(1.5) # Give PROX time to set up packet dumping
+ time.sleep(1.5) # Give PROX time to set up packet dumping
def quit(self):
self.stop_all()
time.sleep(3)
-class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
-
- def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
- super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
- self.dpdk_root = "/root/dpdk-17.02"
-
- def setup_vnf_environment(self):
- super(ProxDpdkVnfSetupEnvHelper, self).setup_vnf_environment()
-
- # debug dump after binding
- self.ssh_helper.execute("sudo {} -s".format(self.dpdk_nic_bind))
-
- def rebind_drivers(self, force=True):
- if force:
- force = '--force '
- else:
- force = ''
- cmd_template = "{} {}-b {} {}"
- if not self.used_drivers:
- self._find_used_drivers()
- for vpci, (_, driver) in self.used_drivers.items():
- self.ssh_helper.execute(cmd_template.format(self.dpdk_nic_bind, force, driver, vpci))
-
- def _setup_dpdk(self):
- self._setup_hugepages()
-
- self.ssh_helper.execute("pkill prox")
- self.ssh_helper.execute("sudo modprobe uio")
-
- # for baremetal
- self.ssh_helper.execute("sudo modprobe msr")
-
- # why remove?, just keep it loaded
- # self.connection.execute("sudo rmmod igb_uio")
-
- igb_uio_path = os.path.join(self.dpdk_root, "x86_64-native-linuxapp-gcc/kmod/igb_uio.ko")
- self.ssh_helper.execute("sudo insmod {}".format(igb_uio_path))
-
- # quick hack to allow non-root copy
- self.ssh_helper.execute("sudo chmod 0777 {}".format(self.ssh_helper.bin_path))
+_LOCAL_OBJECT = object()
-class ProxResourceHelper(ClientResourceHelper):
-
- PROX_CORE_GEN_MODE = "gen"
- PROX_CORE_LAT_MODE = "lat"
-
- PROX_MODE = ""
+class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
+ # the actual app is lowercase
+ APP_NAME = 'prox'
+ # not used for Prox but added for consistency
+ VNF_TYPE = "PROX"
LUA_PARAMETER_NAME = ""
LUA_PARAMETER_PEER = {
"sut": "gen",
}
- WAIT_TIME = 3
+ CONFIG_QUEUE_TIMEOUT = 120
- @staticmethod
- def _replace_quoted_with_value(quoted, value, count=1):
- new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
- return new_string
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ self.remote_path = None
+ super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
+ self.remote_prox_file_name = None
+ self._prox_config_data = None
+ self.additional_files = {}
+ self.config_queue = Queue()
+ # allow_exit_without_flush
+ self.config_queue.cancel_join_thread()
+ self._global_section = None
+
+ @property
+ def prox_config_data(self):
+ if self._prox_config_data is None:
+ # this will block, but it needs too
+ self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
+ return self._prox_config_data
+
+ @property
+ def global_section(self):
+ if self._global_section is None and self.prox_config_data:
+ self._global_section = self.find_section("global")
+ return self._global_section
+
+ def find_section(self, name, default=_LOCAL_OBJECT):
+ result = next((value for key, value in self.prox_config_data if key == name), default)
+ if result is _LOCAL_OBJECT:
+ raise KeyError('{} not found in Prox config'.format(name))
+ return result
+
+ def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
+ section = self.find_section(section_name, [])
+ result = next((value for key, value in section if key == section_key), default)
+ if result is _LOCAL_OBJECT:
+ template = '{} not found in {} section of Prox config'
+ raise KeyError(template.format(section_key, section_name))
+ return result
+
+ def _build_pipeline_kwargs(self):
+ tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
+ self.pipeline_kwargs = {
+ 'tool_path': tool_path,
+ 'tool_dir': os.path.dirname(tool_path),
+ }
+
+ def copy_to_target(self, config_file_path, prox_file):
+ remote_path = os.path.join("/tmp", prox_file)
+ self.ssh_helper.put(config_file_path, remote_path)
+ return remote_path
@staticmethod
def _get_tx_port(section, sections):
return int(iface_port[0])
@staticmethod
- def line_rate_to_pps(pkt_size, n_ports):
- # FIXME Don't hardcode 10Gb/s
- return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)
+ def _replace_quoted_with_value(quoted, value, count=1):
+ new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
+ return new_string
- @staticmethod
- def find_pci(pci, bound_pci):
- # we have to substring match PCI bus address from the end
- return any(b.endswith(pci) for b in bound_pci)
+ def _insert_additional_file(self, value):
+ file_str = value.split('"')
+ base_name = os.path.basename(file_str[1])
+ file_str[1] = self.additional_files[base_name]
+ return '"'.join(file_str)
+
+ def generate_prox_config_file(self, config_path):
+ sections = []
+ prox_config = ConfigParser(config_path, sections)
+ prox_config.parse()
+
+ # Ensure MAC is set "hardware"
+ all_ports = self.vnfd_helper.port_pairs.all_ports
+ # use dpdk port number
+ for port_name in all_ports:
+ port_num = self.vnfd_helper.port_num(port_name)
+ port_section_name = "port {}".format(port_num)
+ for section_name, section in sections:
+ if port_section_name != section_name:
+ continue
+
+ for index, section_data in enumerate(section):
+ if section_data[0] == "mac":
+ section_data[1] = "hardware"
+
+ # search for dst mac
+ for _, section in sections:
+ # for index, (item_key, item_val) in enumerate(section):
+ for index, section_data in enumerate(section):
+ item_key, item_val = section_data
+ if item_val.startswith("@@dst_mac"):
+ tx_port_iter = re.finditer(r'\d+', item_val)
+ tx_port_no = int(next(tx_port_iter).group(0))
+ intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
+ mac = intf["virtual-interface"]["dst_mac"]
+ section_data[1] = mac.replace(":", " ", 6)
+
+ if item_key == "dst mac" and item_val.startswith("@@"):
+ tx_port_iter = re.finditer(r'\d+', item_val)
+ tx_port_no = int(next(tx_port_iter).group(0))
+ intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
+ mac = intf["virtual-interface"]["dst_mac"]
+ section_data[1] = mac
+
+ # if addition file specified in prox config
+ if not self.additional_files:
+ return sections
+
+ for section_name, section in sections:
+ for index, section_data in enumerate(section):
+ try:
+ if section_data[0].startswith("dofile"):
+ section_data[0] = self._insert_additional_file(section_data[0])
+
+ if section_data[1].startswith("dofile"):
+ section_data[1] = self._insert_additional_file(section_data[1])
+ except:
+ pass
+
+ return sections
@staticmethod
def write_prox_config(prox_config):
a custom method
"""
out = []
- for section_name, section_value in prox_config.items():
+ for i, (section_name, section) in enumerate(prox_config):
out.append("[{}]".format(section_name))
- for key, value in section_value:
+ for index, item in enumerate(section):
+ key, value = item
if key == "__name__":
continue
- if value is not None:
+ if value is not None and value != '@':
key = "=".join((key, str(value).replace('\n', '\n\t')))
- out.append(key)
+ out.append(key)
+ else:
+ key = str(key).replace('\n', '\n\t')
+ out.append(key)
return os.linesep.join(out)
+ def put_string_to_file(self, s, remote_path):
+ file_obj = cStringIO(s)
+ self.ssh_helper.put_file_obj(file_obj, remote_path)
+ return remote_path
+
+ def generate_prox_lua_file(self):
+ p = OrderedDict()
+ all_ports = self.vnfd_helper.port_pairs.all_ports
+ lua_param = self.LUA_PARAMETER_NAME
+ for port_name in all_ports:
+ peer = self.LUA_PARAMETER_PEER[lua_param]
+ port_num = self.vnfd_helper.port_num(port_name)
+ intf = self.vnfd_helper.find_interface(name=port_name)
+ vintf = intf['virtual-interface']
+ local_ip = vintf["local_ip"]
+ dst_ip = vintf["dst_ip"]
+ local_ip_hex = ip_to_hex(local_ip, separator=' ')
+ dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
+ p.update([
+ ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
+ ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
+ ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
+ ("{}_ip_port_{}".format(peer, port_num), dst_ip),
+ ])
+ lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
+ return lua
+
+ def upload_prox_lua(self, config_dir, prox_config_data):
+ # we could have multiple lua directives
+ lau_dict = prox_config_data.get('lua', {})
+ find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
+ lua_file = next((found[0] for found in find_iter if found), None)
+ if not lua_file:
+ return ""
+
+ out = self.generate_prox_lua_file()
+ remote_path = os.path.join(config_dir, lua_file)
+ return self.put_string_to_file(out, remote_path)
+
+ def upload_prox_config(self, config_file, prox_config_data):
+ # prox can't handle spaces around ' = ' so use custom method
+ out = StringIO(self.write_prox_config(prox_config_data))
+ out.seek(0)
+ remote_path = os.path.join("/tmp", config_file)
+ self.ssh_helper.put_file_obj(out, remote_path)
+
+ return remote_path
+
+ def build_config_file(self):
+ task_path = self.scenario_helper.task_path
+ options = self.scenario_helper.options
+ config_path = options['prox_config']
+ config_file = os.path.basename(config_path)
+ config_path = find_relative_file(config_path, task_path)
+ self.additional_files = {}
+
+ prox_files = options.get('prox_files', [])
+ if isinstance(prox_files, six.string_types):
+ prox_files = [prox_files]
+ for key_prox_file in prox_files:
+ base_prox_file = os.path.basename(key_prox_file)
+ key_prox_path = find_relative_file(key_prox_file, task_path)
+ remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
+ self.additional_files[base_prox_file] = remote_prox_file
+
+ self._prox_config_data = self.generate_prox_config_file(config_path)
+ # copy config to queue so we can read it from traffic_runner process
+ self.config_queue.put(self._prox_config_data)
+ self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
+
+ def build_config(self):
+ self.build_config_file()
+
+ options = self.scenario_helper.options
+
+ prox_args = options['prox_args']
+ LOG.info("Provision and start the %s", self.APP_NAME)
+ self._build_pipeline_kwargs()
+ self.pipeline_kwargs["args"] = " ".join(
+ " ".join([k, v if v else ""]) for k, v in prox_args.items())
+ self.pipeline_kwargs["cfg_file"] = self.remote_path
+
+ cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
+ prox_cmd = cmd_template.format(**self.pipeline_kwargs)
+ return prox_cmd
+
+
+# this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
+class ProxResourceHelper(ClientResourceHelper):
+
+ RESOURCE_WORD = 'prox'
+
+ PROX_MODE = ""
+
+ WAIT_TIME = 3
+
+ @staticmethod
+ def find_pci(pci, bound_pci):
+ # we have to substring match PCI bus address from the end
+ return any(b.endswith(pci) for b in bound_pci)
+
def __init__(self, setup_helper):
super(ProxResourceHelper, self).__init__(setup_helper)
self.mgmt_interface = self.vnfd_helper.mgmt_interface
self._ip = self.mgmt_interface["ip"]
self.done = False
- self._cpu_topology = None
self._vpci_to_if_name_map = None
- self.additional_file = False
+ self.additional_file = {}
self.remote_prox_file_name = None
- self.prox_config_dict = None
self.lower = None
self.upper = None
- self._test_cores = None
- self._latency_cores = None
+ self.step_delta = 1
+ self.step_time = 0.5
+ self._test_type = None
@property
def sut(self):
if not self.client:
- self.client = ProxSocketHelper()
+ self.client = self._connect()
return self.client
@property
- def cpu_topology(self):
- if not self._cpu_topology:
- stdout = self.ssh_helper.execute("cat /proc/cpuinfo")[1]
- self._cpu_topology = SocketTopology.parse_cpuinfo(stdout)
- return self._cpu_topology
-
- @property
- def vpci_to_if_name_map(self):
- if self._vpci_to_if_name_map is None:
- self._vpci_to_if_name_map = {
- interface["virtual-interface"]["vpci"]: interface["name"]
- for interface in self.vnfd_helper.interfaces
- }
- return self._vpci_to_if_name_map
-
- @property
- def test_cores(self):
- if not self._test_cores:
- self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
- return self._test_cores
-
- @property
- def latency_cores(self):
- if not self._latency_cores:
- self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
- return self._latency_cores
+ def test_type(self):
+ if self._test_type is None:
+ self._test_type = self.setup_helper.find_in_section('global', 'name', None)
+ return self._test_type
def run_traffic(self, traffic_profile):
+ self._queue.cancel_join_thread()
self.lower = 0.0
self.upper = 100.0
self._run_traffic_once(traffic_profile)
def _run_traffic_once(self, traffic_profile):
- traffic_profile.execute(self)
+ traffic_profile.execute_traffic(self)
if traffic_profile.done:
self._queue.put({'done': True})
LOG.debug("tg_prox done")
self._terminated.value = 1
- def start_collect(self):
- pass
-
- def terminate(self):
- super(ProxResourceHelper, self).terminate()
- self.ssh_helper.execute('sudo pkill prox')
- self.setup_helper.rebind_drivers()
-
- def get_process_args(self):
- task_path = self.scenario_helper.task_path
- options = self.scenario_helper.options
-
- prox_args = options['prox_args']
- prox_path = options['prox_path']
- config_path = options['prox_config']
-
- config_file = os.path.basename(config_path)
- config_path = find_relative_file(config_path, task_path)
+ # For VNF use ResourceHelper method to collect KPIs directly.
+ # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
+ def collect_collectd_kpi(self):
+ return self._collect_resource_kpi()
- try:
- prox_file_config_path = options['prox_files']
- prox_file_file = os.path.basename(prox_file_config_path)
- prox_file_config_path = find_relative_file(prox_file_config_path, task_path)
- self.remote_prox_file_name = self.copy_to_target(prox_file_config_path, prox_file_file)
- self.additional_file = True
- except:
- self.additional_file = False
-
- self.prox_config_dict = self.generate_prox_config_file(config_path)
+ def collect_kpi(self):
+ result = super(ProxResourceHelper, self).collect_kpi()
+ # add in collectd kpis manually
+ if result:
+ result['collect_stats'] = self._collect_resource_kpi()
+ return result
- remote_path = self.upload_prox_config(config_file, self.prox_config_dict)
- return prox_args, prox_path, remote_path
+ def terminate(self):
+ # should not be called, use VNF terminate
+ raise NotImplementedError()
def up_post(self):
return self.sut # force connection
if func:
return func(*args, **kwargs)
- def copy_to_target(self, config_file_path, prox_file):
- remote_path = os.path.join("/tmp", prox_file)
- self.ssh_helper.put(config_file_path, remote_path)
- return remote_path
+ def _connect(self, client=None):
+ """Run and connect to prox on the remote system """
+ # De-allocating a large amount of hugepages takes some time. If a new
+ # PROX instance is started immediately after killing the previous one,
+ # it might not be able to allocate hugepages, because they are still
+ # being freed. Hence the -w switch.
+ # self.connection.execute("sudo killall -w Prox 2>/dev/null")
+ # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
+ # -f ./handle_none-4.cfg"
+ # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
+ # "; " \
+ # + "export RTE_TARGET=" + self._dpdk_target + ";" \
+ # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
+ # sudo " \
+ # + "./build/Prox " + prox_args
+ # log.debug("Starting PROX with command [%s]", prox_cmd)
+ # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
+ # self._ip, prox_cmd))
+ if client is None:
+ client = ProxSocketHelper()
- def upload_prox_config(self, config_file, prox_config_dict):
- # prox can't handle spaces around ' = ' so use custom method
- out = StringIO(self.write_prox_config(prox_config_dict))
- out.seek(0)
- remote_path = os.path.join("/tmp", config_file)
- self.ssh_helper.put_file_obj(out, remote_path)
+ # try connecting to Prox for 60s
+ for _ in range(RETRY_SECONDS):
+ time.sleep(RETRY_INTERVAL)
+ try:
+ client.connect(self._ip, PROX_PORT)
+ except (socket.gaierror, socket.error):
+ continue
+ else:
+ return client
- return remote_path
+ msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
+ raise Exception(msg.format(self._ip, PROX_PORT))
+
+
+class ProxDataHelper(object):
+
+ def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
+ super(ProxDataHelper, self).__init__()
+ self.vnfd_helper = vnfd_helper
+ self.sut = sut
+ self.pkt_size = pkt_size
+ self.value = value
+ self.tolerated_loss = tolerated_loss
+ self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
+ self.tsc_hz = None
+ self.measured_stats = None
+ self.latency = None
+ self._totals_and_pps = None
+ self.result_tuple = None
+
+ @property
+ def totals_and_pps(self):
+ if self._totals_and_pps is None:
+ rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
+ pps = self.value / 100.0 * self.line_rate_to_pps()
+ self._totals_and_pps = rx_total, tx_total, pps
+ return self._totals_and_pps
+
+ @property
+ def rx_total(self):
+ return self.totals_and_pps[0]
+
+ @property
+ def tx_total(self):
+ return self.totals_and_pps[1]
+
+ @property
+ def pps(self):
+ return self.totals_and_pps[2]
+
+ @property
+ def samples(self):
+ samples = {}
+ for port_name, port_num in self.vnfd_helper.ports_iter():
+ port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
+ samples[port_name] = {
+ "in_packets": port_rx_total,
+ "out_packets": port_tx_total,
+ }
+ return samples
+
+ def __enter__(self):
+ self.check_interface_count()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.make_tuple()
+
+ def make_tuple(self):
+ if self.result_tuple:
+ return
+
+ self.result_tuple = ProxTestDataTuple(
+ self.tolerated_loss,
+ self.tsc_hz,
+ self.measured_stats['delta'].rx,
+ self.measured_stats['delta'].tx,
+ self.measured_stats['delta'].tsc,
+ self.latency,
+ self.rx_total,
+ self.tx_total,
+ self.pps,
+ )
+ self.result_tuple.log_data()
@contextmanager
- def traffic_context(self, pkt_size, value):
- self.sut.stop_all()
- self.sut.reset_stats()
- self.sut.set_pkt_size(self.test_cores, pkt_size)
- self.sut.set_speed(self.test_cores, value)
- self.sut.start_all()
- try:
+ def measure_tot_stats(self):
+ with self.sut.measure_tot_stats() as self.measured_stats:
yield
- finally:
- self.sut.stop_all()
- def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
+ def check_interface_count(self):
# do this assert in init? unless we expect interface count to
# change from one run to another run...
- interfaces = self.vnfd_helper.interfaces
- interface_count = len(interfaces)
- assert interface_count in {2, 4}, \
- "Invalid no of ports, 2 or 4 ports only supported at this time"
-
- with self.traffic_context(pkt_size, value):
- # Getting statistics to calculate PPS at right speed....
- tsc_hz = float(self.sut.hz())
- time.sleep(2)
- with self.sut.measure_tot_stats() as data:
- time.sleep(duration)
+ assert self.port_count in {1, 2, 4}, \
+ "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
- # Get stats before stopping the cores. Stopping cores takes some time
- # and might skew results otherwise.
- latency = self.get_latency()
+ def capture_tsc_hz(self):
+ self.tsc_hz = float(self.sut.hz())
- deltas = data['delta']
- rx_total, tx_total = self.sut.port_stats(range(interface_count))[6:8]
- pps = value / 100.0 * self.line_rate_to_pps(pkt_size, interface_count)
+ def line_rate_to_pps(self):
+ # FIXME Don't hardcode 10Gb/s
+ return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
- result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
- deltas.tsc, latency, rx_total, tx_total, pps)
- result.log_data()
- return result
+class ProxProfileHelper(object):
- def get_cores(self, mode):
- cores = []
- for section_name, section_data in self.prox_config_dict.items():
- if section_name.startswith("core"):
- for index, item in enumerate(section_data):
- if item[0] == "mode" and item[1] == mode:
- core = CoreSocketTuple(section_name).find_in_topology(self.cpu_topology)
- cores.append(core)
- return cores
+ __prox_profile_type__ = "Generic"
- def upload_prox_lua(self, config_dir, prox_config_dict):
- # we could have multiple lua directives
- lau_dict = prox_config_dict.get('lua', {})
- find_iter = (re.findall('\("([^"]+)"\)', k) for k in lau_dict)
- lua_file = next((found[0] for found in find_iter if found), None)
- if not lua_file:
- return ""
+ PROX_CORE_GEN_MODE = "gen"
+ PROX_CORE_LAT_MODE = "lat"
- out = self.generate_prox_lua_file()
- remote_path = os.path.join(config_dir, lua_file)
- return self.put_string_to_file(out, remote_path)
+ @classmethod
+ def get_cls(cls, helper_type):
+ """Return class of specified type."""
+ if not helper_type:
+ return ProxProfileHelper
- def put_string_to_file(self, s, remote_path):
- self.ssh_helper.run("cat > '{}'".format(remote_path), stdin=s)
- return remote_path
+ for profile_helper_class in utils.itersubclasses(cls):
+ if helper_type == profile_helper_class.__prox_profile_type__:
+ return profile_helper_class
- def generate_prox_lua_file(self):
- p = OrderedDict()
- ext_intf = self.vnfd_helper.interfaces
- lua_param = self.LUA_PARAMETER_NAME
- for intf in ext_intf:
- peer = self.LUA_PARAMETER_PEER[lua_param]
- port_num = intf["virtual-interface"]["dpdk_port_num"]
- local_ip = intf["local_ip"]
- dst_ip = intf["dst_ip"]
- local_ip_hex = ip_to_hex(local_ip, separator=' ')
- dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
- p.update([
- ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
- ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
- ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
- ("{}_ip_port_{}".format(peer, port_num), dst_ip),
- ])
- lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
- return lua
+ return ProxProfileHelper
- def generate_prox_config_file(self, config_path):
- sections = {}
- prox_config = ConfigParser(config_path, sections)
- prox_config.parse()
+ @classmethod
+ def make_profile_helper(cls, resource_helper):
+ return cls.get_cls(resource_helper.test_type)(resource_helper)
- # Ensure MAC is set "hardware"
- ext_intf = self.vnfd_helper.interfaces
- for intf in ext_intf:
- port_num = intf["virtual-interface"]["dpdk_port_num"]
- section_name = "port {}".format(port_num)
- for index, section_data in enumerate(sections.get(section_name, [])):
- if section_data[0] == "mac":
- sections[section_name][index][1] = "hardware"
-
- # search for dest mac
- for section_name, section_data in sections.items():
- for index, section_attr in enumerate(section_data):
- if section_attr[0] != "dst mac":
- continue
+ def __init__(self, resource_helper):
+ super(ProxProfileHelper, self).__init__()
+ self.resource_helper = resource_helper
+ self._cpu_topology = None
+ self._test_cores = None
+ self._latency_cores = None
- tx_port_no = self._get_tx_port(section_name, sections)
- if tx_port_no == -1:
- raise Exception("Failed ..destination MAC undefined")
+ @property
+ def cpu_topology(self):
+ if not self._cpu_topology:
+ stdout = io.BytesIO()
+ self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
+ self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
+ return self._cpu_topology
- dst_mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"]
- section_attr[1] = dst_mac
+ @property
+ def test_cores(self):
+ if not self._test_cores:
+ self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
+ return self._test_cores
- # if addition file specified in prox config
- if self.additional_file:
- remote_name = self.remote_prox_file_name
- for section_data in sections.values():
- for index, section_attr in enumerate(section_data):
- try:
- if section_attr[1].startswith("dofile"):
- new_string = self._replace_quoted_with_value(section_attr[1],
- remote_name)
- section_attr[1] = new_string
- except:
- pass
+ @property
+ def latency_cores(self):
+ if not self._latency_cores:
+ self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
+ return self._latency_cores
- return sections
+ @contextmanager
+ def traffic_context(self, pkt_size, value):
+ self.sut.stop_all()
+ self.sut.reset_stats()
+ try:
+ self.sut.set_pkt_size(self.test_cores, pkt_size)
+ self.sut.set_speed(self.test_cores, value)
+ self.sut.start_all()
+ yield
+ finally:
+ self.sut.stop_all()
+
+ def get_cores(self, mode):
+ cores = []
+
+ for section_name, section in self.setup_helper.prox_config_data:
+ if not section_name.startswith("core"):
+ continue
+
+ for key, value in section:
+ if key == "mode" and value == mode:
+ core_tuple = CoreSocketTuple(section_name)
+ core = core_tuple.find_in_topology(self.cpu_topology)
+ cores.append(core)
+
+ return cores
+
+ def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
+ data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
+
+ with data_helper, self.traffic_context(pkt_size, value):
+ with data_helper.measure_tot_stats():
+ time.sleep(duration)
+ # Getting statistics to calculate PPS at right speed....
+ data_helper.capture_tsc_hz()
+ data_helper.latency = self.get_latency()
+
+ return data_helper.result_tuple, data_helper.samples
def get_latency(self):
"""
return self.sut.lat_stats(self._latency_cores)
return []
- def _get_logical_if_name(self, vpci):
- return self._vpci_to_if_name_map[vpci]
+ def terminate(self):
+ pass
- def _connect(self, client=None):
- """Run and connect to prox on the remote system """
- # De-allocating a large amount of hugepages takes some time. If a new
- # PROX instance is started immediately after killing the previous one,
- # it might not be able to allocate hugepages, because they are still
- # being freed. Hence the -w switch.
- # self.connection.execute("sudo killall -w Prox 2>/dev/null")
- # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
- # -f ./handle_none-4.cfg"
- # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
- # "; " \
- # + "export RTE_TARGET=" + self._dpdk_target + ";" \
- # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
- # sudo " \
- # + "./build/Prox " + prox_args
- # log.debug("Starting PROX with command [%s]", prox_cmd)
- # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
- # self._ip, prox_cmd))
- if client is None:
- client = ProxSocketHelper()
+ def __getattr__(self, item):
+ return getattr(self.resource_helper, item)
- # try connecting to Prox for 60s
- for _ in range(RETRY_SECONDS):
- time.sleep(RETRY_INTERVAL)
- try:
- client.connect(self._ip, PROX_PORT)
- except (socket.gaierror, socket.error):
+
+class ProxMplsProfileHelper(ProxProfileHelper):
+
+ __prox_profile_type__ = "MPLS tag/untag"
+
+ def __init__(self, resource_helper):
+ super(ProxMplsProfileHelper, self).__init__(resource_helper)
+ self._cores_tuple = None
+
+ @property
+ def mpls_cores(self):
+ if not self._cores_tuple:
+ self._cores_tuple = self.get_cores_mpls()
+ return self._cores_tuple
+
+ @property
+ def tagged_cores(self):
+ return self.mpls_cores[0]
+
+ @property
+ def plain_cores(self):
+ return self.mpls_cores[1]
+
+ def get_cores_mpls(self):
+ cores_tagged = []
+ cores_plain = []
+ for section_name, section in self.resource_helper.setup_helper.prox_config_data:
+ if not section_name.startswith("core"):
continue
- else:
- return client
- msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
- raise Exception(msg.format(self._ip, PROX_PORT))
+ if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
+ continue
+
+ for item_key, item_value in section:
+ if item_key != 'name':
+ continue
+
+ if item_value.startswith("tag"):
+ core_tuple = CoreSocketTuple(section_name)
+ core_tag = core_tuple.find_in_topology(self.cpu_topology)
+ cores_tagged.append(core_tag)
+
+ elif item_value.startswith("udp"):
+ core_tuple = CoreSocketTuple(section_name)
+ core_udp = core_tuple.find_in_topology(self.cpu_topology)
+ cores_plain.append(core_udp)
+
+ return cores_tagged, cores_plain
+
+ @contextmanager
+ def traffic_context(self, pkt_size, value):
+ self.sut.stop_all()
+ self.sut.reset_stats()
+ try:
+ self.sut.set_pkt_size(self.tagged_cores, pkt_size)
+ self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
+ self.sut.set_speed(self.tagged_cores, value)
+ ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
+ self.sut.set_speed(self.plain_cores, value * ratio)
+ self.sut.start_all()
+ yield
+ finally:
+ self.sut.stop_all()
+
+
+class ProxBngProfileHelper(ProxProfileHelper):
+
+ __prox_profile_type__ = "BNG gen"
+
+ def __init__(self, resource_helper):
+ super(ProxBngProfileHelper, self).__init__(resource_helper)
+ self._cores_tuple = None
+
+ @property
+ def bng_cores(self):
+ if not self._cores_tuple:
+ self._cores_tuple = self.get_cores_gen_bng_qos()
+ return self._cores_tuple
+
+ @property
+ def cpe_cores(self):
+ return self.bng_cores[0]
+
+ @property
+ def inet_cores(self):
+ return self.bng_cores[1]
+
+ @property
+ def arp_cores(self):
+ return self.bng_cores[2]
+
+ @property
+ def arp_task_cores(self):
+ return self.bng_cores[3]
+
+ @property
+ def all_rx_cores(self):
+ return self.latency_cores
+
+ def get_cores_gen_bng_qos(self):
+ cpe_cores = []
+ inet_cores = []
+ arp_cores = []
+ arp_tasks_core = [0]
+ for section_name, section in self.resource_helper.setup_helper.prox_config_data:
+ if not section_name.startswith("core"):
+ continue
+
+ if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
+ continue
+
+ for item_key, item_value in section:
+ if item_key != 'name':
+ continue
+
+ if item_value.startswith("cpe"):
+ core_tuple = CoreSocketTuple(section_name)
+ cpe_core = core_tuple.find_in_topology(self.cpu_topology)
+ cpe_cores.append(cpe_core)
+
+ elif item_value.startswith("inet"):
+ core_tuple = CoreSocketTuple(section_name)
+ inet_core = core_tuple.find_in_topology(self.cpu_topology)
+ inet_cores.append(inet_core)
+
+ elif item_value.startswith("arp"):
+ core_tuple = CoreSocketTuple(section_name)
+ arp_core = core_tuple.find_in_topology(self.cpu_topology)
+ arp_cores.append(arp_core)
+
+ # We check the tasks/core separately
+ if item_value.startswith("arp_task"):
+ core_tuple = CoreSocketTuple(section_name)
+ arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
+ arp_tasks_core.append(arp_task_core)
+
+ return cpe_cores, inet_cores, arp_cores, arp_tasks_core
+
+ @contextmanager
+ def traffic_context(self, pkt_size, value):
+ # Tester is sending packets at the required speed already after
+ # setup_test(). Just get the current statistics, sleep the required
+ # amount of time and calculate packet loss.
+ inet_pkt_size = pkt_size
+ cpe_pkt_size = pkt_size - 24
+ ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
+
+ curr_up_speed = curr_down_speed = 0
+ max_up_speed = max_down_speed = value
+ if ratio < 1:
+ max_down_speed = value * ratio
+ else:
+ max_up_speed = value / ratio
+
+ # Initialize cores
+ self.sut.stop_all()
+ time.sleep(0.5)
+
+ # Flush any packets in the NIC RX buffers, otherwise the stats will be
+ # wrong.
+ self.sut.start(self.all_rx_cores)
+ time.sleep(0.5)
+ self.sut.stop(self.all_rx_cores)
+ time.sleep(0.5)
+ self.sut.reset_stats()
+
+ self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
+ self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
+
+ self.sut.reset_values(self.cpe_cores)
+ self.sut.reset_values(self.inet_cores)
+
+ # Set correct IP and UDP lengths in packet headers
+ # CPE
+ # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
+ self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
+ # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
+ self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
+
+ # INET
+ # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
+ self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
+ # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
+ self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
+ # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
+ self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
+
+ # Sending ARP to initialize tables - need a few seconds of generation
+ # to make sure all CPEs are initialized
+ LOG.info("Initializing SUT: sending ARP packets")
+ self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
+ self.sut.set_speed(self.inet_cores, curr_up_speed)
+ self.sut.set_speed(self.cpe_cores, curr_down_speed)
+ self.sut.start(self.arp_cores)
+ time.sleep(4)
+
+ # Ramp up the transmission speed. First go to the common speed, then
+ # increase steps for the faster one.
+ self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
+
+ LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
+
+ while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
+ # The min(..., ...) takes care of 1) floating point rounding errors
+ # that could make curr_*_speed to be slightly greater than
+ # max_*_speed and 2) max_*_speed not being an exact multiple of
+ # self._step_delta.
+ if curr_up_speed < max_up_speed:
+ curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
+ if curr_down_speed < max_down_speed:
+ curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
+
+ self.sut.set_speed(self.inet_cores, curr_up_speed)
+ self.sut.set_speed(self.cpe_cores, curr_down_speed)
+ time.sleep(self.step_time)
+
+ LOG.info("Target speeds reached. Starting real test.")
+
+ yield
+
+ self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
+ LOG.info("Test ended. Flushing NIC buffers")
+ self.sut.start(self.all_rx_cores)
+ time.sleep(3)
+ self.sut.stop(self.all_rx_cores)
+
+ def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
+ data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
+
+ with data_helper, self.traffic_context(pkt_size, value):
+ with data_helper.measure_tot_stats():
+ time.sleep(duration)
+ # Getting statistics to calculate PPS at right speed....
+ data_helper.capture_tsc_hz()
+ data_helper.latency = self.get_latency()
+
+ return data_helper.result_tuple, data_helper.samples