Merge "update docker version to 16.04"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
index 96e7030..20e5895 100644 (file)
@@ -22,17 +22,16 @@ import os
 import re
 import subprocess
 from collections import Mapping
-
 from multiprocessing import Queue, Value, Process
 
 from six.moves import cStringIO
 
 from yardstick.benchmark.contexts.base import Context
 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
-from yardstick.network_services.helpers.cpu import CpuSysCores
+from yardstick.common.process import check_if_process_failed
 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
-from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
+from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
 from yardstick.network_services.nfvi.resource import ResourceProfile
 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
@@ -51,6 +50,8 @@ LOG = logging.getLogger(__name__)
 
 
 REMOTE_TMP = "/tmp"
+DEFAULT_VNF_TIMEOUT = 3600
+PROCESS_JOIN_TIMEOUT = 3
 
 
 class VnfSshHelper(AutoConnectSSH):
@@ -94,7 +95,6 @@ class SetupEnvHelper(object):
 
     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
-    CORES = []
     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
     PIPELINE_COMMAND = ''
     VNF_TYPE = "SAMPLE"
@@ -105,13 +105,6 @@ class SetupEnvHelper(object):
         self.ssh_helper = ssh_helper
         self.scenario_helper = scenario_helper
 
-    def _get_ports_gateway(self, name):
-        routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
-        for route in routing_table:
-            if name == route['if']:
-                return route['gateway']
-        return None
-
     def build_config(self):
         raise NotImplementedError
 
@@ -130,9 +123,6 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
     APP_NAME = 'DpdkVnf'
     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
 
-    HW_DEFAULT_CORE = 3
-    SW_DEFAULT_CORE = 2
-
     @staticmethod
     def _update_packet_type(ip_pipeline_cfg, traffic_options):
         match_str = 'pkt_type = ipv4'
@@ -245,51 +235,22 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
             'tool_path': tool_path,
         }
 
-    def _get_app_cpu(self):
-        if self.CORES:
-            return self.CORES
-
-        vnf_cfg = self.scenario_helper.vnf_cfg
-        sys_obj = CpuSysCores(self.ssh_helper)
-        self.sys_cpu = sys_obj.get_core_socket()
-        num_core = int(vnf_cfg["worker_threads"])
-        if vnf_cfg.get("lb_config", "SW") == 'HW':
-            num_core += self.HW_DEFAULT_CORE
-        else:
-            num_core += self.SW_DEFAULT_CORE
-        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
-        return app_cpu
-
-    def _get_cpu_sibling_list(self, cores=None):
-        if cores is None:
-            cores = self._get_app_cpu()
-        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
-        awk_template = "awk -F: '{ print $1 }' < %s"
-        sys_path = "/sys/devices/system/cpu/"
-        cpu_topology = []
-        try:
-            for core in cores:
-                sys_cmd = sys_cmd_template % (sys_path, core)
-                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
-                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
-
-            return cpu_topology
-        except Exception:
-            return []
-
-    def _validate_cpu_cfg(self):
-        return self._get_cpu_sibling_list()
-
     def setup_vnf_environment(self):
         self._setup_dpdk()
-        resource = self._setup_resources()
+        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
         self.kill_vnf()
+        # bind before _setup_resources so we can use dpdk_port_num
         self._detect_and_bind_drivers()
+        resource = self._setup_resources()
         return resource
 
     def kill_vnf(self):
+        # pkill is not matching, debug with pgrep
+        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
+        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
         # have to use exact match
-        self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)
+        # try using killall to match
+        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
 
     def _setup_dpdk(self):
         """ setup dpdk environment needed for vnf to run """
@@ -307,10 +268,13 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         if exit_status != 0:
             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
 
-    def _setup_resources(self):
-        interfaces = self.vnfd_helper.interfaces
-        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+    def get_collectd_options(self):
+        options = self.scenario_helper.all_options.get("collectd", {})
+        # override with specific node settings
+        options.update(self.scenario_helper.options.get("collectd", {}))
+        return options
 
+    def _setup_resources(self):
         # what is this magic?  how do we know which socket is for which port?
         # what about quad-socket?
         if any(v[5] == "0" for v in self.bound_pci):
@@ -318,9 +282,16 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         else:
             self.socket = 1
 
-        cores = self._validate_cpu_cfg()
-        return ResourceProfile(self.vnfd_helper.mgmt_interface,
-                               interfaces=self.vnfd_helper.interfaces, cores=cores)
+        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
+        # this won't work because we don't have DPDK port numbers yet
+        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
+        port_names = (intf["name"] for intf in ports)
+        collectd_options = self.get_collectd_options()
+        plugins = collectd_options.get("plugins", {})
+        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
+        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
+                               plugins=plugins, interval=collectd_options.get("interval"),
+                               timeout=self.scenario_helper.timeout)
 
     def _detect_and_bind_drivers(self):
         interfaces = self.vnfd_helper.interfaces
@@ -375,7 +346,7 @@ class ResourceHelper(object):
     def _collect_resource_kpi(self):
         result = {}
         status = self.resource.check_if_sa_running("collectd")[0]
-        if status:
+        if status == 0:
             result = self.resource.amqp_collect_nfvi_kpi()
 
         result = {"core": result}
@@ -412,14 +383,18 @@ class ClientResourceHelper(ResourceHelper):
         self._queue = Queue()
         self._result = {}
         self._terminated = Value('i', 0)
-        self._vpci_ascending = None
 
     def _build_ports(self):
         self.networks = self.vnfd_helper.port_pairs.networks
-        self.priv_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.priv_ports)
-        self.pub_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.pub_ports)
+        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
+        self.downlink_ports = \
+            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
 
+    def port_num(self, intf):
+        # by default return port num
+        return self.vnfd_helper.port_num(intf)
+
     def get_stats(self, *args, **kwargs):
         try:
             return self.client.get_stats(*args, **kwargs)
@@ -464,6 +439,9 @@ class ClientResourceHelper(ResourceHelper):
         self._queue.put(samples)
 
     def run_traffic(self, traffic_profile):
+        # if we don't do this we can hang waiting for the queue to drain
+        # have to do this in the subprocess
+        self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
         try:
@@ -592,13 +570,7 @@ class SampleVNFDeployHelper(object):
         self.ssh_helper = ssh_helper
         self.vnfd_helper = vnfd_helper
 
-    DISABLE_DEPLOY = True
-
     def deploy_vnfs(self, app_name):
-        # temp disable for now
-        if self.DISABLE_DEPLOY:
-            return
-
         vnf_bin = self.ssh_helper.join_bin_path(app_name)
         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
         if not exit_status:
@@ -658,12 +630,19 @@ class ScenarioHelper(object):
     def topology(self):
         return self.scenario_cfg['topology']
 
+    @property
+    def timeout(self):
+        return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
+
 
 class SampleVNF(GenericVNF):
     """ Class providing file-like API for generic VNF implementation """
 
     VNF_PROMPT = "pipeline>"
     WAIT_TIME = 1
+    WAIT_TIME_FOR_SCRIPT = 10
+    APP_NAME = "SampleVNF"
+    # we run the VNF interactively, so the ssh command will timeout after this long
 
     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
         super(SampleVNF, self).__init__(name, vnfd)
@@ -689,8 +668,8 @@ class SampleVNF(GenericVNF):
         self.context_cfg = None
         self.nfvi_context = None
         self.pipeline_kwargs = {}
-        self.priv_ports = None
-        self.pub_ports = None
+        self.uplink_ports = None
+        self.downlink_ports = None
         # TODO(esm): make QueueFileWrapper invert-able so that we
         #            never have to manage the queues
         self.q_in = Queue()
@@ -704,8 +683,8 @@ class SampleVNF(GenericVNF):
     def _build_ports(self):
         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
         self.networks = self._port_pairs.networks
-        self.priv_ports = self.vnfd_helper.port_nums(self._port_pairs.priv_ports)
-        self.pub_ports = self.vnfd_helper.port_nums(self._port_pairs.pub_ports)
+        self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
+        self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
 
     def _get_route_data(self, route_index, route_type):
@@ -746,7 +725,8 @@ class SampleVNF(GenericVNF):
 
     def _start_vnf(self):
         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
-        self._vnf_process = Process(target=self._run)
+        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
+        self._vnf_process = Process(name=name, target=self._run)
         self._vnf_process.start()
 
     def _vnf_up_post(self):
@@ -758,7 +738,9 @@ class SampleVNF(GenericVNF):
         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
         # self.nfvi_context = None
 
-        self.deploy_helper.deploy_vnfs(self.APP_NAME)
+        # vnf deploy is unsupported, use ansible playbooks
+        if self.scenario_helper.options.get("vnf_deploy", False):
+            self.deploy_helper.deploy_vnfs(self.APP_NAME)
         self.resource_helper.setup()
         self._start_vnf()
 
@@ -785,9 +767,10 @@ class SampleVNF(GenericVNF):
                                        self.APP_NAME)
 
             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
-            time.sleep(1)
-            # put newline to force new prompt?
-            self.q_in.put("\r\n")
+            time.sleep(self.WAIT_TIME_FOR_SCRIPT)
+            # Send ENTER to display a new prompt in case the prompt text was corrupted
+            # by other VNF output
+            self.q_in.put('\r\n')
 
     def _build_run_kwargs(self):
         self.run_kwargs = {
@@ -795,6 +778,7 @@ class SampleVNF(GenericVNF):
             'stdout': self.queue_wrapper,
             'keep_stdin_open': True,
             'pty': True,
+            'timeout': self.scenario_helper.timeout,
         }
 
     def _build_config(self):
@@ -827,11 +811,15 @@ class SampleVNF(GenericVNF):
 
     def terminate(self):
         self.vnf_execute("quit")
-        if self._vnf_process:
-            self._vnf_process.terminate()
         self.setup_helper.kill_vnf()
         self._tear_down()
         self.resource_helper.stop_collect()
+        if self._vnf_process is not None:
+            # be proper and join first before we kill
+            LOG.debug("joining before terminate %s", self._vnf_process.name)
+            self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+            self._vnf_process.terminate()
+        # no terminate children here because we share processes with tg
 
     def get_stats(self, *args, **kwargs):
         """
@@ -845,6 +833,8 @@ class SampleVNF(GenericVNF):
         return out
 
     def collect_kpi(self):
+        # we can't get KPIs if the VNF is down
+        check_if_process_failed(self._vnf_process)
         stats = self.get_stats()
         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
         if m:
@@ -869,7 +859,6 @@ class SampleVNFTrafficGen(GenericTrafficGen):
     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
         self.bin_path = get_nsb_option('bin_path', '')
-        self.name = "tgen__1"  # name in topology file
 
         self.scenario_helper = ScenarioHelper(self.name)
         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
@@ -897,17 +886,15 @@ class SampleVNFTrafficGen(GenericTrafficGen):
 
     def instantiate(self, scenario_cfg, context_cfg):
         self.scenario_helper.scenario_cfg = scenario_cfg
-        self.resource_helper.generate_cfg()
         self.resource_helper.setup()
+        # must generate_cfg after DPDK bind because we need port number
+        self.resource_helper.generate_cfg()
 
         LOG.info("Starting %s server...", self.APP_NAME)
-        self._tg_process = Process(target=self._start_server)
+        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
+        self._tg_process = Process(name=name, target=self._start_server)
         self._tg_process.start()
 
-    def wait_for_instantiate(self):
-        # overridden by subclasses
-        return self._wait_for_process()
-
     def _check_status(self):
         raise NotImplementedError
 
@@ -937,7 +924,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
-        self._traffic_process = Process(target=self._traffic_runner,
+        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+                                    os.getpid())
+        self._traffic_process = Process(name=name, target=self._traffic_runner,
                                         args=(traffic_profile,))
         self._traffic_process.start()
         # Wait for traffic process to start
@@ -949,25 +938,10 @@ class SampleVNFTrafficGen(GenericTrafficGen):
 
         return self._traffic_process.is_alive()
 
-    def listen_traffic(self, traffic_profile):
-        """ Listen to traffic with the given parameters.
-        Method is non-blocking, returns immediately when traffic process
-        is running. Optional.
-
-        :param traffic_profile:
-        :return: True/False
-        """
-        pass
-
-    def verify_traffic(self, traffic_profile):
-        """ Verify captured traffic after it has ended. Optional.
-
-        :param traffic_profile:
-        :return: dict
-        """
-        pass
-
     def collect_kpi(self):
+        # check if the tg processes have exited
+        for proc in (self._tg_process, self._traffic_process):
+            check_if_process_failed(proc)
         result = self.resource_helper.collect_kpi()
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
@@ -978,5 +952,15 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :return: True/False
         """
         self.traffic_finished = True
+        # we must kill client before we kill the server, or the client will raise exception
         if self._traffic_process is not None:
+            # be proper and try to join before terminating
+            LOG.debug("joining before terminate %s", self._traffic_process.name)
+            self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
             self._traffic_process.terminate()
+        if self._tg_process is not None:
+            # be proper and try to join before terminating
+            LOG.debug("joining before terminate %s", self._tg_process.name)
+            self._tg_process.join(PROCESS_JOIN_TIMEOUT)
+            self._tg_process.terminate()
+        # no terminate children here because we share processes with vnf