# Copyright (c) 2016-2018 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from multiprocessing import Queue, Value, Process import os import posixpath import re import uuid import subprocess import time import six from trex_stl_lib.trex_stl_client import LoggerApi from trex_stl_lib.trex_stl_client import STLClient from trex_stl_lib.trex_stl_exceptions import STLError from yardstick.benchmark.contexts.base import Context from yardstick.common import exceptions as y_exceptions from yardstick.common.process import check_if_process_failed from yardstick.common import utils from yardstick.common import yaml_loader from yardstick.network_services import constants from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig from yardstick.network_services.nfvi.resource import ResourceProfile from yardstick.network_services.utils import get_nsb_option from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper from yardstick.benchmark.contexts.node import NodeContext LOG = logging.getLogger(__name__) class SetupEnvHelper(object): CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config") CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script") DEFAULT_CONFIG_TPL_CFG = "sample.cfg" PIPELINE_COMMAND = '' VNF_TYPE = "SAMPLE" def __init__(self, vnfd_helper, ssh_helper, scenario_helper): super(SetupEnvHelper, self).__init__() self.vnfd_helper = vnfd_helper self.ssh_helper = ssh_helper self.scenario_helper = scenario_helper self.collectd_options = {} def build_config(self): raise NotImplementedError def setup_vnf_environment(self): pass def kill_vnf(self): pass def tear_down(self): raise NotImplementedError class DpdkVnfSetupEnvHelper(SetupEnvHelper): APP_NAME = 'DpdkVnf' FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'" NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages' @staticmethod def _update_packet_type(ip_pipeline_cfg, traffic_options): match_str = 'pkt_type = ipv4' replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type']) pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str) return pipeline_config_str @classmethod def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options): traffic_type = traffic_options['traffic_type'] if traffic_options['vnf_type'] is not cls.APP_NAME: match_str = 'traffic_type = 4' replace_str = 'traffic_type = {0}'.format(traffic_type) elif traffic_type == 4: match_str = 'pkt_type = ipv4' replace_str = 'pkt_type = ipv4' else: match_str = 'pkt_type = ipv4' replace_str = 'pkt_type = ipv6' pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str) return pipeline_config_str def __init__(self, vnfd_helper, ssh_helper, scenario_helper): super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) self.all_ports = None self.bound_pci = None self.socket = None self.used_drivers = None self.dpdk_bind_helper = DpdkBindHelper(ssh_helper) def _setup_hugepages(self): meminfo = utils.read_meminfo(self.ssh_helper) hp_size_kb = int(meminfo['Hugepagesize']) hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16) nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb)) self.ssh_helper.execute('echo %s | sudo tee %s' % (nr_hugepages, self.NR_HUGEPAGES_PATH)) hp = six.BytesIO() self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp) nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0]) LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s', hp_size_kb, nr_hugepages, nr_hugepages_set) def build_config(self): vnf_cfg = self.scenario_helper.vnf_cfg task_path = self.scenario_helper.task_path config_file = vnf_cfg.get('file') lb_count = vnf_cfg.get('lb_count', 3) lb_config = vnf_cfg.get('lb_config', 'SW') worker_config = vnf_cfg.get('worker_config', '1C/1T') worker_threads = vnf_cfg.get('worker_threads', 3) traffic_type = self.scenario_helper.all_options.get('traffic_type', 4) traffic_options = { 'traffic_type': traffic_type, 'pkt_type': 'ipv%s' % traffic_type, 'vnf_type': self.VNF_TYPE, } # read actions/rules from file acl_options = None acl_file_name = self.scenario_helper.options.get('rules') if acl_file_name: with utils.open_relative_file(acl_file_name, task_path) as infile: acl_options = yaml_loader.yaml_load(infile) config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path) config_basename = posixpath.basename(self.CFG_CONFIG) script_basename = posixpath.basename(self.CFG_SCRIPT) multiport = MultiPortConfig(self.scenario_helper.topology, config_tpl_cfg, config_basename, self.vnfd_helper, self.VNF_TYPE, lb_count, worker_threads, worker_config, lb_config, self.socket) multiport.generate_config() if config_file: with utils.open_relative_file(config_file, task_path) as infile: new_config = ['[EAL]'] vpci = [] for port in self.vnfd_helper.port_pairs.all_ports: interface = self.vnfd_helper.find_interface(name=port) vpci.append(interface['virtual-interface']["vpci"]) new_config.extend('w = {0}'.format(item) for item in vpci) new_config = '\n'.join(new_config) + '\n' + infile.read() else: with open(self.CFG_CONFIG) as handle: new_config = handle.read() new_config = self._update_traffic_type(new_config, traffic_options) new_config = self._update_packet_type(new_config, traffic_options) self.ssh_helper.upload_config_file(config_basename, new_config) self.ssh_helper.upload_config_file(script_basename, multiport.generate_script(self.vnfd_helper, self.get_flows_config(acl_options))) LOG.info("Provision and start the %s", self.APP_NAME) self._build_pipeline_kwargs() return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) def get_flows_config(self, options=None): # pylint: disable=unused-argument """No actions/rules (flows) by default""" return None def _build_pipeline_kwargs(self): tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) # count the number of actual ports in the list of pairs # remove duplicate ports # this is really a mapping from LINK ID to DPDK PMD ID # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3 ports = self.vnfd_helper.port_pairs.all_ports port_nums = self.vnfd_helper.port_nums(ports) # create mask from all the dpdk port numbers ports_mask_hex = hex(sum(2 ** num for num in port_nums)) vnf_cfg = self.scenario_helper.vnf_cfg lb_config = vnf_cfg.get('lb_config', 'SW') worker_threads = vnf_cfg.get('worker_threads', 3) hwlb = '' if lb_config == 'HW': hwlb = ' --hwlb %s' % worker_threads self.pipeline_kwargs = { 'cfg_file': self.CFG_CONFIG, 'script': self.CFG_SCRIPT, 'port_mask_hex': ports_mask_hex, 'tool_path': tool_path, 'hwlb': hwlb, } def setup_vnf_environment(self): self._setup_dpdk() self.kill_vnf() # bind before _setup_resources so we can use dpdk_port_num self._detect_and_bind_drivers() resource = self._setup_resources() return resource def kill_vnf(self): # pkill is not matching, debug with pgrep self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME) self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME) # have to use exact match # try using killall to match self.ssh_helper.execute("sudo killall %s" % self.APP_NAME) def _setup_dpdk(self): """Setup DPDK environment needed for VNF to run""" self._setup_hugepages() self.dpdk_bind_helper.load_dpdk_driver() exit_status = self.dpdk_bind_helper.check_dpdk_driver() if exit_status == 0: return def _setup_resources(self): # what is this magic? how do we know which socket is for which port? # what about quad-socket? if any(v[5] == "0" for v in self.bound_pci): self.socket = 0 else: self.socket = 1 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num # this won't work because we don't have DPDK port numbers yet ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num) port_names = (intf["name"] for intf in ports) plugins = self.collectd_options.get("plugins", {}) interval = self.collectd_options.get("interval") # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, plugins=plugins, interval=interval, timeout=self.scenario_helper.timeout) def _check_interface_fields(self): num_nodes = len(self.scenario_helper.nodes) # OpenStack instance creation time is probably proportional to the number # of instances timeout = 120 * num_nodes dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces, self.ssh_helper, timeout) dpdk_node.check() def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces self._check_interface_fields() # check for bound after probe self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] self.dpdk_bind_helper.read_status() self.dpdk_bind_helper.save_used_drivers() self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio') sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses) for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses): try: intf = next(v for v in interfaces if vpci == v['virtual-interface']['vpci']) # force to int intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num) except: # pylint: disable=bare-except pass time.sleep(2) def get_local_iface_name_by_vpci(self, vpci): find_net_cmd = self.FIND_NET_CMD.format(vpci) exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd) if exit_status == 0: return stdout return None def tear_down(self): self.dpdk_bind_helper.rebind_drivers() class ResourceHelper(object): COLLECT_KPI = '' MAKE_INSTALL = 'cd {0} && make && sudo make install' RESOURCE_WORD = 'sample' COLLECT_MAP = {} def __init__(self, setup_helper): super(ResourceHelper, self).__init__() self.resource = None self.setup_helper = setup_helper self.ssh_helper = setup_helper.ssh_helper self._enable = True def setup(self): self.resource = self.setup_helper.setup_vnf_environment() def generate_cfg(self): pass def update_from_context(self, context, attr_name): """Disable resource helper in case of baremetal context. And update appropriate node collectd options in context """ if isinstance(context, NodeContext): self._enable = False context.update_collectd_options_for_node(self.setup_helper.collectd_options, attr_name) def _collect_resource_kpi(self): result = {} status = self.resource.check_if_system_agent_running("collectd")[0] if status == 0 and self._enable: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} return result def start_collect(self): if self._enable: self.resource.initiate_systemagent(self.ssh_helper.bin_path) self.resource.start() self.resource.amqp_process_for_nfvi_kpi() def stop_collect(self): if self.resource and self._enable: self.resource.stop() def collect_kpi(self): return self._collect_resource_kpi() class ClientResourceHelper(ResourceHelper): RUN_DURATION = 60 QUEUE_WAIT_TIME = 5 SYNC_PORT = 1 ASYNC_PORT = 2 def __init__(self, setup_helper): super(ClientResourceHelper, self).__init__(setup_helper) self.vnfd_helper = setup_helper.vnfd_helper self.scenario_helper = setup_helper.scenario_helper self.client = None self.client_started = Value('i', 0) self.all_ports = None self._queue = Queue() self._result = {} self._terminated = Value('i', 0) def _build_ports(self): self.networks = self.vnfd_helper.port_pairs.networks self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports) self.downlink_ports = \ self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports) self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports) def port_num(self, intf): # by default return port num return self.vnfd_helper.port_num(intf) def get_stats(self, *args, **kwargs): try: return self.client.get_stats(*args, **kwargs) except STLError: LOG.error('TRex client not connected') return {} def _get_samples(self, ports, port_pg_id=False): raise NotImplementedError() def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) self.client_started.value = 1 time.sleep(self.RUN_DURATION) samples = self._get_samples(traffic_profile.ports) time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) def run_traffic(self, traffic_profile, mq_producer): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() self.client.reset(ports=self.all_ports) self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) iteration_index = 0 while self._terminated.value == 0: iteration_index += 1 self._run_traffic_once(traffic_profile) mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() self._terminated.value = 0 except STLError: if self._terminated.value: LOG.debug("traffic generator is stopped") return # return if trex/tg server is stopped. raise mq_producer.tg_method_finished() def terminate(self): self._terminated.value = 1 # stop client def clear_stats(self, ports=None): if ports is None: ports = self.all_ports self.client.clear_stats(ports=ports) def start(self, ports=None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg # NOTE(ralonsoh): defining keyworded arguments before variable # positional arguments is a bug. This function definition doesn't work # in Python 2, although it works in Python 3. Reference: # https://www.python.org/dev/peps/pep-3102/ if ports is None: ports = self.all_ports self.client.start(ports=ports, *args, **kwargs) def collect_kpi(self): if not self._queue.empty(): kpi = self._queue.get() self._result.update(kpi) LOG.debug('Got KPIs from _queue for %s %s', self.scenario_helper.name, self.RESOURCE_WORD) return self._result def _connect(self, client=None): if client is None: client = STLClient(username=self.vnfd_helper.mgmt_interface["user"], server=self.vnfd_helper.mgmt_interface["ip"], verbose_level=LoggerApi.VERBOSE_QUIET) # try to connect with 5s intervals, 30s max for idx in range(6): try: client.connect() break except STLError: LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) time.sleep(5) return client class Rfc2544ResourceHelper(object): DEFAULT_CORRELATED_TRAFFIC = False DEFAULT_LATENCY = False DEFAULT_TOLERANCE = '0.0001 - 0.0001' def __init__(self, scenario_helper): super(Rfc2544ResourceHelper, self).__init__() self.scenario_helper = scenario_helper self._correlated_traffic = None self.iteration = Value('i', 0) self._latency = None self._rfc2544 = None self._tolerance_low = None self._tolerance_high = None @property def rfc2544(self): if self._rfc2544 is None: self._rfc2544 = self.scenario_helper.all_options['rfc2544'] return self._rfc2544 @property def tolerance_low(self): if self._tolerance_low is None: self.get_rfc_tolerance() return self._tolerance_low @property def tolerance_high(self): if self._tolerance_high is None: self.get_rfc_tolerance() return self._tolerance_high @property def correlated_traffic(self): if self._correlated_traffic is None: self._correlated_traffic = \ self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC) return self._correlated_traffic @property def latency(self): if self._latency is None: self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY) return self._latency def get_rfc2544(self, name, default=None): return self.rfc2544.get(name, default) def get_rfc_tolerance(self): tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE) tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-'))) self._tolerance_low = next(tolerance_iter) self._tolerance_high = next(tolerance_iter, self.tolerance_low) class SampleVNFDeployHelper(object): SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf' REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO) SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME) def __init__(self, vnfd_helper, ssh_helper): super(SampleVNFDeployHelper, self).__init__() self.ssh_helper = ssh_helper self.vnfd_helper = vnfd_helper def deploy_vnfs(self, app_name): vnf_bin = self.ssh_helper.join_bin_path(app_name) exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0] if not exit_status: return subprocess.check_output(["rm", "-rf", self.REPO_NAME]) subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO]) time.sleep(2) self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR) self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True) build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh') time.sleep(2) http_proxy = os.environ.get('http_proxy', '') cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy) LOG.debug(cmd) self.ssh_helper.execute(cmd) vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name) self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path) self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin)) class ScenarioHelper(object): DEFAULT_VNF_CFG = { 'lb_config': 'SW', 'lb_count': 1, 'worker_config': '1C/1T', 'worker_threads': 1, } def __init__(self, name): self.name = name self.scenario_cfg = None @property def task_path(self): return self.scenario_cfg['task_path'] @property def nodes(self): return self.scenario_cfg.get('nodes') @property def all_options(self): return self.scenario_cfg.get('options', {}) @property def options(self): return self.all_options.get(self.name, {}) @property def vnf_cfg(self): return self.options.get('vnf_config', self.DEFAULT_VNF_CFG) @property def topology(self): return self.scenario_cfg['topology'] @property def timeout(self): test_duration = self.scenario_cfg.get('runner', {}).get('duration', self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)) test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT) return test_duration if test_duration > test_timeout else test_timeout class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ VNF_PROMPT = "pipeline>" WAIT_TIME = 1 WAIT_TIME_FOR_SCRIPT = 10 APP_NAME = "SampleVNF" # we run the VNF interactively, so the ssh command will timeout after this long def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNF, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path) if setup_env_helper_type is None: setup_env_helper_type = SetupEnvHelper self.setup_helper = setup_env_helper_type(self.vnfd_helper, self.ssh_helper, self.scenario_helper) self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper) if resource_helper_type is None: resource_helper_type = ResourceHelper self.resource_helper = resource_helper_type(self.setup_helper) self.context_cfg = None self.pipeline_kwargs = {} self.uplink_ports = None self.downlink_ports = None # NOTE(esm): make QueueFileWrapper invert-able so that we # never have to manage the queues self.q_in = Queue() self.q_out = Queue() self.queue_wrapper = None self.run_kwargs = {} self.used_drivers = {} self.vnf_port_pairs = None self._vnf_process = None def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) self._vnf_process = Process(name=name, target=self._run) self._vnf_process.start() def _vnf_up_post(self): pass def instantiate(self, scenario_cfg, context_cfg): self._update_collectd_options(scenario_cfg, context_cfg) self.scenario_helper.scenario_cfg = scenario_cfg self.context_cfg = context_cfg self.resource_helper.update_from_context( Context.get_context_from_server(self.scenario_helper.nodes[self.name]), self.scenario_helper.nodes[self.name] ) # vnf deploy is unsupported, use ansible playbooks if self.scenario_helper.options.get("vnf_deploy", False): self.deploy_helper.deploy_vnfs(self.APP_NAME) self.resource_helper.setup() self._start_vnf() def _update_collectd_options(self, scenario_cfg, context_cfg): """Update collectd configuration options This function retrieves all collectd options contained in the test case definition builds a single dictionary combining them. The following fragment represents a test case with the collectd options and priorities (1 highest, 3 lowest): --- schema: yardstick:task:0.1 scenarios: - type: NSPerf nodes: tg__0: trafficgen_1.yardstick vnf__0: vnf.yardstick options: collectd: # COLLECTD priority 3 vnf__0: collectd: plugins: load # COLLECTD priority 2 context: type: Node name: yardstick nfvi_type: baremetal file: /etc/yardstick/nodes/pod_ixia.yaml # COLLECTD priority 1 """ scenario_options = scenario_cfg.get('options', {}) generic_options = scenario_options.get('collectd', {}) scenario_node_options = scenario_options.get(self.name, {})\ .get('collectd', {}) context_node_options = context_cfg.get('nodes', {})\ .get(self.name, {}).get('collectd', {}) options = generic_options self._update_options(options, scenario_node_options) self._update_options(options, context_node_options) self.setup_helper.collectd_options = options def _update_options(self, options, additional_options): """Update collectd options and plugins dictionary""" for k, v in additional_options.items(): if isinstance(v, dict) and k in options: options[k].update(v) else: options[k] = v def wait_for_instantiate(self): buf = [] time.sleep(self.WAIT_TIME) # Give some time for config to load while True: if not self._vnf_process.is_alive(): raise RuntimeError("%s VNF process died." % self.APP_NAME) # NOTE(esm): move to QueueFileWrapper while self.q_out.qsize() > 0: buf.append(self.q_out.get()) message = ''.join(buf) if self.VNF_PROMPT in message: LOG.info("%s VNF is up and running.", self.APP_NAME) self._vnf_up_post() self.queue_wrapper.clear() return self._vnf_process.exitcode if "PANIC" in message: raise RuntimeError("Error starting %s VNF." % self.APP_NAME) LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) time.sleep(self.WAIT_TIME_FOR_SCRIPT) # Send ENTER to display a new prompt in case the prompt text was corrupted # by other VNF output self.q_in.put('\r\n') def start_collect(self): self.resource_helper.start_collect() def stop_collect(self): self.resource_helper.stop_collect() def _build_run_kwargs(self): self.run_kwargs = { 'stdin': self.queue_wrapper, 'stdout': self.queue_wrapper, 'keep_stdin_open': True, 'pty': True, 'timeout': self.scenario_helper.timeout, } def _build_config(self): return self.setup_helper.build_config() def _run(self): # we can't share ssh paramiko objects to force new connection self.ssh_helper.drop_connection() cmd = self._build_config() # kill before starting self.setup_helper.kill_vnf() LOG.debug(cmd) self._build_run_kwargs() self.ssh_helper.run(cmd, **self.run_kwargs) def vnf_execute(self, cmd, wait_time=2): """ send cmd to vnf process """ LOG.info("%s command: %s", self.APP_NAME, cmd) self.q_in.put("{}\r\n".format(cmd)) time.sleep(wait_time) output = [] while self.q_out.qsize() > 0: output.append(self.q_out.get()) return "".join(output) def _tear_down(self): pass def terminate(self): self.vnf_execute("quit") self.setup_helper.kill_vnf() self._tear_down() self.resource_helper.stop_collect() if self._vnf_process is not None: # be proper and join first before we kill LOG.debug("joining before terminate %s", self._vnf_process.name) self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT) self._vnf_process.terminate() # no terminate children here because we share processes with tg def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument """Method for checking the statistics This method could be overridden in children classes. :return: VNF statistics """ cmd = 'p {0} stats'.format(self.APP_WORD) out = self.vnf_execute(cmd) return out def collect_kpi(self): # we can't get KPIs if the VNF is down check_if_process_failed(self._vnf_process, 0.01) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) physical_node = Context.get_physical_node_from_server( self.scenario_helper.nodes[self.name]) result = {"physical_node": physical_node} if m: result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}) result["collect_stats"] = self.resource_helper.collect_kpi() else: result.update({"packets_in": 0, "packets_fwd": 0, "packets_dropped": 0}) LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result def scale(self, flavor=""): """The SampleVNF base class doesn't provide the 'scale' feature""" raise y_exceptions.FunctionNotImplemented( function_name='scale', class_name='SampleVNFTrafficGen') class SampleVNFTrafficGen(GenericTrafficGen): """ Class providing file-like API for generic traffic generator """ APP_NAME = 'Sample' RUN_WAIT = 1 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) if setup_env_helper_type is None: setup_env_helper_type = SetupEnvHelper self.setup_helper = setup_env_helper_type(self.vnfd_helper, self.ssh_helper, self.scenario_helper) if resource_helper_type is None: resource_helper_type = ClientResourceHelper self.resource_helper = resource_helper_type(self.setup_helper) self.runs_traffic = True self.traffic_finished = False self._tg_process = None self._traffic_process = None def _start_server(self): # we can't share ssh paramiko objects to force new connection self.ssh_helper.drop_connection() def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg self.resource_helper.update_from_context( Context.get_context_from_server(self.scenario_helper.nodes[self.name]), self.scenario_helper.nodes[self.name] ) self.resource_helper.setup() # must generate_cfg after DPDK bind because we need port number self.resource_helper.generate_cfg() LOG.info("Starting %s server...", self.APP_NAME) name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) self._tg_process = Process(name=name, target=self._start_server) self._tg_process.start() def _check_status(self): raise NotImplementedError def _wait_for_process(self): while True: if not self._tg_process.is_alive(): raise RuntimeError("%s traffic generator process died." % self.APP_NAME) LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME) time.sleep(1) status = self._check_status() if status == 0: LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode def _traffic_runner(self, traffic_profile, mq_id): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) self._mq_producer = self._setup_mq_producer(mq_id) self.resource_helper.run_traffic(traffic_profile, self._mq_producer) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. Method is non-blocking, returns immediately when traffic process is running. Mandatory. :param traffic_profile: :return: True/False """ name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid()) self._traffic_process = Process( name=name, target=self._traffic_runner, args=(traffic_profile, uuid.uuid1().int)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: time.sleep(self.RUN_WAIT) # what if traffic process takes a few seconds to start? if not self._traffic_process.is_alive(): break def collect_kpi(self): # check if the tg processes have exited physical_node = Context.get_physical_node_from_server( self.scenario_helper.nodes[self.name]) result = {"physical_node": physical_node} for proc in (self._tg_process, self._traffic_process): check_if_process_failed(proc) result["collect_stats"] = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result def terminate(self): """ After this method finishes, all traffic processes should stop. Mandatory. :return: True/False """ self.traffic_finished = True # we must kill client before we kill the server, or the client will raise exception if self._traffic_process is not None: # be proper and try to join before terminating LOG.debug("joining before terminate %s", self._traffic_process.name) self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT) self._traffic_process.terminate() if self._tg_process is not None: # be proper and try to join before terminating LOG.debug("joining before terminate %s", self._tg_process.name) self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT) self._tg_process.terminate() # no terminate children here because we share processes with vnf def scale(self, flavor=""): """A traffic generator VFN doesn't provide the 'scale' feature""" raise y_exceptions.FunctionNotImplemented( function_name='scale', class_name='SampleVNFTrafficGen')