from __future__ import absolute_import
import array
-import operator
-import logging
import io
+import logging
+import operator
import os
import re
import select
import socket
-
-from collections import OrderedDict, namedtuple
import time
+from collections import OrderedDict, namedtuple
from contextlib import contextmanager
from itertools import repeat, chain
from multiprocessing import Queue
import six
-from six.moves import zip, StringIO
from six.moves import cStringIO
+from six.moves import zip, StringIO
from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
from yardstick.common import utils
-from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
-from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
+from yardstick.common.utils import SocketTopology, join_non_strings, try_int
+from yardstick.network_services.helpers.iniparser import ConfigParser
from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
""" send data to the remote instance """
LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
try:
+ # TODO: sendall will block, we need a timeout
self._sock.sendall(to_send.encode('utf-8'))
except:
pass
def hz(self):
return self.get_all_tot_stats()[3]
- # Deprecated
- # TODO: remove
- def rx_stats(self, cores, task=0):
- return self.core_stats(cores, task)
-
def core_stats(self, cores, task=0):
"""Get the receive statistics from the remote system"""
rx = tx = drop = tsc = 0
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
# the actual app is lowercase
APP_NAME = 'prox'
+ # not used for Prox but added for consistency
+ VNF_TYPE = "PROX"
LUA_PARAMETER_NAME = ""
LUA_PARAMETER_PEER = {
self._prox_config_data = None
self.additional_files = {}
self.config_queue = Queue()
+ # allow_exit_without_flush
+ self.config_queue.cancel_join_thread()
self._global_section = None
@property
return sections
+ @staticmethod
+ def write_prox_lua(lua_config):
+ """
+ Write an .ini-format config file for PROX (parameters.lua)
+ PROX does not allow a space before/after the =, so we need
+ a custom method
+ """
+ out = []
+ for key in lua_config:
+ value = '"' + lua_config[key] + '"'
+ if key == "__name__":
+ continue
+ if value is not None and value != '@':
+ key = "=".join((key, str(value).replace('\n', '\n\t')))
+ out.append(key)
+ else:
+ key = str(key).replace('\n', '\n\t')
+ out.append(key)
+ return os.linesep.join(out)
+
@staticmethod
def write_prox_config(prox_config):
"""
def generate_prox_lua_file(self):
p = OrderedDict()
all_ports = self.vnfd_helper.port_pairs.all_ports
- lua_param = self.LUA_PARAMETER_NAME
for port_name in all_ports:
- peer = self.LUA_PARAMETER_PEER[lua_param]
port_num = self.vnfd_helper.port_num(port_name)
intf = self.vnfd_helper.find_interface(name=port_name)
vintf = intf['virtual-interface']
- local_ip = vintf["local_ip"]
- dst_ip = vintf["dst_ip"]
- local_ip_hex = ip_to_hex(local_ip, separator=' ')
- dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
- p.update([
- ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
- ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
- ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
- ("{}_ip_port_{}".format(peer, port_num), dst_ip),
- ])
- lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
- return lua
-
- def upload_prox_lua(self, config_dir, prox_config_data):
- # we could have multiple lua directives
- lau_dict = prox_config_data.get('lua', {})
- find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
- lua_file = next((found[0] for found in find_iter if found), None)
- if not lua_file:
- return ""
-
- out = self.generate_prox_lua_file()
- remote_path = os.path.join(config_dir, lua_file)
- return self.put_string_to_file(out, remote_path)
+ p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
+ p["src_mac{0}".format(port_num)] = vintf["local_mac"]
+
+ return p
+
+ def upload_prox_lua(self, config_file, lua_data):
+ # prox can't handle spaces around ' = ' so use custom method
+ out = StringIO(self.write_prox_lua(lua_data))
+ out.seek(0)
+ remote_path = os.path.join("/tmp", config_file)
+ self.ssh_helper.put_file_obj(out, remote_path)
+
+ return remote_path
def upload_prox_config(self, config_file, prox_config_data):
# prox can't handle spaces around ' = ' so use custom method
config_path = find_relative_file(config_path, task_path)
self.additional_files = {}
+ try:
+ if options['prox_generate_parameter']:
+ self.lua = []
+ self.lua = self.generate_prox_lua_file()
+ if len(self.lua) > 0:
+ self.upload_prox_lua("parameters.lua", self.lua)
+ except:
+ pass
+
prox_files = options.get('prox_files', [])
if isinstance(prox_files, six.string_types):
prox_files = [prox_files]
continue
for item_key, item_value in section:
- if item_key == "name" and item_value.startswith("cpe"):
+ if item_key != 'name':
+ continue
+
+ if item_value.startswith("cpe"):
core_tuple = CoreSocketTuple(section_name)
- core_tag = core_tuple.find_in_topology(self.cpu_topology)
- cpe_cores.append(core_tag)
+ cpe_core = core_tuple.find_in_topology(self.cpu_topology)
+ cpe_cores.append(cpe_core)
- elif item_key == "name" and item_value.startswith("inet"):
+ elif item_value.startswith("inet"):
core_tuple = CoreSocketTuple(section_name)
inet_core = core_tuple.find_in_topology(self.cpu_topology)
inet_cores.append(inet_core)
- elif item_key == "name" and item_value.startswith("arp"):
+ elif item_value.startswith("arp"):
core_tuple = CoreSocketTuple(section_name)
arp_core = core_tuple.find_in_topology(self.cpu_topology)
arp_cores.append(arp_core)
# We check the tasks/core separately
- if item_key == "name" and item_value.startswith("arp_task"):
+ if item_value.startswith("arp_task"):
core_tuple = CoreSocketTuple(section_name)
arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
arp_tasks_core.append(arp_task_core)
data_helper.latency = self.get_latency()
return data_helper.result_tuple, data_helper.samples
+
+
+class ProxVpeProfileHelper(ProxProfileHelper):
+
+ __prox_profile_type__ = "vPE gen"
+
+ def __init__(self, resource_helper):
+ super(ProxVpeProfileHelper, self).__init__(resource_helper)
+ self._cores_tuple = None
+ self._ports_tuple = None
+
+ @property
+ def vpe_cores(self):
+ if not self._cores_tuple:
+ self._cores_tuple = self.get_cores_gen_vpe()
+ return self._cores_tuple
+
+ @property
+ def cpe_cores(self):
+ return self.vpe_cores[0]
+
+ @property
+ def inet_cores(self):
+ return self.vpe_cores[1]
+
+ @property
+ def all_rx_cores(self):
+ return self.latency_cores
+
+ @property
+ def vpe_ports(self):
+ if not self._ports_tuple:
+ self._ports_tuple = self.get_ports_gen_vpe()
+ return self._ports_tuple
+
+ @property
+ def cpe_ports(self):
+ return self.vpe_ports[0]
+
+ @property
+ def inet_ports(self):
+ return self.vpe_ports[1]
+
+ def get_cores_gen_vpe(self):
+ cpe_cores = []
+ inet_cores = []
+ for section_name, section in self.resource_helper.setup_helper.prox_config_data:
+ if not section_name.startswith("core"):
+ continue
+
+ if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
+ continue
+
+ for item_key, item_value in section:
+ if item_key != 'name':
+ continue
+
+ if item_value.startswith("cpe"):
+ core_tuple = CoreSocketTuple(section_name)
+ core_tag = core_tuple.find_in_topology(self.cpu_topology)
+ cpe_cores.append(core_tag)
+
+ elif item_value.startswith("inet"):
+ core_tuple = CoreSocketTuple(section_name)
+ inet_core = core_tuple.find_in_topology(self.cpu_topology)
+ inet_cores.append(inet_core)
+
+ return cpe_cores, inet_cores
+
+ def get_ports_gen_vpe(self):
+ cpe_ports = []
+ inet_ports = []
+
+ for section_name, section in self.resource_helper.setup_helper.prox_config_data:
+ if not section_name.startswith("port"):
+ continue
+ tx_port_iter = re.finditer(r'\d+', section_name)
+ tx_port_no = int(next(tx_port_iter).group(0))
+
+ for item_key, item_value in section:
+ if item_key != 'name':
+ continue
+
+ if item_value.startswith("cpe"):
+ cpe_ports.append(tx_port_no)
+
+ elif item_value.startswith("inet"):
+ inet_ports.append(tx_port_no)
+
+ return cpe_ports, inet_ports
+
+ @contextmanager
+ def traffic_context(self, pkt_size, value):
+ # Calculate the target upload and download speed. The upload and
+ # download packets have different packet sizes, so in order to get
+ # equal bandwidth usage, the ratio of the speeds has to match the ratio
+ # of the packet sizes.
+ cpe_pkt_size = pkt_size
+ inet_pkt_size = pkt_size - 4
+ ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
+
+ curr_up_speed = curr_down_speed = 0
+ max_up_speed = max_down_speed = value
+ if ratio < 1:
+ max_down_speed = value * ratio
+ else:
+ max_up_speed = value / ratio
+
+ # Adjust speed when multiple cores per port are used to generate traffic
+ if len(self.cpe_ports) != len(self.cpe_cores):
+ max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
+ if len(self.inet_ports) != len(self.inet_cores):
+ max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
+
+ # Initialize cores
+ self.sut.stop_all()
+ time.sleep(2)
+
+ # Flush any packets in the NIC RX buffers, otherwise the stats will be
+ # wrong.
+ self.sut.start(self.all_rx_cores)
+ time.sleep(2)
+ self.sut.stop(self.all_rx_cores)
+ time.sleep(2)
+ self.sut.reset_stats()
+
+ self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
+ self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
+
+ self.sut.reset_values(self.cpe_cores)
+ self.sut.reset_values(self.inet_cores)
+
+ # Set correct IP and UDP lengths in packet headers
+ # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
+ self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
+ # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
+ self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
+
+ # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
+ self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
+ # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
+ self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
+
+ self.sut.set_speed(self.inet_cores, curr_up_speed)
+ self.sut.set_speed(self.cpe_cores, curr_down_speed)
+
+ # Ramp up the transmission speed. First go to the common speed, then
+ # increase steps for the faster one.
+ self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
+
+ LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
+
+ while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
+ # The min(..., ...) takes care of 1) floating point rounding errors
+ # that could make curr_*_speed to be slightly greater than
+ # max_*_speed and 2) max_*_speed not being an exact multiple of
+ # self._step_delta.
+ if curr_up_speed < max_up_speed:
+ curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
+ if curr_down_speed < max_down_speed:
+ curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
+
+ self.sut.set_speed(self.inet_cores, curr_up_speed)
+ self.sut.set_speed(self.cpe_cores, curr_down_speed)
+ time.sleep(self.step_time)
+
+ LOG.info("Target speeds reached. Starting real test.")
+
+ yield
+
+ self.sut.stop(self.cpe_cores + self.inet_cores)
+ LOG.info("Test ended. Flushing NIC buffers")
+ self.sut.start(self.all_rx_cores)
+ time.sleep(3)
+ self.sut.stop(self.all_rx_cores)
+
+ def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
+ data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
+
+ with data_helper, self.traffic_context(pkt_size, value):
+ with data_helper.measure_tot_stats():
+ time.sleep(duration)
+ # Getting statistics to calculate PPS at right speed....
+ data_helper.capture_tsc_hz()
+ data_helper.latency = self.get_latency()
+
+ return data_helper.result_tuple, data_helper.samples
+
+
+class ProxlwAFTRProfileHelper(ProxProfileHelper):
+
+ __prox_profile_type__ = "lwAFTR gen"
+
+ def __init__(self, resource_helper):
+ super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
+ self._cores_tuple = None
+ self._ports_tuple = None
+ self.step_delta = 5
+ self.step_time = 0.5
+
+ @property
+ def _lwaftr_cores(self):
+ if not self._cores_tuple:
+ self._cores_tuple = self._get_cores_gen_lwaftr()
+ return self._cores_tuple
+
+ @property
+ def tun_cores(self):
+ return self._lwaftr_cores[0]
+
+ @property
+ def inet_cores(self):
+ return self._lwaftr_cores[1]
+
+ @property
+ def _lwaftr_ports(self):
+ if not self._ports_tuple:
+ self._ports_tuple = self._get_ports_gen_lw_aftr()
+ return self._ports_tuple
+
+ @property
+ def tun_ports(self):
+ return self._lwaftr_ports[0]
+
+ @property
+ def inet_ports(self):
+ return self._lwaftr_ports[1]
+
+ @property
+ def all_rx_cores(self):
+ return self.latency_cores
+
+ def _get_cores_gen_lwaftr(self):
+ tun_cores = []
+ inet_cores = []
+ for section_name, section in self.resource_helper.setup_helper.prox_config_data:
+ if not section_name.startswith("core"):
+ continue
+
+ if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
+ continue
+
+ core_tuple = CoreSocketTuple(section_name)
+ core_tag = core_tuple.find_in_topology(self.cpu_topology)
+ for item_value in (v for k, v in section if k == 'name'):
+ if item_value.startswith('tun'):
+ tun_cores.append(core_tag)
+ elif item_value.startswith('inet'):
+ inet_cores.append(core_tag)
+
+ return tun_cores, inet_cores
+
+ def _get_ports_gen_lw_aftr(self):
+ tun_ports = []
+ inet_ports = []
+
+ re_port = re.compile('port (\d+)')
+ for section_name, section in self.resource_helper.setup_helper.prox_config_data:
+ match = re_port.search(section_name)
+ if not match:
+ continue
+
+ tx_port_no = int(match.group(1))
+ for item_value in (v for k, v in section if k == 'name'):
+ if item_value.startswith('lwB4'):
+ tun_ports.append(tx_port_no)
+ elif item_value.startswith('inet'):
+ inet_ports.append(tx_port_no)
+
+ return tun_ports, inet_ports
+
+ @staticmethod
+ def _resize(len1, len2):
+ if len1 == len2:
+ return 1.0
+ return 1.0 * len1 / len2
+
+ @contextmanager
+ def traffic_context(self, pkt_size, value):
+ # Tester is sending packets at the required speed already after
+ # setup_test(). Just get the current statistics, sleep the required
+ # amount of time and calculate packet loss.
+ tun_pkt_size = pkt_size
+ inet_pkt_size = pkt_size - 40
+ ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
+
+ curr_up_speed = curr_down_speed = 0
+ max_up_speed = max_down_speed = value
+
+ max_up_speed = value / ratio
+
+ # Adjust speed when multiple cores per port are used to generate traffic
+ if len(self.tun_ports) != len(self.tun_cores):
+ max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
+ if len(self.inet_ports) != len(self.inet_cores):
+ max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
+
+ # Initialize cores
+ self.sut.stop_all()
+ time.sleep(0.5)
+
+ # Flush any packets in the NIC RX buffers, otherwise the stats will be
+ # wrong.
+ self.sut.start(self.all_rx_cores)
+ time.sleep(0.5)
+ self.sut.stop(self.all_rx_cores)
+ time.sleep(0.5)
+ self.sut.reset_stats()
+
+ self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
+ self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
+
+ self.sut.reset_values(self.tun_cores)
+ self.sut.reset_values(self.inet_cores)
+
+ # Set correct IP and UDP lengths in packet headers
+ # tun
+ # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
+ self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
+ # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
+ self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
+ # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
+ self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
+
+ # INET
+ # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
+ self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
+ # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
+ self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
+
+ LOG.info("Initializing SUT: sending lwAFTR packets")
+ self.sut.set_speed(self.inet_cores, curr_up_speed)
+ self.sut.set_speed(self.tun_cores, curr_down_speed)
+ time.sleep(4)
+
+ # Ramp up the transmission speed. First go to the common speed, then
+ # increase steps for the faster one.
+ self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
+
+ LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
+
+ while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
+ # The min(..., ...) takes care of 1) floating point rounding errors
+ # that could make curr_*_speed to be slightly greater than
+ # max_*_speed and 2) max_*_speed not being an exact multiple of
+ # self._step_delta.
+ if curr_up_speed < max_up_speed:
+ curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
+ if curr_down_speed < max_down_speed:
+ curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
+
+ self.sut.set_speed(self.inet_cores, curr_up_speed)
+ self.sut.set_speed(self.tun_cores, curr_down_speed)
+ time.sleep(self.step_time)
+
+ LOG.info("Target speeds reached. Starting real test.")
+
+ yield
+
+ self.sut.stop(self.tun_cores + self.inet_cores)
+ LOG.info("Test ended. Flushing NIC buffers")
+ self.sut.start(self.all_rx_cores)
+ time.sleep(3)
+ self.sut.stop(self.all_rx_cores)
+
+ def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
+ data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
+
+ with data_helper, self.traffic_context(pkt_size, value):
+ with data_helper.measure_tot_stats():
+ time.sleep(duration)
+ # Getting statistics to calculate PPS at right speed....
+ data_helper.capture_tsc_hz()
+ data_helper.latency = self.get_latency()
+
+ return data_helper.result_tuple, data_helper.samples