X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fnetwork_services%2Fvnf_generic%2Fvnf%2Fsample_vnf.py;h=ff81b5f5f3a92f8c84eb91e19ad26636f4b9e1aa;hb=c22c231d4061ae957d06206922978bfedf3aba77;hp=557009d303c44eb3c26d5d657780cc59eb9c7f3c;hpb=661d78e5ad9a831a4e6c36287d17ff6829b062d0;p=yardstick.git diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 557009d30..ff81b5f5f 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -22,17 +22,16 @@ import os import re import subprocess from collections import Mapping - from multiprocessing import Queue, Value, Process from six.moves import cStringIO from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file -from yardstick.network_services.helpers.cpu import CpuSysCores +from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig -from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper from yardstick.network_services.nfvi.resource import ResourceProfile from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper @@ -51,6 +50,8 @@ LOG = logging.getLogger(__name__) REMOTE_TMP = "/tmp" +DEFAULT_VNF_TIMEOUT = 3600 +PROCESS_JOIN_TIMEOUT = 3 class VnfSshHelper(AutoConnectSSH): @@ -94,7 +95,6 @@ class SetupEnvHelper(object): CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config") CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script") - CORES = [] DEFAULT_CONFIG_TPL_CFG = "sample.cfg" PIPELINE_COMMAND = '' VNF_TYPE = "SAMPLE" @@ -105,13 +105,6 @@ class SetupEnvHelper(object): self.ssh_helper = ssh_helper self.scenario_helper = scenario_helper - def _get_ports_gateway(self, name): - routing_table = self.vnfd_helper.vdu0.get('routing_table', []) - for route in routing_table: - if name == route['if']: - return route['gateway'] - return None - def build_config(self): raise NotImplementedError @@ -130,9 +123,6 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): APP_NAME = 'DpdkVnf' FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'" - HW_DEFAULT_CORE = 3 - SW_DEFAULT_CORE = 2 - @staticmethod def _update_packet_type(ip_pipeline_cfg, traffic_options): match_str = 'pkt_type = ipv4' @@ -245,51 +235,22 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): 'tool_path': tool_path, } - def _get_app_cpu(self): - if self.CORES: - return self.CORES - - vnf_cfg = self.scenario_helper.vnf_cfg - sys_obj = CpuSysCores(self.ssh_helper) - self.sys_cpu = sys_obj.get_core_socket() - num_core = int(vnf_cfg["worker_threads"]) - if vnf_cfg.get("lb_config", "SW") == 'HW': - num_core += self.HW_DEFAULT_CORE - else: - num_core += self.SW_DEFAULT_CORE - app_cpu = self.sys_cpu[str(self.socket)][:num_core] - return app_cpu - - def _get_cpu_sibling_list(self, cores=None): - if cores is None: - cores = self._get_app_cpu() - sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list" - awk_template = "awk -F: '{ print $1 }' < %s" - sys_path = "/sys/devices/system/cpu/" - cpu_topology = [] - try: - for core in cores: - sys_cmd = sys_cmd_template % (sys_path, core) - cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1] - cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(',')) - - return cpu_topology - except Exception: - return [] - - def _validate_cpu_cfg(self): - return self._get_cpu_sibling_list() - def setup_vnf_environment(self): self._setup_dpdk() - resource = self._setup_resources() + self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces] self.kill_vnf() + # bind before _setup_resources so we can use dpdk_port_num self._detect_and_bind_drivers() + resource = self._setup_resources() return resource def kill_vnf(self): + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME) # have to use exact match - self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME) + # try using killall to match + self.ssh_helper.execute("sudo killall %s" % self.APP_NAME) def _setup_dpdk(self): """ setup dpdk environment needed for vnf to run """ @@ -307,10 +268,13 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): if exit_status != 0: self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) - def _setup_resources(self): - interfaces = self.vnfd_helper.interfaces - self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] + def get_collectd_options(self): + options = self.scenario_helper.all_options.get("collectd", {}) + # override with specific node settings + options.update(self.scenario_helper.options.get("collectd", {})) + return options + def _setup_resources(self): # what is this magic? how do we know which socket is for which port? # what about quad-socket? if any(v[5] == "0" for v in self.bound_pci): @@ -318,9 +282,16 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): else: self.socket = 1 - cores = self._validate_cpu_cfg() - return ResourceProfile(self.vnfd_helper.mgmt_interface, - interfaces=self.vnfd_helper.interfaces, cores=cores) + # implicit ordering, presumably by DPDK port num, so pre-sort by port_num + # this won't work because we don't have DPDK port numbers yet + ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num) + port_names = (intf["name"] for intf in ports) + collectd_options = self.get_collectd_options() + plugins = collectd_options.get("plugins", {}) + # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF + return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, + plugins=plugins, interval=collectd_options.get("interval"), + timeout=self.scenario_helper.timeout) def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces @@ -375,7 +346,7 @@ class ResourceHelper(object): def _collect_resource_kpi(self): result = {} status = self.resource.check_if_sa_running("collectd")[0] - if status: + if status == 0: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} @@ -412,7 +383,6 @@ class ClientResourceHelper(ResourceHelper): self._queue = Queue() self._result = {} self._terminated = Value('i', 0) - self._vpci_ascending = None def _build_ports(self): self.networks = self.vnfd_helper.port_pairs.networks @@ -421,6 +391,10 @@ class ClientResourceHelper(ResourceHelper): self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports) self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports) + def port_num(self, intf): + # by default return port num + return self.vnfd_helper.port_num(intf) + def get_stats(self, *args, **kwargs): try: return self.client.get_stats(*args, **kwargs) @@ -465,6 +439,9 @@ class ClientResourceHelper(ResourceHelper): self._queue.put(samples) def run_traffic(self, traffic_profile): + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path try: @@ -593,13 +570,7 @@ class SampleVNFDeployHelper(object): self.ssh_helper = ssh_helper self.vnfd_helper = vnfd_helper - DISABLE_DEPLOY = True - def deploy_vnfs(self, app_name): - # temp disable for now - if self.DISABLE_DEPLOY: - return - vnf_bin = self.ssh_helper.join_bin_path(app_name) exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0] if not exit_status: @@ -659,12 +630,19 @@ class ScenarioHelper(object): def topology(self): return self.scenario_cfg['topology'] + @property + def timeout(self): + return self.options.get('timeout', DEFAULT_VNF_TIMEOUT) + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ VNF_PROMPT = "pipeline>" WAIT_TIME = 1 + WAIT_TIME_FOR_SCRIPT = 10 + APP_NAME = "SampleVNF" + # we run the VNF interactively, so the ssh command will timeout after this long def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNF, self).__init__(name, vnfd) @@ -747,7 +725,8 @@ class SampleVNF(GenericVNF): def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) - self._vnf_process = Process(target=self._run) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._vnf_process = Process(name=name, target=self._run) self._vnf_process.start() def _vnf_up_post(self): @@ -759,7 +738,9 @@ class SampleVNF(GenericVNF): self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) # self.nfvi_context = None - self.deploy_helper.deploy_vnfs(self.APP_NAME) + # vnf deploy is unsupported, use ansible playbooks + if self.scenario_helper.options.get("vnf_deploy", False): + self.deploy_helper.deploy_vnfs(self.APP_NAME) self.resource_helper.setup() self._start_vnf() @@ -786,7 +767,7 @@ class SampleVNF(GenericVNF): self.APP_NAME) LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) - time.sleep(1) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) # Send ENTER to display a new prompt in case the prompt text was corrupted # by other VNF output self.q_in.put('\r\n') @@ -797,6 +778,7 @@ class SampleVNF(GenericVNF): 'stdout': self.queue_wrapper, 'keep_stdin_open': True, 'pty': True, + 'timeout': self.scenario_helper.timeout, } def _build_config(self): @@ -829,11 +811,15 @@ class SampleVNF(GenericVNF): def terminate(self): self.vnf_execute("quit") - if self._vnf_process: - self._vnf_process.terminate() self.setup_helper.kill_vnf() self._tear_down() self.resource_helper.stop_collect() + if self._vnf_process is not None: + # be proper and join first before we kill + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() + # no terminate children here because we share processes with tg def get_stats(self, *args, **kwargs): """ @@ -847,6 +833,8 @@ class SampleVNF(GenericVNF): return out def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) if m: @@ -871,7 +859,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') - self.name = "tgen__1" # name in topology file self.scenario_helper = ScenarioHelper(self.name) self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) @@ -899,11 +886,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg - self.resource_helper.generate_cfg() self.resource_helper.setup() + # must generate_cfg after DPDK bind because we need port number + self.resource_helper.generate_cfg() LOG.info("Starting %s server...", self.APP_NAME) - self._tg_process = Process(target=self._start_server) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._tg_process = Process(name=name, target=self._start_server) self._tg_process.start() def wait_for_instantiate(self): @@ -939,7 +928,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - self._traffic_process = Process(target=self._traffic_runner, + name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._traffic_runner, args=(traffic_profile,)) self._traffic_process.start() # Wait for traffic process to start @@ -970,6 +961,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): pass def collect_kpi(self): + # check if the tg processes have exited + for proc in (self._tg_process, self._traffic_process): + check_if_process_failed(proc) result = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -980,5 +974,15 @@ class SampleVNFTrafficGen(GenericTrafficGen): :return: True/False """ self.traffic_finished = True + # we must kill client before we kill the server, or the client will raise exception if self._traffic_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._traffic_process.name) + self._traffic_process.join(PROCESS_JOIN_TIMEOUT) self._traffic_process.terminate() + if self._tg_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._tg_process.name) + self._tg_process.join(PROCESS_JOIN_TIMEOUT) + self._tg_process.terminate() + # no terminate children here because we share processes with vnf