X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fnetwork_services%2Fvnf_generic%2Fvnf%2Fsample_vnf.py;h=ff81b5f5f3a92f8c84eb91e19ad26636f4b9e1aa;hb=c22c231d4061ae957d06206922978bfedf3aba77;hp=659a7638ce7b291753406633a6dc881fa410fdd7;hpb=5421cf0bac5d0ecff13508bb488391cb043bae66;p=yardstick.git diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 659a7638c..ff81b5f5f 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -22,24 +22,25 @@ import os import re import subprocess from collections import Mapping - from multiprocessing import Queue, Value, Process from six.moves import cStringIO from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file -from yardstick.network_services.helpers.cpu import CpuSysCores +from yardstick.common.process import check_if_process_failed +from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper from yardstick.network_services.nfvi.resource import ResourceProfile from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen from yardstick.network_services.utils import get_nsb_option -from stl.trex_stl_lib.trex_stl_client import STLClient -from stl.trex_stl_lib.trex_stl_client import LoggerApi -from stl.trex_stl_lib.trex_stl_exceptions import STLError +from trex_stl_lib.trex_stl_client import STLClient +from trex_stl_lib.trex_stl_client import LoggerApi +from trex_stl_lib.trex_stl_exceptions import STLError from yardstick.ssh import AutoConnectSSH @@ -49,6 +50,8 @@ LOG = logging.getLogger(__name__) REMOTE_TMP = "/tmp" +DEFAULT_VNF_TIMEOUT = 3600 +PROCESS_JOIN_TIMEOUT = 3 class VnfSshHelper(AutoConnectSSH): @@ -92,7 +95,6 @@ class SetupEnvHelper(object): CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config") CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script") - CORES = [] DEFAULT_CONFIG_TPL_CFG = "sample.cfg" PIPELINE_COMMAND = '' VNF_TYPE = "SAMPLE" @@ -103,19 +105,14 @@ class SetupEnvHelper(object): self.ssh_helper = ssh_helper self.scenario_helper = scenario_helper - def _get_ports_gateway(self, name): - routing_table = self.vnfd_helper.vdu0.get('routing_table', []) - for route in routing_table: - if name == route['if']: - return route['gateway'] - return None - def build_config(self): raise NotImplementedError def setup_vnf_environment(self): pass - # raise NotImplementedError + + def kill_vnf(self): + pass def tear_down(self): raise NotImplementedError @@ -124,15 +121,8 @@ class SetupEnvHelper(object): class DpdkVnfSetupEnvHelper(SetupEnvHelper): APP_NAME = 'DpdkVnf' - DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}" - DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}" FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'" - HW_DEFAULT_CORE = 3 - SW_DEFAULT_CORE = 2 - - DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)") - @staticmethod def _update_packet_type(ip_pipeline_cfg, traffic_options): match_str = 'pkt_type = ipv4' @@ -163,15 +153,9 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): 
super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) self.all_ports = None self.bound_pci = None - self._dpdk_nic_bind = None self.socket = None self.used_drivers = None - - @property - def dpdk_nic_bind(self): - if self._dpdk_nic_bind is None: - self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py") - return self._dpdk_nic_bind + self.dpdk_bind_helper = DpdkBindHelper(ssh_helper) def _setup_hugepages(self): cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo" @@ -182,16 +166,12 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path) if hugepages == "2048kB": - pages = 16384 + pages = 8192 else: pages = 16 self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path)) - def _get_dpdk_port_num(self, name): - interface = self.vnfd_helper.find_interface(name=name) - return interface['virtual-interface']['dpdk_port_num'] - def build_config(self): vnf_cfg = self.scenario_helper.vnf_cfg task_path = self.scenario_helper.task_path @@ -214,7 +194,7 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): multiport = MultiPortConfig(self.scenario_helper.topology, config_tpl_cfg, config_basename, - self.vnfd_helper.interfaces, + self.vnfd_helper, self.VNF_TYPE, lb_count, worker_threads, @@ -232,7 +212,6 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.ssh_helper.upload_config_file(config_basename, new_config) self.ssh_helper.upload_config_file(script_basename, multiport.generate_script(self.vnfd_helper)) - self.all_ports = multiport.port_pair_list LOG.info("Provision and start the %s", self.APP_NAME) self._build_pipeline_kwargs() @@ -240,69 +219,38 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): def _build_pipeline_kwargs(self): tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) - ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1) + # count the number of actual ports in the list of pairs + # remove duplicate ports + # this is really a mapping from LINK ID to DPDK PMD ID + # e.g. 
0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2 + # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3 + ports = self.vnfd_helper.port_pairs.all_ports + port_nums = self.vnfd_helper.port_nums(ports) + # create mask from all the dpdk port numbers + ports_mask_hex = hex(sum(2 ** num for num in port_nums)) self.pipeline_kwargs = { 'cfg_file': self.CFG_CONFIG, 'script': self.CFG_SCRIPT, - 'ports_len_hex': ports_len_hex, + 'port_mask_hex': ports_mask_hex, 'tool_path': tool_path, } - def _get_app_cpu(self): - if self.CORES: - return self.CORES - - vnf_cfg = self.scenario_helper.vnf_cfg - sys_obj = CpuSysCores(self.ssh_helper) - self.sys_cpu = sys_obj.get_core_socket() - num_core = int(vnf_cfg["worker_threads"]) - if vnf_cfg.get("lb_config", "SW") == 'HW': - num_core += self.HW_DEFAULT_CORE - else: - num_core += self.SW_DEFAULT_CORE - app_cpu = self.sys_cpu[str(self.socket)][:num_core] - return app_cpu - - def _get_cpu_sibling_list(self, cores=None): - if cores is None: - cores = self._get_app_cpu() - sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list" - awk_template = "awk -F: '{ print $1 }' < %s" - sys_path = "/sys/devices/system/cpu/" - cpu_topology = [] - try: - for core in cores: - sys_cmd = sys_cmd_template % (sys_path, core) - cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1] - cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(',')) - - return cpu_topology - except Exception: - return [] - - def _validate_cpu_cfg(self): - return self._get_cpu_sibling_list() - - def _find_used_drivers(self): - cmd = "{0} -s".format(self.dpdk_nic_bind) - rc, dpdk_status, _ = self.ssh_helper.execute(cmd) - - self.used_drivers = { - vpci: (index, driver) - for index, (vpci, driver) - in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status)) - if any(b.endswith(vpci) for b in self.bound_pci) - } - def setup_vnf_environment(self): self._setup_dpdk() + self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces] + self.kill_vnf() + # bind before _setup_resources so we can use dpdk_port_num + self._detect_and_bind_drivers() resource = self._setup_resources() - self._kill_vnf() - self._detect_drivers() return resource - def _kill_vnf(self): - self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME) + def kill_vnf(self): + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME) + # have to use exact match + # try using killall to match + self.ssh_helper.execute("sudo killall %s" % self.APP_NAME) def _setup_dpdk(self): """ setup dpdk environment needed for vnf to run """ @@ -320,10 +268,13 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): if exit_status != 0: self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) - def _setup_resources(self): - interfaces = self.vnfd_helper.interfaces - self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] + def get_collectd_options(self): + options = self.scenario_helper.all_options.get("collectd", {}) + # override with specific node settings + options.update(self.scenario_helper.options.get("collectd", {})) + return options + def _setup_resources(self): # what is this magic? how do we know which socket is for which port? # what about quad-socket? 
if any(v[5] == "0" for v in self.bound_pci): @@ -331,62 +282,45 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): else: self.socket = 1 - cores = self._validate_cpu_cfg() - return ResourceProfile(self.vnfd_helper.mgmt_interface, - interfaces=self.vnfd_helper.interfaces, cores=cores) - - def _detect_drivers(self): + # implicit ordering, presumably by DPDK port num, so pre-sort by port_num + # this won't work because we don't have DPDK port numbers yet + ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num) + port_names = (intf["name"] for intf in ports) + collectd_options = self.get_collectd_options() + plugins = collectd_options.get("plugins", {}) + # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF + return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, + plugins=plugins, interval=collectd_options.get("interval"), + timeout=self.scenario_helper.timeout) + + def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces - self._find_used_drivers() - for vpci, (index, _) in self.used_drivers.items(): - try: - intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci']) - except StopIteration: - pass - else: - intf1['dpdk_port_num'] = index + self.dpdk_bind_helper.read_status() + self.dpdk_bind_helper.save_used_drivers() - for vpci in self.bound_pci: - self._bind_dpdk('igb_uio', vpci) - time.sleep(2) + self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio') - def _bind_dpdk(self, driver, vpci, force=True): - if force: - force = '--force ' - else: - force = '' - cmd = self.DPDK_BIND_CMD.format(force=force, - dpdk_nic_bind=self.dpdk_nic_bind, - driver=driver, - vpci=vpci) - self.ssh_helper.execute(cmd) + sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses) + for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses): + try: + intf = next(v for v in interfaces + if vpci == v['virtual-interface']['vpci']) + # force to int + intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num) + except: + pass + time.sleep(2) - def _detect_and_bind_dpdk(self, vpci, driver): + def get_local_iface_name_by_vpci(self, vpci): find_net_cmd = self.FIND_NET_CMD.format(vpci) - exit_status, _, _ = self.ssh_helper.execute(find_net_cmd) - if exit_status == 0: - # already bound - return None - self._bind_dpdk(driver, vpci) exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd) - if exit_status != 0: - # failed to bind - return None - return stdout - - def _bind_kernel_devices(self): - for intf in self.vnfd_helper.interfaces: - vi = intf["virtual-interface"] - stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"]) - if stdout is not None: - vi["local_iface_name"] = posixpath.basename(stdout) + if exit_status == 0: + return stdout + return None def tear_down(self): - for vpci, (_, driver) in self.used_drivers.items(): - self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind, - driver=driver, - vpci=vpci)) + self.dpdk_bind_helper.rebind_drivers() class ResourceHelper(object): @@ -412,7 +346,7 @@ class ResourceHelper(object): def _collect_resource_kpi(self): result = {} status = self.resource.check_if_sa_running("collectd")[0] - if status: + if status == 0: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} @@ -445,14 +379,21 @@ class ClientResourceHelper(ResourceHelper): self.client = None self.client_started = Value('i', 0) - self.my_ports = None + self.all_ports = None self._queue = Queue() self._result = {} 
self._terminated = Value('i', 0) - self._vpci_ascending = None def _build_ports(self): - self.my_ports = [0, 1] + self.networks = self.vnfd_helper.port_pairs.networks + self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports) + self.downlink_ports = \ + self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports) + self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports) + + def port_num(self, intf): + # by default return port num + return self.vnfd_helper.port_num(intf) def get_stats(self, *args, **kwargs): try: @@ -461,8 +402,9 @@ class ClientResourceHelper(ResourceHelper): LOG.exception("TRex client not connected") return {} - def generate_samples(self, key=None, default=None): - last_result = self.get_stats(self.my_ports) + def generate_samples(self, ports, key=None, default=None): + # needs to be used ports + last_result = self.get_stats(ports) key_value = last_result.get(key, default) if not isinstance(last_result, Mapping): # added for mock unit test @@ -470,44 +412,49 @@ class ClientResourceHelper(ResourceHelper): return {} samples = {} - for vpci_idx, vpci in enumerate(self._vpci_ascending): - name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"] - # fixme: VNFDs KPIs values needs to be mapped to TRex structure - xe_value = last_result.get(vpci_idx, {}) - samples[name] = { - "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), - "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), - "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), - "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), - "in_packets": int(xe_value.get("ipackets", 0)), - "out_packets": int(xe_value.get("opackets", 0)), - } - if key: - samples[name][key] = key_value + # recalculate port for interface and see if it matches ports provided + for intf in self.vnfd_helper.interfaces: + name = intf["name"] + port = self.vnfd_helper.port_num(name) + if port in ports: + xe_value = last_result.get(port, {}) + samples[name] = { + "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), + "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), + "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), + "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), + "in_packets": int(xe_value.get("ipackets", 0)), + "out_packets": int(xe_value.get("opackets", 0)), + } + if key: + samples[name][key] = key_value return samples def _run_traffic_once(self, traffic_profile): - traffic_profile.execute(self) + traffic_profile.execute_traffic(self) self.client_started.value = 1 time.sleep(self.RUN_DURATION) - samples = self.generate_samples() + samples = self.generate_samples(traffic_profile.ports) time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) def run_traffic(self, traffic_profile): + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path try: self._build_ports() self.client = self._connect() - self.client.reset(ports=self.my_ports) - self.client.remove_all_streams(self.my_ports) # remove all streams + self.client.reset(ports=self.all_ports) + self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) while self._terminated.value == 0: self._run_traffic_once(traffic_profile) - self.client.stop(self.my_ports) + self.client.stop(self.all_ports) self.client.disconnect() self._terminated.value = 0 except STLError: 
@@ -521,19 +468,20 @@ class ClientResourceHelper(ResourceHelper): def clear_stats(self, ports=None): if ports is None: - ports = self.my_ports + ports = self.all_ports self.client.clear_stats(ports=ports) def start(self, ports=None, *args, **kwargs): if ports is None: - ports = self.my_ports + ports = self.all_ports self.client.start(ports=ports, *args, **kwargs) def collect_kpi(self): if not self._queue.empty(): kpi = self._queue.get() self._result.update(kpi) - LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result)) + LOG.debug("Got KPIs from _queue for {0} {1}".format( + self.scenario_helper.name, self.RESOURCE_WORD)) return self._result def _connect(self, client=None): @@ -622,13 +570,7 @@ class SampleVNFDeployHelper(object): self.ssh_helper = ssh_helper self.vnfd_helper = vnfd_helper - DISABLE_DEPLOY = True - def deploy_vnfs(self, app_name): - # temp disable for now - if self.DISABLE_DEPLOY: - return - vnf_bin = self.ssh_helper.join_bin_path(app_name) exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0] if not exit_status: @@ -666,19 +608,19 @@ class ScenarioHelper(object): @property def task_path(self): - return self.scenario_cfg["task_path"] + return self.scenario_cfg['task_path'] @property def nodes(self): - return self.scenario_cfg['nodes'] + return self.scenario_cfg.get('nodes') @property def all_options(self): - return self.scenario_cfg["options"] + return self.scenario_cfg.get('options', {}) @property def options(self): - return self.all_options[self.name] + return self.all_options.get(self.name, {}) @property def vnf_cfg(self): @@ -688,12 +630,19 @@ class ScenarioHelper(object): def topology(self): return self.scenario_cfg['topology'] + @property + def timeout(self): + return self.options.get('timeout', DEFAULT_VNF_TIMEOUT) + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ VNF_PROMPT = "pipeline>" WAIT_TIME = 1 + WAIT_TIME_FOR_SCRIPT = 10 + APP_NAME = "SampleVNF" + # we run the VNF interactively, so the ssh command will timeout after this long def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNF, self).__init__(name, vnfd) @@ -716,24 +665,28 @@ class SampleVNF(GenericVNF): self.resource_helper = resource_helper_type(self.setup_helper) - self.all_ports = None self.context_cfg = None self.nfvi_context = None self.pipeline_kwargs = {} - self.priv_ports = None - self.pub_ports = None + self.uplink_ports = None + self.downlink_ports = None # TODO(esm): make QueueFileWrapper invert-able so that we # never have to manage the queues self.q_in = Queue() self.q_out = Queue() self.queue_wrapper = None self.run_kwargs = {} - self.scenario_cfg = None - self.tg_port_pairs = None self.used_drivers = {} self.vnf_port_pairs = None self._vnf_process = None + def _build_ports(self): + self._port_pairs = PortPairs(self.vnfd_helper.interfaces) + self.networks = self._port_pairs.networks + self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports) + self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports) + self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports) + def _get_route_data(self, route_index, route_type): route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', [])) for _ in range(route_index): @@ -772,7 +725,8 @@ class SampleVNF(GenericVNF): def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) - self._vnf_process = Process(target=self._run) + name = 
"{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._vnf_process = Process(name=name, target=self._run) self._vnf_process.start() def _vnf_up_post(self): @@ -784,7 +738,9 @@ class SampleVNF(GenericVNF): self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) # self.nfvi_context = None - self.deploy_helper.deploy_vnfs(self.APP_NAME) + # vnf deploy is unsupported, use ansible playbooks + if self.scenario_helper.options.get("vnf_deploy", False): + self.deploy_helper.deploy_vnfs(self.APP_NAME) self.resource_helper.setup() self._start_vnf() @@ -811,7 +767,10 @@ class SampleVNF(GenericVNF): self.APP_NAME) LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) - time.sleep(1) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) + # Send ENTER to display a new prompt in case the prompt text was corrupted + # by other VNF output + self.q_in.put('\r\n') def _build_run_kwargs(self): self.run_kwargs = { @@ -819,6 +778,7 @@ class SampleVNF(GenericVNF): 'stdout': self.queue_wrapper, 'keep_stdin_open': True, 'pty': True, + 'timeout': self.scenario_helper.timeout, } def _build_config(self): @@ -829,7 +789,7 @@ class SampleVNF(GenericVNF): self.ssh_helper.drop_connection() cmd = self._build_config() # kill before starting - self.ssh_helper.execute("pkill {}".format(self.APP_NAME)) + self.setup_helper.kill_vnf() LOG.debug(cmd) self._build_run_kwargs() @@ -851,11 +811,15 @@ class SampleVNF(GenericVNF): def terminate(self): self.vnf_execute("quit") - if self._vnf_process: - self._vnf_process.terminate() - self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME) + self.setup_helper.kill_vnf() self._tear_down() self.resource_helper.stop_collect() + if self._vnf_process is not None: + # be proper and join first before we kill + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() + # no terminate children here because we share processes with tg def get_stats(self, *args, **kwargs): """ @@ -869,6 +833,8 @@ class SampleVNF(GenericVNF): return out def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) if m: @@ -893,7 +859,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') - self.name = "tgen__1" # name in topology file self.scenario_helper = ScenarioHelper(self.name) self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) @@ -912,7 +877,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.runs_traffic = True self.traffic_finished = False - self.tg_port_pairs = None self._tg_process = None self._traffic_process = None @@ -922,15 +886,17 @@ class SampleVNFTrafficGen(GenericTrafficGen): def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg - self.resource_helper.generate_cfg() - self.setup_helper.setup_vnf_environment() self.resource_helper.setup() + # must generate_cfg after DPDK bind because we need port number + self.resource_helper.generate_cfg() LOG.info("Starting %s server...", self.APP_NAME) - self._tg_process = Process(target=self._start_server) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._tg_process = Process(name=name, target=self._start_server) 
self._tg_process.start() def wait_for_instantiate(self): + # overridden by subclasses return self._wait_for_process() def _check_status(self): @@ -948,6 +914,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): return self._tg_process.exitcode def _traffic_runner(self, traffic_profile): + # always drop connections first thing in new processes + # so we don't get paramiko errors + self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) self.resource_helper.run_traffic(traffic_profile) @@ -959,12 +928,17 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - self._traffic_process = Process(target=self._traffic_runner, + name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._traffic_runner, args=(traffic_profile,)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: time.sleep(self.RUN_WAIT) + # what if traffic process takes a few seconds to start? + if not self._traffic_process.is_alive(): + break return self._traffic_process.is_alive() @@ -987,6 +961,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): pass def collect_kpi(self): + # check if the tg processes have exited + for proc in (self._tg_process, self._traffic_process): + check_if_process_failed(proc) result = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -997,5 +974,15 @@ class SampleVNFTrafficGen(GenericTrafficGen): :return: True/False """ self.traffic_finished = True + # we must kill client before we kill the server, or the client will raise exception if self._traffic_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._traffic_process.name) + self._traffic_process.join(PROCESS_JOIN_TIMEOUT) self._traffic_process.terminate() + if self._tg_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._tg_process.name) + self._tg_process.join(PROCESS_JOIN_TIMEOUT) + self._tg_process.terminate() + # no terminate children here because we share processes with vnf
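
Note on the new port-mask logic above: the old code derived `ports_len_hex` from the length of the port-pair list, while the replacement in `_build_pipeline_kwargs` builds the mask from the actual DPDK port numbers, so non-contiguous PMD IDs are represented correctly. Below is a minimal, self-contained sketch of that calculation; the port numbers are hard-coded example values purely for illustration (an assumption for this sketch), whereas the VNF code obtains them from `vnfd_helper.port_nums()`.

    # Sketch of the port-mask calculation used in _build_pipeline_kwargs().
    # Each DPDK port number sets one bit in the mask.

    def port_mask_hex(port_nums):
        # ports 0 and 1 -> 0b011 -> '0x3'; ports 0 and 2 -> 0b101 -> '0x5'
        return hex(sum(2 ** num for num in port_nums))

    if __name__ == "__main__":
        print(port_mask_hex([0, 1]))   # '0x3': LINK0 -> PMD 0, LINK1 -> PMD 1
        print(port_mask_hex([0, 2]))   # '0x5': LINK1 mapped to PMD 2 instead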