# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-""" Base class implementation for generic vnf implementation """
-from collections import Mapping
import logging
from multiprocessing import Queue, Value, Process
-
import os
import posixpath
import re
+import uuid
import subprocess
import time
from yardstick.common import exceptions as y_exceptions
from yardstick.common.process import check_if_process_failed
from yardstick.common import utils
-from yardstick.network_services.constants import DEFAULT_VNF_TIMEOUT
-from yardstick.network_services.constants import PROCESS_JOIN_TIMEOUT
-from yardstick.network_services.constants import REMOTE_TMP
+from yardstick.common import yaml_loader
+from yardstick.network_services import constants
from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
-from yardstick.network_services.helpers.samplevnf_helper import PortPairs
from yardstick.network_services.nfvi.resource import ResourceProfile
from yardstick.network_services.utils import get_nsb_option
from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
-
+from yardstick.benchmark.contexts.node import NodeContext
LOG = logging.getLogger(__name__)
class SetupEnvHelper(object):
- CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
- CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+ CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
+ CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
PIPELINE_COMMAND = ''
VNF_TYPE = "SAMPLE"
self.vnfd_helper = vnfd_helper
self.ssh_helper = ssh_helper
self.scenario_helper = scenario_helper
+ self.collectd_options = {}
def build_config(self):
raise NotImplementedError
APP_NAME = 'DpdkVnf'
FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
- HUGEPAGES_KB = 1024 * 1024 * 16
@staticmethod
def _update_packet_type(ip_pipeline_cfg, traffic_options):
def _setup_hugepages(self):
meminfo = utils.read_meminfo(self.ssh_helper)
hp_size_kb = int(meminfo['Hugepagesize'])
- nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
+ hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
+ nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
self.ssh_helper.execute('echo %s | sudo tee %s' %
(nr_hugepages, self.NR_HUGEPAGES_PATH))
hp = six.BytesIO()
'vnf_type': self.VNF_TYPE,
}
+ # read actions/rules from file
+ acl_options = None
+ acl_file_name = self.scenario_helper.options.get('rules')
+ if acl_file_name:
+ with utils.open_relative_file(acl_file_name, task_path) as infile:
+ acl_options = yaml_loader.yaml_load(infile)
+
config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
task_path)
config_basename = posixpath.basename(self.CFG_CONFIG)
new_config = self._update_packet_type(new_config, traffic_options)
self.ssh_helper.upload_config_file(config_basename, new_config)
self.ssh_helper.upload_config_file(script_basename,
- multiport.generate_script(self.vnfd_helper))
+ multiport.generate_script(self.vnfd_helper,
+ self.get_flows_config(acl_options)))
LOG.info("Provision and start the %s", self.APP_NAME)
self._build_pipeline_kwargs()
return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+ def get_flows_config(self, options=None): # pylint: disable=unused-argument
+ """No actions/rules (flows) by default"""
+ return None
+
def _build_pipeline_kwargs(self):
tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
# count the number of actual ports in the list of pairs
port_nums = self.vnfd_helper.port_nums(ports)
# create mask from all the dpdk port numbers
ports_mask_hex = hex(sum(2 ** num for num in port_nums))
+
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ lb_config = vnf_cfg.get('lb_config', 'SW')
+ worker_threads = vnf_cfg.get('worker_threads', 3)
+ hwlb = ''
+ if lb_config == 'HW':
+ hwlb = ' --hwlb %s' % worker_threads
+
self.pipeline_kwargs = {
'cfg_file': self.CFG_CONFIG,
'script': self.CFG_SCRIPT,
'port_mask_hex': ports_mask_hex,
'tool_path': tool_path,
+ 'hwlb': hwlb,
}
def setup_vnf_environment(self):
if exit_status == 0:
return
- def get_collectd_options(self):
- options = self.scenario_helper.all_options.get("collectd", {})
- # override with specific node settings
- options.update(self.scenario_helper.options.get("collectd", {}))
- return options
-
def _setup_resources(self):
# what is this magic? how do we know which socket is for which port?
# what about quad-socket?
# this won't work because we don't have DPDK port numbers yet
ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
port_names = (intf["name"] for intf in ports)
- collectd_options = self.get_collectd_options()
- plugins = collectd_options.get("plugins", {})
+ plugins = self.collectd_options.get("plugins", {})
+ interval = self.collectd_options.get("interval")
# we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
- plugins=plugins, interval=collectd_options.get("interval"),
+ plugins=plugins, interval=interval,
timeout=self.scenario_helper.timeout)
def _check_interface_fields(self):
self.resource = None
self.setup_helper = setup_helper
self.ssh_helper = setup_helper.ssh_helper
+ self._enable = True
def setup(self):
self.resource = self.setup_helper.setup_vnf_environment()
def generate_cfg(self):
pass
+ def update_from_context(self, context, attr_name):
+ """Disable resource helper in case of baremetal context.
+
+ And update appropriate node collectd options in context
+ """
+ if isinstance(context, NodeContext):
+ self._enable = False
+ context.update_collectd_options_for_node(self.setup_helper.collectd_options,
+ attr_name)
+
def _collect_resource_kpi(self):
result = {}
status = self.resource.check_if_system_agent_running("collectd")[0]
- if status == 0:
+ if status == 0 and self._enable:
result = self.resource.amqp_collect_nfvi_kpi()
result = {"core": result}
return result
def start_collect(self):
- self.resource.initiate_systemagent(self.ssh_helper.bin_path)
- self.resource.start()
- self.resource.amqp_process_for_nfvi_kpi()
+ if self._enable:
+ self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+ self.resource.start()
+ self.resource.amqp_process_for_nfvi_kpi()
def stop_collect(self):
- if self.resource:
+ if self.resource and self._enable:
self.resource.stop()
def collect_kpi(self):
try:
return self.client.get_stats(*args, **kwargs)
except STLError:
- LOG.exception("TRex client not connected")
- return {}
-
- def generate_samples(self, ports, key=None, default=None):
- # needs to be used ports
- last_result = self.get_stats(ports)
- key_value = last_result.get(key, default)
-
- if not isinstance(last_result, Mapping): # added for mock unit test
- self._terminated.value = 1
+ LOG.error('TRex client not connected')
return {}
- samples = {}
- # recalculate port for interface and see if it matches ports provided
- for intf in self.vnfd_helper.interfaces:
- name = intf["name"]
- port = self.vnfd_helper.port_num(name)
- if port in ports:
- xe_value = last_result.get(port, {})
- samples[name] = {
- "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
- "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
- "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
- "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
- "in_packets": int(xe_value.get("ipackets", 0)),
- "out_packets": int(xe_value.get("opackets", 0)),
- }
- if key:
- samples[name][key] = key_value
- return samples
+ def _get_samples(self, ports, port_pg_id=False):
+ raise NotImplementedError()
def _run_traffic_once(self, traffic_profile):
traffic_profile.execute_traffic(self)
self.client_started.value = 1
time.sleep(self.RUN_DURATION)
- samples = self.generate_samples(traffic_profile.ports)
+ samples = self._get_samples(traffic_profile.ports)
time.sleep(self.QUEUE_WAIT_TIME)
self._queue.put(samples)
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, mq_producer):
# if we don't do this we can hang waiting for the queue to drain
# have to do this in the subprocess
self._queue.cancel_join_thread()
# fixme: fix passing correct trex config file,
# instead of searching the default path
+ mq_producer.tg_method_started()
try:
self._build_ports()
self.client = self._connect()
self.client.remove_all_streams(self.all_ports) # remove all streams
traffic_profile.register_generator(self)
+ iteration_index = 0
while self._terminated.value == 0:
+ iteration_index += 1
self._run_traffic_once(traffic_profile)
+ mq_producer.tg_method_iteration(iteration_index)
self.client.stop(self.all_ports)
self.client.disconnect()
return # return if trex/tg server is stopped.
raise
+ mq_producer.tg_method_finished()
+
def terminate(self):
self._terminated.value = 1 # stop client
@property
def timeout(self):
- return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
+ test_duration = self.scenario_cfg.get('runner', {}).get('duration',
+ self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
+ test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
+ return test_duration if test_duration > test_timeout else test_timeout
class SampleVNF(GenericVNF):
APP_NAME = "SampleVNF"
# we run the VNF interactively, so the ssh command will timeout after this long
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
- super(SampleVNF, self).__init__(name, vnfd)
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd, task_id)
self.bin_path = get_nsb_option('bin_path', '')
self.scenario_helper = ScenarioHelper(self.name)
self.resource_helper = resource_helper_type(self.setup_helper)
self.context_cfg = None
- self.nfvi_context = None
self.pipeline_kwargs = {}
self.uplink_ports = None
self.downlink_ports = None
self.vnf_port_pairs = None
self._vnf_process = None
- def _build_ports(self):
- self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
- self.networks = self._port_pairs.networks
- self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
- self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
- self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
-
- def _get_route_data(self, route_index, route_type):
- route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
- for _ in range(route_index):
- next(route_iter, '')
- return next(route_iter, {}).get(route_type, '')
-
- def _get_port0localip6(self):
- return_value = self._get_route_data(0, 'network')
- LOG.info("_get_port0localip6 : %s", return_value)
- return return_value
-
- def _get_port1localip6(self):
- return_value = self._get_route_data(1, 'network')
- LOG.info("_get_port1localip6 : %s", return_value)
- return return_value
-
- def _get_port0prefixlen6(self):
- return_value = self._get_route_data(0, 'netmask')
- LOG.info("_get_port0prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port1prefixlen6(self):
- return_value = self._get_route_data(1, 'netmask')
- LOG.info("_get_port1prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port0gateway6(self):
- return_value = self._get_route_data(0, 'network')
- LOG.info("_get_port0gateway6 : %s", return_value)
- return return_value
-
- def _get_port1gateway6(self):
- return_value = self._get_route_data(1, 'network')
- LOG.info("_get_port1gateway6 : %s", return_value)
- return return_value
-
def _start_vnf(self):
self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
pass
def instantiate(self, scenario_cfg, context_cfg):
+ self._update_collectd_options(scenario_cfg, context_cfg)
self.scenario_helper.scenario_cfg = scenario_cfg
self.context_cfg = context_cfg
- self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
- # self.nfvi_context = None
+ self.resource_helper.update_from_context(
+ Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
+ self.scenario_helper.nodes[self.name]
+ )
# vnf deploy is unsupported, use ansible playbooks
if self.scenario_helper.options.get("vnf_deploy", False):
self.resource_helper.setup()
self._start_vnf()
+ def _update_collectd_options(self, scenario_cfg, context_cfg):
+ """Update collectd configuration options
+ This function retrieves all collectd options contained in the test case
+
+ definition builds a single dictionary combining them. The following fragment
+ represents a test case with the collectd options and priorities (1 highest, 3 lowest):
+ ---
+ schema: yardstick:task:0.1
+ scenarios:
+ - type: NSPerf
+ nodes:
+ tg__0: trafficgen_1.yardstick
+ vnf__0: vnf.yardstick
+ options:
+ collectd:
+ <options> # COLLECTD priority 3
+ vnf__0:
+ collectd:
+ plugins:
+ load
+ <options> # COLLECTD priority 2
+ context:
+ type: Node
+ name: yardstick
+ nfvi_type: baremetal
+ file: /etc/yardstick/nodes/pod_ixia.yaml # COLLECTD priority 1
+ """
+ scenario_options = scenario_cfg.get('options', {})
+ generic_options = scenario_options.get('collectd', {})
+ scenario_node_options = scenario_options.get(self.name, {})\
+ .get('collectd', {})
+ context_node_options = context_cfg.get('nodes', {})\
+ .get(self.name, {}).get('collectd', {})
+
+ options = generic_options
+ self._update_options(options, scenario_node_options)
+ self._update_options(options, context_node_options)
+
+ self.setup_helper.collectd_options = options
+
+ def _update_options(self, options, additional_options):
+ """Update collectd options and plugins dictionary"""
+ for k, v in additional_options.items():
+ if isinstance(v, dict) and k in options:
+ options[k].update(v)
+ else:
+ options[k] = v
+
def wait_for_instantiate(self):
buf = []
time.sleep(self.WAIT_TIME) # Give some time for config to load
LOG.info("%s VNF is up and running.", self.APP_NAME)
self._vnf_up_post()
self.queue_wrapper.clear()
- self.resource_helper.start_collect()
return self._vnf_process.exitcode
if "PANIC" in message:
# by other VNF output
self.q_in.put('\r\n')
+ def start_collect(self):
+ self.resource_helper.start_collect()
+
+ def stop_collect(self):
+ self.resource_helper.stop_collect()
+
def _build_run_kwargs(self):
self.run_kwargs = {
'stdin': self.queue_wrapper,
if self._vnf_process is not None:
# be proper and join first before we kill
LOG.debug("joining before terminate %s", self._vnf_process.name)
- self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+ self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
self._vnf_process.terminate()
# no terminate children here because we share processes with tg
def collect_kpi(self):
# we can't get KPIs if the VNF is down
- check_if_process_failed(self._vnf_process)
+ check_if_process_failed(self._vnf_process, 0.01)
stats = self.get_stats()
m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ physical_node = Context.get_physical_node_from_server(
+ self.scenario_helper.nodes[self.name])
+
+ result = {"physical_node": physical_node}
if m:
- result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+ result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
result["collect_stats"] = self.resource_helper.collect_kpi()
else:
- result = {
- "packets_in": 0,
- "packets_fwd": 0,
- "packets_dropped": 0,
- }
+ result.update({"packets_in": 0,
+ "packets_fwd": 0,
+ "packets_dropped": 0})
+
LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
APP_NAME = 'Sample'
RUN_WAIT = 1
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
- super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
self.bin_path = get_nsb_option('bin_path', '')
self.scenario_helper = ScenarioHelper(self.name)
def instantiate(self, scenario_cfg, context_cfg):
self.scenario_helper.scenario_cfg = scenario_cfg
+ self.resource_helper.update_from_context(
+ Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
+ self.scenario_helper.nodes[self.name]
+ )
+
self.resource_helper.setup()
# must generate_cfg after DPDK bind because we need port number
self.resource_helper.generate_cfg()
LOG.info("%s TG Server is up and running.", self.APP_NAME)
return self._tg_process.exitcode
- def _traffic_runner(self, traffic_profile):
+ def _traffic_runner(self, traffic_profile, mq_id):
# always drop connections first thing in new processes
# so we don't get paramiko errors
self.ssh_helper.drop_connection()
LOG.info("Starting %s client...", self.APP_NAME)
- self.resource_helper.run_traffic(traffic_profile)
+ self._mq_producer = self._setup_mq_producer(mq_id)
+ self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
:param traffic_profile:
:return: True/False
"""
- name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+ name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+ traffic_profile.__class__.__name__,
os.getpid())
- self._traffic_process = Process(name=name, target=self._traffic_runner,
- args=(traffic_profile,))
+ self._traffic_process = Process(
+ name=name, target=self._traffic_runner,
+ args=(traffic_profile, uuid.uuid1().int))
self._traffic_process.start()
# Wait for traffic process to start
while self.resource_helper.client_started.value == 0:
if not self._traffic_process.is_alive():
break
- return self._traffic_process.is_alive()
-
def collect_kpi(self):
# check if the tg processes have exited
+ physical_node = Context.get_physical_node_from_server(
+ self.scenario_helper.nodes[self.name])
+
+ result = {"physical_node": physical_node}
for proc in (self._tg_process, self._traffic_process):
check_if_process_failed(proc)
- result = self.resource_helper.collect_kpi()
+
+ result["collect_stats"] = self.resource_helper.collect_kpi()
LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
if self._traffic_process is not None:
# be proper and try to join before terminating
LOG.debug("joining before terminate %s", self._traffic_process.name)
- self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
+ self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
self._traffic_process.terminate()
if self._tg_process is not None:
# be proper and try to join before terminating
LOG.debug("joining before terminate %s", self._tg_process.name)
- self._tg_process.join(PROCESS_JOIN_TIMEOUT)
+ self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
self._tg_process.terminate()
# no terminate children here because we share processes with vnf