Merge "Improve "get_server" function in Kubernetes context"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / vpe_vnf.py
index e9e80bd..bfff45c 100644 (file)
 
 from __future__ import absolute_import
 from __future__ import print_function
-import tempfile
-import time
+
+
 import os
 import logging
 import re
-from multiprocessing import Queue
-import multiprocessing
-import ipaddress
-import six
+import posixpath
 
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
-from yardstick.network_services.utils import provision_tool
-from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.network_services.nfvi.resource import ResourceProfile
+from six.moves import configparser, zip
 
-LOG = logging.getLogger(__name__)
-VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}'
-CORES = ['0', '1', '2']
-WAIT_TIME = 20
+from yardstick.common.process import check_if_process_failed
+from yardstick.network_services.helpers.samplevnf_helper import PortPairs
+from yardstick.network_services.pipeline import PipelineRules
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+from yardstick.benchmark.contexts import base as ctx_base
 
+LOG = logging.getLogger(__name__)
 
-class VpeApproxVnf(GenericVNF):
+VPE_PIPELINE_COMMAND = "sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script} {hwlb}"
+
+# Regex applied to the vPE CLI "stats port" output in collect_kpi(); the
+# three capture groups are: (1) packets in, (2) packets dropped by AH,
+# (3) packets dropped by other.  The \r\n pairs are literal CR/LF bytes as
+# emitted by the remote pty, while \\s and \\d stay regex metacharacters.
+VPE_COLLECT_KPI = """\
+Pkts in:\\s(\\d+)\r\n\
+\tPkts dropped by AH:\\s(\\d+)\r\n\
+\tPkts dropped by other:\\s(\\d+)\
+"""
+
+
+class ConfigCreate(object):
+
+    @staticmethod
+    def vpe_tmq(config, index):
+        tm_q = 'TM{0}'.format(index)
+        config.add_section(tm_q)
+        config.set(tm_q, 'burst_read', '24')
+        config.set(tm_q, 'burst_write', '32')
+        config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
+        return config
+
+    def __init__(self, vnfd_helper, socket):
+        super(ConfigCreate, self).__init__()
+        self.sw_q = -1
+        self.sink_q = -1
+        self.n_pipeline = 1
+        self.vnfd_helper = vnfd_helper
+        self.uplink_ports = self.vnfd_helper.port_pairs.uplink_ports
+        self.downlink_ports = self.vnfd_helper.port_pairs.downlink_ports
+        self.pipeline_per_port = 9
+        self.socket = socket
+        self._dpdk_port_to_link_id_map = None
+
+    @property
+    def dpdk_port_to_link_id_map(self):
+        # we need interface name -> DPDK port num (PMD ID) -> LINK ID
+        # LINK ID -> PMD ID is governed by the port mask
+        # LINK instances are created implicitly based on the PORT_MASK application startup
+        # argument. LINK0 is the first port enabled in the PORT_MASK, port 1 is the next one,
+        # etc. The LINK ID is different than the DPDK PMD-level NIC port ID, which is the actual
+        #  position in the bitmask mentioned above. For example, if bit 5 is the first bit set
+        # in the bitmask, then LINK0 is having the PMD ID of 5. This mechanism creates a
+        # contiguous LINK ID space and isolates the configuration file against changes in the
+        # board PCIe slots where NICs are plugged in.
+        if self._dpdk_port_to_link_id_map is None:
+            self._dpdk_port_to_link_id_map = {}
+            for link_id, port_name in enumerate(sorted(self.vnfd_helper.port_pairs.all_ports,
+                                                       key=self.vnfd_helper.port_num)):
+                self._dpdk_port_to_link_id_map[port_name] = link_id
+        return self._dpdk_port_to_link_id_map
+
+    def vpe_initialize(self, config):
+        config.add_section('EAL')
+        config.set('EAL', 'log_level', '0')
+
+        config.add_section('PIPELINE0')
+        config.set('PIPELINE0', 'type', 'MASTER')
+        config.set('PIPELINE0', 'core', 's%sC0' % self.socket)
+
+        config.add_section('MEMPOOL0')
+        config.set('MEMPOOL0', 'pool_size', '256K')
+
+        config.add_section('MEMPOOL1')
+        config.set('MEMPOOL1', 'pool_size', '2M')
+        return config
+
+    def vpe_rxq(self, config):
+        for port in self.downlink_ports:
+            new_section = 'RXQ{0}.0'.format(self.dpdk_port_to_link_id_map[port])
+            config.add_section(new_section)
+            config.set(new_section, 'mempool', 'MEMPOOL1')
+
+        return config
+
+    def get_sink_swq(self, parser, pipeline, k, index):
+        sink = ""
+        pktq = parser.get(pipeline, k)
+        if "SINK" in pktq:
+            self.sink_q += 1
+            sink = " SINK{0}".format(self.sink_q)
+        if "TM" in pktq:
+            sink = " TM{0}".format(index)
+        pktq = "SWQ{0}{1}".format(self.sw_q, sink)
+        return pktq
+
+    def vpe_upstream(self, vnf_cfg, index=0):  # pragma: no cover
+        # NOTE(ralonsoh): this function must be covered in UTs.
+        parser = configparser.ConfigParser()
+        parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))
+
+        for pipeline in parser.sections():
+            for k, v in parser.items(pipeline):
+                if k == "pktq_in":
+                    if "RXQ" in v:
+                        port = self.dpdk_port_to_link_id_map[self.uplink_ports[index]]
+                        value = "RXQ{0}.0".format(port)
+                    else:
+                        value = self.get_sink_swq(parser, pipeline, k, index)
+
+                    parser.set(pipeline, k, value)
+
+                elif k == "pktq_out":
+                    if "TXQ" in v:
+                        port = self.dpdk_port_to_link_id_map[self.downlink_ports[index]]
+                        value = "TXQ{0}.0".format(port)
+                    else:
+                        self.sw_q += 1
+                        value = self.get_sink_swq(parser, pipeline, k, index)
+
+                    parser.set(pipeline, k, value)
+
+            new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+            if new_pipeline != pipeline:
+                parser._sections[new_pipeline] = parser._sections[pipeline]
+                parser._sections.pop(pipeline)
+            self.n_pipeline += 1
+        return parser
+
+    def vpe_downstream(self, vnf_cfg, index):  # pragma: no cover
+        # NOTE(ralonsoh): this function must be covered in UTs.
+        parser = configparser.ConfigParser()
+        parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
+        for pipeline in parser.sections():
+            for k, v in parser.items(pipeline):
+
+                if k == "pktq_in":
+                    port = self.dpdk_port_to_link_id_map[self.downlink_ports[index]]
+                    if "RXQ" not in v:
+                        value = self.get_sink_swq(parser, pipeline, k, index)
+                    elif "TM" in v:
+                        value = "RXQ{0}.0 TM{1}".format(port, index)
+                    else:
+                        value = "RXQ{0}.0".format(port)
+
+                    parser.set(pipeline, k, value)
+
+                if k == "pktq_out":
+                    port = self.dpdk_port_to_link_id_map[self.uplink_ports[index]]
+                    if "TXQ" not in v:
+                        self.sw_q += 1
+                        value = self.get_sink_swq(parser, pipeline, k, index)
+                    elif "TM" in v:
+                        value = "TXQ{0}.0 TM{1}".format(port, index)
+                    else:
+                        value = "TXQ{0}.0".format(port)
+
+                    parser.set(pipeline, k, value)
+
+            new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+            if new_pipeline != pipeline:
+                parser._sections[new_pipeline] = parser._sections[pipeline]
+                parser._sections.pop(pipeline)
+            self.n_pipeline += 1
+        return parser
+
+    def create_vpe_config(self, vnf_cfg):
+        config = configparser.ConfigParser()
+        vpe_cfg = os.path.join("/tmp/vpe_config")
+        with open(vpe_cfg, 'w') as cfg_file:
+            config = self.vpe_initialize(config)
+            config = self.vpe_rxq(config)
+            config.write(cfg_file)
+            for index, _ in enumerate(self.uplink_ports):
+                config = self.vpe_upstream(vnf_cfg, index)
+                config.write(cfg_file)
+                config = self.vpe_downstream(vnf_cfg, index)
+                config = self.vpe_tmq(config, index)
+                config.write(cfg_file)
+
+    def generate_vpe_script(self, interfaces):
+        rules = PipelineRules(pipeline_id=1)
+        for uplink_port, downlink_port in zip(self.uplink_ports, self.downlink_ports):
+
+            uplink_intf = \
+                next(intf["virtual-interface"] for intf in interfaces
+                     if intf["name"] == uplink_port)
+            downlink_intf = \
+                next(intf["virtual-interface"] for intf in interfaces
+                     if intf["name"] == downlink_port)
+
+            dst_port0_ip = uplink_intf["dst_ip"]
+            dst_port1_ip = downlink_intf["dst_ip"]
+            dst_port0_mac = uplink_intf["dst_mac"]
+            dst_port1_mac = downlink_intf["dst_mac"]
+
+            rules.add_firewall_script(dst_port0_ip)
+            rules.next_pipeline()
+            rules.add_flow_classification_script()
+            rules.next_pipeline()
+            rules.add_flow_action()
+            rules.next_pipeline()
+            rules.add_flow_action2()
+            rules.next_pipeline()
+            rules.add_route_script(dst_port1_ip, dst_port1_mac)
+            rules.next_pipeline()
+            rules.add_route_script2(dst_port0_ip, dst_port0_mac)
+            rules.next_pipeline(num=4)
+
+        return rules.get_string()
+
+    def generate_tm_cfg(self, vnf_cfg):
+        vnf_cfg = os.path.join(vnf_cfg, "full_tm_profile_10G.cfg")
+        if os.path.exists(vnf_cfg):
+            return open(vnf_cfg).read()
+
+
+class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+    """DPDK setup helper that renders and uploads the vPE config artifacts."""
+
+    APP_NAME = 'vPE_vnf'
+    # Local staging paths of the generated files; only the basenames are
+    # used when uploading to the remote host.
+    CFG_CONFIG = "/tmp/vpe_config"
+    CFG_SCRIPT = "/tmp/vpe_script"
+    TM_CONFIG = "/tmp/full_tm_profile_10G.cfg"
+    CORES = ['0', '1', '2', '3', '4', '5']
+    PIPELINE_COMMAND = VPE_PIPELINE_COMMAND
+
+    def _build_vnf_ports(self):
+        """Split the VNFD interfaces into uplink/downlink/all port lists."""
+        self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
+        self.uplink_ports = self._port_pairs.uplink_ports
+        self.downlink_ports = self._port_pairs.downlink_ports
+        self.all_ports = self._port_pairs.all_ports
+
+    def build_config(self):
+        """Generate and upload the vPE config, script and TM profile.
+
+        :returns: the formatted pipeline command line used to start the VNF
+        """
+        vpe_vars = {
+            "bin_path": self.ssh_helper.bin_path,
+            "socket": self.socket,
+        }
+
+        self._build_vnf_ports()
+        vpe_conf = ConfigCreate(self.vnfd_helper, self.socket)
+        vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg)
+
+        config_basename = posixpath.basename(self.CFG_CONFIG)
+        script_basename = posixpath.basename(self.CFG_SCRIPT)
+        tm_basename = posixpath.basename(self.TM_CONFIG)
+        with open(self.CFG_CONFIG) as handle:
+            vpe_config = handle.read()
+
+        # str.format fills the {bin_path}/{socket} placeholders left in the
+        # generated files before they are uploaded to the remote host
+        self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars))
+
+        vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces)
+        self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars))
+
+        tm_config = vpe_conf.generate_tm_cfg(self.scenario_helper.vnf_cfg)
+        # NOTE(review): generate_tm_cfg returns None when the profile file is
+        # missing -- confirm upload_config_file tolerates a None payload.
+        self.ssh_helper.upload_config_file(tm_basename, tm_config)
+
+        LOG.info("Provision and start the %s", self.APP_NAME)
+        LOG.info(self.CFG_CONFIG)
+        LOG.info(self.CFG_SCRIPT)
+        self._build_pipeline_kwargs()
+        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+
+
+class VpeApproxVnf(SampleVNF):
     """ This class handles vPE VNF model-driver definitions """
 
-    def __init__(self, vnfd):
-        super(VpeApproxVnf, self).__init__(vnfd)
-        self.socket = None
-        self.q_in = Queue()
-        self.q_out = Queue()
-        self.vnf_cfg = None
-        self._vnf_process = None
-        self.connection = None
-        self.resource = None
-
-    def _resource_collect_start(self):
-        self.resource.initiate_systemagent(self.bin_path)
-        self.resource.start()
-
-    def _resource_collect_stop(self):
-        self.resource.stop()
+    # Identifiers and the KPI-parsing regex consumed by the SampleVNF base.
+    APP_NAME = 'vPE_vnf'
+    APP_WORD = 'vpe'
+    COLLECT_KPI = VPE_COLLECT_KPI
+    WAIT_TIME = 20
 
-    def _collect_resource_kpi(self):
-        result = {}
+    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+        # Default to the vPE-specific DPDK setup helper when none is supplied.
+        if setup_env_helper_type is None:
+            setup_env_helper_type = VpeApproxSetupEnvHelper
 
-        status = self.resource.check_if_sa_running("collectd")[0]
-        if status:
-            result = self.resource.amqp_collect_nfvi_kpi()
+        super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
 
-        result = {"core": result}
-
-        return result
-
-    @classmethod
-    def __setup_hugepages(cls, connection):
-        hugepages = \
-            connection.execute(
-                "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
-        hugepages = hugepages.rstrip()
-
-        memory_path = \
-            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
-        connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
-        pages = 16384 if hugepages.rstrip() == "2048kB" else 16
-        connection.execute("echo %s > %s" % (pages, memory_path))
-
-    def setup_vnf_environment(self, connection):
-        ''' setup dpdk environment needed for vnf to run '''
-
-        self.__setup_hugepages(connection)
-        connection.execute("modprobe uio && modprobe igb_uio")
-
-        exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
-        if exit_status == 0:
-            return
-
-        dpdk = os.path.join(self.bin_path, "dpdk-16.07")
-        dpdk_setup = \
-            provision_tool(self.connection,
-                           os.path.join(self.bin_path, "nsb_setup.sh"))
-        status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
-        if status:
-            connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
-
-    def _get_cpu_sibling_list(self):
-        cpu_topo = []
-        for core in CORES:
-            sys_cmd = \
-                "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \
-                % core
-            cpuid = \
-                self.connection.execute("awk -F: '{ print $1 }' < %s" %
-                                        sys_cmd)[1]
-            cpu_topo += \
-                [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')]
-
-        return [cpu.strip() for cpu in cpu_topo]
-
-    def scale(self, flavor=""):
-        ''' scale vnfbased on flavor input '''
-        super(VpeApproxVnf, self).scale(flavor)
-
-    def instantiate(self, scenario_cfg, context_cfg):
-        vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg']
-
-        mgmt_interface = self.vnfd["mgmt-interface"]
-        self.connection = ssh.SSH.from_node(mgmt_interface)
-
-        self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
-
-        self.setup_vnf_environment(self.connection)
-
-        cores = self._get_cpu_sibling_list()
-        self.resource = ResourceProfile(self.vnfd, cores)
-
-        self.connection.execute("pkill vPE_vnf")
-        dpdk_nic_bind = \
-            provision_tool(self.connection,
-                           os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
-        interfaces = self.vnfd["vdu"][0]['external-interface']
-        self.socket = \
-            next((0 for v in interfaces
-                  if v['virtual-interface']["vpci"][5] == "0"), 1)
-
-        bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
-        for vpci in bound_pci:
-            self.connection.execute(
-                "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci))
-        queue_wrapper = \
-            QueueFileWrapper(self.q_in, self.q_out, "pipeline>")
-        self._vnf_process = multiprocessing.Process(target=self._run_vpe,
-                                                    args=(queue_wrapper,
-                                                          vnf_cfg,))
-        self._vnf_process.start()
-        buf = []
-        time.sleep(WAIT_TIME)  # Give some time for config to load
-        while True:
-            message = ''
-            while self.q_out.qsize() > 0:
-                buf.append(self.q_out.get())
-                message = ''.join(buf)
-                if "pipeline>" in message:
-                    LOG.info("VPE VNF is up and running.")
-                    queue_wrapper.clear()
-                    self._resource_collect_start()
-                    return self._vnf_process.exitcode
-                if "PANIC" in message:
-                    raise RuntimeError("Error starting vPE VNF.")
-
-            LOG.info("Waiting for VNF to start.. ")
-            time.sleep(3)
-            if not self._vnf_process.is_alive():
-                raise RuntimeError("vPE VNF process died.")
-
-    def _get_ports_gateway(self, name):
-        if 'routing_table' in self.vnfd['vdu'][0]:
-            routing_table = self.vnfd['vdu'][0]['routing_table']
-
-            for route in routing_table:
-                if name == route['if']:
-                    return route['gateway']
-
-    def terminate(self):
-        self.execute_command("quit")
-        if self._vnf_process:
-            self._vnf_process.terminate()
-
-    def _run_vpe(self, filewrapper, vnf_cfg):
-        mgmt_interface = self.vnfd["mgmt-interface"]
-
-        self.connection = ssh.SSH.from_node(mgmt_interface)
-        self.connection.wait()
-
-        interfaces = self.vnfd["vdu"][0]['external-interface']
-        port0_ip = ipaddress.ip_interface(six.text_type(
-            "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"],
-                       interfaces[0]["virtual-interface"]["netmask"])))
-        port1_ip = ipaddress.ip_interface(six.text_type(
-            "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"],
-                       interfaces[1]["virtual-interface"]["netmask"])))
-        dst_port0_ip = ipaddress.ip_interface(
-            u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"],
-                        interfaces[0]["virtual-interface"]["netmask"]))
-        dst_port1_ip = ipaddress.ip_interface(
-            u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"],
-                        interfaces[1]["virtual-interface"]["netmask"]))
-
-        vpe_vars = {"port0_local_ip": port0_ip.ip.exploded,
-                    "port0_dst_ip": dst_port0_ip.ip.exploded,
-                    "port0_local_ip_hex":
-                    self._ip_to_hex(port0_ip.ip.exploded),
-                    "port0_prefixlen": port0_ip.network.prefixlen,
-                    "port0_netmask": port0_ip.network.netmask.exploded,
-                    "port0_netmask_hex":
-                    self._ip_to_hex(port0_ip.network.netmask.exploded),
-                    "port0_local_mac":
-                    interfaces[0]["virtual-interface"]["local_mac"],
-                    "port0_dst_mac":
-                    interfaces[0]["virtual-interface"]["dst_mac"],
-                    "port0_gateway":
-                    self._get_ports_gateway(interfaces[0]["name"]),
-                    "port0_local_network":
-                    port0_ip.network.network_address.exploded,
-                    "port0_prefix": port0_ip.network.prefixlen,
-                    "port1_local_ip": port1_ip.ip.exploded,
-                    "port1_dst_ip": dst_port1_ip.ip.exploded,
-                    "port1_local_ip_hex":
-                    self._ip_to_hex(port1_ip.ip.exploded),
-                    "port1_prefixlen": port1_ip.network.prefixlen,
-                    "port1_netmask": port1_ip.network.netmask.exploded,
-                    "port1_netmask_hex":
-                    self._ip_to_hex(port1_ip.network.netmask.exploded),
-                    "port1_local_mac":
-                    interfaces[1]["virtual-interface"]["local_mac"],
-                    "port1_dst_mac":
-                    interfaces[1]["virtual-interface"]["dst_mac"],
-                    "port1_gateway":
-                    self._get_ports_gateway(interfaces[1]["name"]),
-                    "port1_local_network":
-                    port1_ip.network.network_address.exploded,
-                    "port1_prefix": port1_ip.network.prefixlen,
-                    "port0_local_ip6": self._get_port0localip6(),
-                    "port1_local_ip6": self._get_port1localip6(),
-                    "port0_prefixlen6": self._get_port0prefixlen6(),
-                    "port1_prefixlen6": self._get_port1prefixlen6(),
-                    "port0_gateway6": self._get_port0gateway6(),
-                    "port1_gateway6": self._get_port1gateway6(),
-                    "port0_dst_ip_hex6": self._get_port0localip6(),
-                    "port1_dst_ip_hex6": self._get_port1localip6(),
-                    "port0_dst_netmask_hex6": self._get_port0prefixlen6(),
-                    "port1_dst_netmask_hex6": self._get_port1prefixlen6(),
-                    "bin_path": self.bin_path,
-                    "socket": self.socket}
-
-        for cfg in os.listdir(vnf_cfg):
-            vpe_config = ""
-            with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg:
-                vpe_config = vpe_cfg.read()
-
-            self._provide_config_file(cfg, vpe_config, vpe_vars)
-
-        LOG.info("Provision and start the vPE")
-        tool_path = provision_tool(self.connection,
-                                   os.path.join(self.bin_path, "vPE_vnf"))
-        cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config",
-                                          script="/tmp/vpe_script",
-                                          tool_path=tool_path)
-        self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper,
-                            keep_stdin_open=True, pty=True)
-
-    def _provide_config_file(self, prefix, template, args):
-        cfg, cfg_content = tempfile.mkstemp()
-        cfg = os.fdopen(cfg, "w+")
-        cfg.write(template.format(**args))
-        cfg.close()
-        cfg_file = "/tmp/%s" % prefix
-        self.connection.put(cfg_content, cfg_file)
-        return cfg_file
-
-    def execute_command(self, cmd):
-        ''' send cmd to vnf process '''
-        LOG.info("VPE command: %s", cmd)
-        output = []
-        if self.q_in:
-            self.q_in.put(cmd + "\r\n")
-            time.sleep(3)
-            while self.q_out.qsize() > 0:
-                output.append(self.q_out.get())
-        return "".join(output)
+    def get_stats(self, *args, **kwargs):
+        # Direct stats queries are not supported for vPE; the counters are
+        # gathered through collect_kpi() instead.
+        raise NotImplementedError
 
     def collect_kpi(self):
-        result = self.get_stats_vpe()
-        collect_stats = self._collect_resource_kpi()
-        result["collect_stats"] = collect_stats
-        LOG.debug("vPE collet Kpis: %s", result)
-        return result
-
-    def get_stats_vpe(self):
-        ''' get vpe statistics '''
-        result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0,
-                  'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0}
-        up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0',
-                            'p 5 stats port out 1']
-        down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0']
-        pattern = \
-            "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \
-            "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)"
-
-        for cmd in up_stat_commands:
-            stats = self.execute_command(cmd)
-            match = re.search(pattern, stats, re.MULTILINE)
-            if match:
-                result["pkt_in_up_stream"] = \
-                    result.get("pkt_in_up_stream", 0) + int(match.group(1))
-                result["pkt_drop_up_stream"] = \
-                    result.get("pkt_drop_up_stream", 0) + \
-                    int(match.group(2)) + int(match.group(3))
-
-        for cmd in down_stat_commands:
-            stats = self.execute_command(cmd)
-            match = re.search(pattern, stats, re.MULTILINE)
-            if match:
-                result["pkt_in_down_stream"] = \
-                    result.get("pkt_in_down_stream", 0) + int(match.group(1))
-                result["pkt_drop_down_stream"] = \
-                    result.get("pkt_drop_down_stream", 0) + \
-                    int(match.group(2)) + int(match.group(3))
+        # we can't get KPIs if the VNF is down
+        check_if_process_failed(self._vnf_process)
+        physical_node = ctx_base.Context.get_physical_node_from_server(
+            self.scenario_helper.nodes[self.name])
+
+        result = {
+            "physical_node": physical_node,
+            'pkt_in_up_stream': 0,
+            'pkt_drop_up_stream': 0,
+            'pkt_in_down_stream': 0,
+            'pkt_drop_down_stream': 0,
+            'collect_stats': self.resource_helper.collect_kpi(),
+        }
+
+        # COLLECT_KPI capture groups: 1 = packets in, 2 and 3 = packets dropped.
+        indexes_in = [1]
+        indexes_drop = [2, 3]
+        command = 'p {0} stats port {1} 0'
+        # pipeline 5 carries the upstream counters and pipeline 9 the
+        # downstream ones (the same ids the pre-refactor code queried)
+        for index, direction in ((5, 'up'), (9, 'down')):
+            key_in = "pkt_in_{0}_stream".format(direction)
+            key_drop = "pkt_drop_{0}_stream".format(direction)
+            for mode in ('in', 'out'):
+                stats = self.vnf_execute(command.format(index, mode))
+                match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+                if not match:
+                    continue
+                result[key_in] += sum(int(match.group(x)) for x in indexes_in)
+                result[key_drop] += sum(int(match.group(x)) for x in indexes_drop)
+
+        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result