Merge "Create Dockerfile to create a yardstick-image of docker"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
index fbaaa0c..3ef7c33 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2016-2017 Intel Corporation
+# Copyright (c) 2016-2018 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-""" Base class implementation for generic vnf implementation """
 
 
-from collections import Mapping
 import logging
 from multiprocessing import Queue, Value, Process
 import os
 import posixpath
 import re
 import logging
 from multiprocessing import Queue, Value, Process
 import os
 import posixpath
 import re
-from six.moves import cStringIO
+import uuid
 import subprocess
 import time
 
 import subprocess
 import time
 
+import six
+
 from trex_stl_lib.trex_stl_client import LoggerApi
 from trex_stl_lib.trex_stl_client import STLClient
 from trex_stl_lib.trex_stl_exceptions import STLError
 from yardstick.benchmark.contexts.base import Context
 from trex_stl_lib.trex_stl_client import LoggerApi
 from trex_stl_lib.trex_stl_client import STLClient
 from trex_stl_lib.trex_stl_exceptions import STLError
 from yardstick.benchmark.contexts.base import Context
-from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
 from yardstick.common import exceptions as y_exceptions
 from yardstick.common.process import check_if_process_failed
 from yardstick.common import exceptions as y_exceptions
 from yardstick.common.process import check_if_process_failed
-from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
-from yardstick.network_services.helpers.samplevnf_helper import PortPairs
+from yardstick.common import utils
+from yardstick.common import yaml_loader
+from yardstick.network_services import constants
+from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
 from yardstick.network_services.nfvi.resource import ResourceProfile
 from yardstick.network_services.utils import get_nsb_option
 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
 from yardstick.network_services.nfvi.resource import ResourceProfile
 from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.ssh import AutoConnectSSH
-
-
-DPDK_VERSION = "dpdk-16.07"
+from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
+from yardstick.benchmark.contexts.node import NodeContext
 
 LOG = logging.getLogger(__name__)
 
 
 
 LOG = logging.getLogger(__name__)
 
 
-REMOTE_TMP = "/tmp"
-DEFAULT_VNF_TIMEOUT = 3600
-PROCESS_JOIN_TIMEOUT = 3
-
-
-class VnfSshHelper(AutoConnectSSH):
-
-    def __init__(self, node, bin_path, wait=None):
-        self.node = node
-        kwargs = self.args_from_node(self.node)
-        if wait:
-            kwargs.setdefault('wait', wait)
-
-        super(VnfSshHelper, self).__init__(**kwargs)
-        self.bin_path = bin_path
-
-    @staticmethod
-    def get_class():
-        # must return static class name, anything else refers to the calling class
-        # i.e. the subclass, not the superclass
-        return VnfSshHelper
-
-    def copy(self):
-        # this copy constructor is different from SSH classes, since it uses node
-        return self.get_class()(self.node, self.bin_path)
-
-    def upload_config_file(self, prefix, content):
-        cfg_file = os.path.join(REMOTE_TMP, prefix)
-        LOG.debug(content)
-        file_obj = cStringIO(content)
-        self.put_file_obj(file_obj, cfg_file)
-        return cfg_file
-
-    def join_bin_path(self, *args):
-        return os.path.join(self.bin_path, *args)
-
-    def provision_tool(self, tool_path=None, tool_file=None):
-        if tool_path is None:
-            tool_path = self.bin_path
-        return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
-
-
 class SetupEnvHelper(object):
 
 class SetupEnvHelper(object):
 
-    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
-    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
+    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
     PIPELINE_COMMAND = ''
     VNF_TYPE = "SAMPLE"
     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
     PIPELINE_COMMAND = ''
     VNF_TYPE = "SAMPLE"
@@ -101,6 +58,7 @@ class SetupEnvHelper(object):
         self.vnfd_helper = vnfd_helper
         self.ssh_helper = ssh_helper
         self.scenario_helper = scenario_helper
         self.vnfd_helper = vnfd_helper
         self.ssh_helper = ssh_helper
         self.scenario_helper = scenario_helper
+        self.collectd_options = {}
 
     def build_config(self):
         raise NotImplementedError
 
     def build_config(self):
         raise NotImplementedError
@@ -119,6 +77,7 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
 
     APP_NAME = 'DpdkVnf'
     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
 
     APP_NAME = 'DpdkVnf'
     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
 
     @staticmethod
     def _update_packet_type(ip_pipeline_cfg, traffic_options):
 
     @staticmethod
     def _update_packet_type(ip_pipeline_cfg, traffic_options):
@@ -155,24 +114,23 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
 
     def _setup_hugepages(self):
         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
 
     def _setup_hugepages(self):
-        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
-        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
-
-        memory_path = \
-            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
-        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
-        if hugepages == "2048kB":
-            pages = 8192
-        else:
-            pages = 16
-
-        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+        meminfo = utils.read_meminfo(self.ssh_helper)
+        hp_size_kb = int(meminfo['Hugepagesize'])
+        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
+        nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
+        self.ssh_helper.execute('echo %s | sudo tee %s' %
+                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
+        hp = six.BytesIO()
+        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
+        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
+        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
+                 hp_size_kb, nr_hugepages, nr_hugepages_set)
 
     def build_config(self):
         vnf_cfg = self.scenario_helper.vnf_cfg
         task_path = self.scenario_helper.task_path
 
 
     def build_config(self):
         vnf_cfg = self.scenario_helper.vnf_cfg
         task_path = self.scenario_helper.task_path
 
+        config_file = vnf_cfg.get('file')
         lb_count = vnf_cfg.get('lb_count', 3)
         lb_config = vnf_cfg.get('lb_config', 'SW')
         worker_config = vnf_cfg.get('worker_config', '1C/1T')
         lb_count = vnf_cfg.get('lb_count', 3)
         lb_config = vnf_cfg.get('lb_config', 'SW')
         worker_config = vnf_cfg.get('worker_config', '1C/1T')
@@ -185,7 +143,15 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
             'vnf_type': self.VNF_TYPE,
         }
 
             'vnf_type': self.VNF_TYPE,
         }
 
-        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+        # read actions/rules from file
+        acl_options = None
+        acl_file_name = self.scenario_helper.options.get('rules')
+        if acl_file_name:
+            with utils.open_relative_file(acl_file_name, task_path) as infile:
+                acl_options = yaml_loader.yaml_load(infile)
+
+        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
+                                                  task_path)
         config_basename = posixpath.basename(self.CFG_CONFIG)
         script_basename = posixpath.basename(self.CFG_SCRIPT)
         multiport = MultiPortConfig(self.scenario_helper.topology,
         config_basename = posixpath.basename(self.CFG_CONFIG)
         script_basename = posixpath.basename(self.CFG_SCRIPT)
         multiport = MultiPortConfig(self.scenario_helper.topology,
@@ -200,20 +166,33 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
                                     self.socket)
 
         multiport.generate_config()
                                     self.socket)
 
         multiport.generate_config()
-        with open(self.CFG_CONFIG) as handle:
-            new_config = handle.read()
-
-        new_config = self._update_traffic_type(new_config, traffic_options)
-        new_config = self._update_packet_type(new_config, traffic_options)
-
+        if config_file:
+            with utils.open_relative_file(config_file, task_path) as infile:
+                new_config = ['[EAL]']
+                vpci = []
+                for port in self.vnfd_helper.port_pairs.all_ports:
+                    interface = self.vnfd_helper.find_interface(name=port)
+                    vpci.append(interface['virtual-interface']["vpci"])
+                new_config.extend('w = {0}'.format(item) for item in vpci)
+                new_config = '\n'.join(new_config) + '\n' + infile.read()
+        else:
+            with open(self.CFG_CONFIG) as handle:
+                new_config = handle.read()
+            new_config = self._update_traffic_type(new_config, traffic_options)
+            new_config = self._update_packet_type(new_config, traffic_options)
         self.ssh_helper.upload_config_file(config_basename, new_config)
         self.ssh_helper.upload_config_file(script_basename,
         self.ssh_helper.upload_config_file(config_basename, new_config)
         self.ssh_helper.upload_config_file(script_basename,
-                                           multiport.generate_script(self.vnfd_helper))
+            multiport.generate_script(self.vnfd_helper,
+                                      self.get_flows_config(acl_options)))
 
         LOG.info("Provision and start the %s", self.APP_NAME)
         self._build_pipeline_kwargs()
         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
 
 
         LOG.info("Provision and start the %s", self.APP_NAME)
         self._build_pipeline_kwargs()
         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
 
+    def get_flows_config(self, options=None): # pylint: disable=unused-argument
+        """No actions/rules (flows) by default"""
+        return None
+
     def _build_pipeline_kwargs(self):
         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
         # count the number of actual ports in the list of pairs
     def _build_pipeline_kwargs(self):
         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
         # count the number of actual ports in the list of pairs
@@ -225,16 +204,24 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         port_nums = self.vnfd_helper.port_nums(ports)
         # create mask from all the dpdk port numbers
         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
         port_nums = self.vnfd_helper.port_nums(ports)
         # create mask from all the dpdk port numbers
         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
+
+        vnf_cfg = self.scenario_helper.vnf_cfg
+        lb_config = vnf_cfg.get('lb_config', 'SW')
+        worker_threads = vnf_cfg.get('worker_threads', 3)
+        hwlb = ''
+        if lb_config == 'HW':
+            hwlb = ' --hwlb %s' % worker_threads
+
         self.pipeline_kwargs = {
             'cfg_file': self.CFG_CONFIG,
             'script': self.CFG_SCRIPT,
             'port_mask_hex': ports_mask_hex,
             'tool_path': tool_path,
         self.pipeline_kwargs = {
             'cfg_file': self.CFG_CONFIG,
             'script': self.CFG_SCRIPT,
             'port_mask_hex': ports_mask_hex,
             'tool_path': tool_path,
+            'hwlb': hwlb,
         }
 
     def setup_vnf_environment(self):
         self._setup_dpdk()
         }
 
     def setup_vnf_environment(self):
         self._setup_dpdk()
-        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
         self.kill_vnf()
         # bind before _setup_resources so we can use dpdk_port_num
         self._detect_and_bind_drivers()
         self.kill_vnf()
         # bind before _setup_resources so we can use dpdk_port_num
         self._detect_and_bind_drivers()
@@ -250,27 +237,14 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
 
     def _setup_dpdk(self):
         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
 
     def _setup_dpdk(self):
-        """ setup dpdk environment needed for vnf to run """
-
+        """Setup DPDK environment needed for VNF to run"""
         self._setup_hugepages()
         self._setup_hugepages()
-        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
+        self.dpdk_bind_helper.load_dpdk_driver()
 
 
-        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
+        exit_status = self.dpdk_bind_helper.check_dpdk_driver()
         if exit_status == 0:
             return
 
         if exit_status == 0:
             return
 
-        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
-        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
-        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
-        if exit_status != 0:
-            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
-
-    def get_collectd_options(self):
-        options = self.scenario_helper.all_options.get("collectd", {})
-        # override with specific node settings
-        options.update(self.scenario_helper.options.get("collectd", {}))
-        return options
-
     def _setup_resources(self):
         # what is this magic?  how do we know which socket is for which port?
         # what about quad-socket?
     def _setup_resources(self):
         # what is this magic?  how do we know which socket is for which port?
         # what about quad-socket?
@@ -283,16 +257,29 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         # this won't work because we don't have DPDK port numbers yet
         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
         port_names = (intf["name"] for intf in ports)
         # this won't work because we don't have DPDK port numbers yet
         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
         port_names = (intf["name"] for intf in ports)
-        collectd_options = self.get_collectd_options()
-        plugins = collectd_options.get("plugins", {})
+        plugins = self.collectd_options.get("plugins", {})
+        interval = self.collectd_options.get("interval")
         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
-                               plugins=plugins, interval=collectd_options.get("interval"),
+                               plugins=plugins, interval=interval,
                                timeout=self.scenario_helper.timeout)
 
                                timeout=self.scenario_helper.timeout)
 
+    def _check_interface_fields(self):
+        num_nodes = len(self.scenario_helper.nodes)
+        # OpenStack instance creation time is probably proportional to the number
+        # of instances
+        timeout = 120 * num_nodes
+        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
+                             self.ssh_helper, timeout)
+        dpdk_node.check()
+
     def _detect_and_bind_drivers(self):
         interfaces = self.vnfd_helper.interfaces
 
     def _detect_and_bind_drivers(self):
         interfaces = self.vnfd_helper.interfaces
 
+        self._check_interface_fields()
+        # check for bound after probe
+        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+
         self.dpdk_bind_helper.read_status()
         self.dpdk_bind_helper.save_used_drivers()
 
         self.dpdk_bind_helper.read_status()
         self.dpdk_bind_helper.save_used_drivers()
 
@@ -333,6 +320,7 @@ class ResourceHelper(object):
         self.resource = None
         self.setup_helper = setup_helper
         self.ssh_helper = setup_helper.ssh_helper
         self.resource = None
         self.setup_helper = setup_helper
         self.ssh_helper = setup_helper.ssh_helper
+        self._enable = True
 
     def setup(self):
         self.resource = self.setup_helper.setup_vnf_environment()
 
     def setup(self):
         self.resource = self.setup_helper.setup_vnf_environment()
@@ -340,22 +328,33 @@ class ResourceHelper(object):
     def generate_cfg(self):
         pass
 
     def generate_cfg(self):
         pass
 
+    def update_from_context(self, context, attr_name):
+        """Disable resource helper in case of baremetal context.
+
+        And update appropriate node collectd options in context
+        """
+        if isinstance(context, NodeContext):
+            self._enable = False
+            context.update_collectd_options_for_node(self.setup_helper.collectd_options,
+                                                     attr_name)
+
     def _collect_resource_kpi(self):
         result = {}
         status = self.resource.check_if_system_agent_running("collectd")[0]
     def _collect_resource_kpi(self):
         result = {}
         status = self.resource.check_if_system_agent_running("collectd")[0]
-        if status == 0:
+        if status == 0 and self._enable:
             result = self.resource.amqp_collect_nfvi_kpi()
 
         result = {"core": result}
         return result
 
     def start_collect(self):
             result = self.resource.amqp_collect_nfvi_kpi()
 
         result = {"core": result}
         return result
 
     def start_collect(self):
-        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
-        self.resource.start()
-        self.resource.amqp_process_for_nfvi_kpi()
+        if self._enable:
+            self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+            self.resource.start()
+            self.resource.amqp_process_for_nfvi_kpi()
 
     def stop_collect(self):
 
     def stop_collect(self):
-        if self.resource:
+        if self.resource and self._enable:
             self.resource.stop()
 
     def collect_kpi(self):
             self.resource.stop()
 
     def collect_kpi(self):
@@ -396,51 +395,27 @@ class ClientResourceHelper(ResourceHelper):
         try:
             return self.client.get_stats(*args, **kwargs)
         except STLError:
         try:
             return self.client.get_stats(*args, **kwargs)
         except STLError:
-            LOG.exception("TRex client not connected")
+            LOG.error('TRex client not connected')
             return {}
 
             return {}
 
-    def generate_samples(self, ports, key=None, default=None):
-        # needs to be used ports
-        last_result = self.get_stats(ports)
-        key_value = last_result.get(key, default)
-
-        if not isinstance(last_result, Mapping):  # added for mock unit test
-            self._terminated.value = 1
-            return {}
-
-        samples = {}
-        # recalculate port for interface and see if it matches ports provided
-        for intf in self.vnfd_helper.interfaces:
-            name = intf["name"]
-            port = self.vnfd_helper.port_num(name)
-            if port in ports:
-                xe_value = last_result.get(port, {})
-                samples[name] = {
-                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
-                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
-                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
-                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
-                    "in_packets": int(xe_value.get("ipackets", 0)),
-                    "out_packets": int(xe_value.get("opackets", 0)),
-                }
-                if key:
-                    samples[name][key] = key_value
-        return samples
+    def _get_samples(self, ports, port_pg_id=False):
+        raise NotImplementedError()
 
     def _run_traffic_once(self, traffic_profile):
         traffic_profile.execute_traffic(self)
         self.client_started.value = 1
         time.sleep(self.RUN_DURATION)
 
     def _run_traffic_once(self, traffic_profile):
         traffic_profile.execute_traffic(self)
         self.client_started.value = 1
         time.sleep(self.RUN_DURATION)
-        samples = self.generate_samples(traffic_profile.ports)
+        samples = self._get_samples(traffic_profile.ports)
         time.sleep(self.QUEUE_WAIT_TIME)
         self._queue.put(samples)
 
         time.sleep(self.QUEUE_WAIT_TIME)
         self._queue.put(samples)
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, mq_producer):
         # if we don't do this we can hang waiting for the queue to drain
         # have to do this in the subprocess
         self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
         # if we don't do this we can hang waiting for the queue to drain
         # have to do this in the subprocess
         self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
+        mq_producer.tg_method_started()
         try:
             self._build_ports()
             self.client = self._connect()
         try:
             self._build_ports()
             self.client = self._connect()
@@ -448,8 +423,11 @@ class ClientResourceHelper(ResourceHelper):
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
 
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
 
+            iteration_index = 0
             while self._terminated.value == 0:
             while self._terminated.value == 0:
+                iteration_index += 1
                 self._run_traffic_once(traffic_profile)
                 self._run_traffic_once(traffic_profile)
+                mq_producer.tg_method_iteration(iteration_index)
 
             self.client.stop(self.all_ports)
             self.client.disconnect()
 
             self.client.stop(self.all_ports)
             self.client.disconnect()
@@ -460,6 +438,8 @@ class ClientResourceHelper(ResourceHelper):
                 return  # return if trex/tg server is stopped.
             raise
 
                 return  # return if trex/tg server is stopped.
             raise
 
+        mq_producer.tg_method_finished()
+
     def terminate(self):
         self._terminated.value = 1  # stop client
 
     def terminate(self):
         self._terminated.value = 1  # stop client
 
@@ -634,7 +614,10 @@ class ScenarioHelper(object):
 
     @property
     def timeout(self):
 
     @property
     def timeout(self):
-        return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
+        test_duration = self.scenario_cfg.get('runner', {}).get('duration',
+            self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
+        test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
+        return test_duration if test_duration > test_timeout else test_timeout
 
 
 class SampleVNF(GenericVNF):
 
 
 class SampleVNF(GenericVNF):
@@ -646,8 +629,9 @@ class SampleVNF(GenericVNF):
     APP_NAME = "SampleVNF"
     # we run the VNF interactively, so the ssh command will timeout after this long
 
     APP_NAME = "SampleVNF"
     # we run the VNF interactively, so the ssh command will timeout after this long
 
-    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
-        super(SampleVNF, self).__init__(name, vnfd)
+    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+                 resource_helper_type=None):
+        super(SampleVNF, self).__init__(name, vnfd, task_id)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -668,7 +652,6 @@ class SampleVNF(GenericVNF):
         self.resource_helper = resource_helper_type(self.setup_helper)
 
         self.context_cfg = None
         self.resource_helper = resource_helper_type(self.setup_helper)
 
         self.context_cfg = None
-        self.nfvi_context = None
         self.pipeline_kwargs = {}
         self.uplink_ports = None
         self.downlink_ports = None
         self.pipeline_kwargs = {}
         self.uplink_ports = None
         self.downlink_ports = None
@@ -682,49 +665,6 @@ class SampleVNF(GenericVNF):
         self.vnf_port_pairs = None
         self._vnf_process = None
 
         self.vnf_port_pairs = None
         self._vnf_process = None
 
-    def _build_ports(self):
-        self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
-        self.networks = self._port_pairs.networks
-        self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
-        self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
-        self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
-
-    def _get_route_data(self, route_index, route_type):
-        route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
-        for _ in range(route_index):
-            next(route_iter, '')
-        return next(route_iter, {}).get(route_type, '')
-
-    def _get_port0localip6(self):
-        return_value = self._get_route_data(0, 'network')
-        LOG.info("_get_port0localip6 : %s", return_value)
-        return return_value
-
-    def _get_port1localip6(self):
-        return_value = self._get_route_data(1, 'network')
-        LOG.info("_get_port1localip6 : %s", return_value)
-        return return_value
-
-    def _get_port0prefixlen6(self):
-        return_value = self._get_route_data(0, 'netmask')
-        LOG.info("_get_port0prefixlen6 : %s", return_value)
-        return return_value
-
-    def _get_port1prefixlen6(self):
-        return_value = self._get_route_data(1, 'netmask')
-        LOG.info("_get_port1prefixlen6 : %s", return_value)
-        return return_value
-
-    def _get_port0gateway6(self):
-        return_value = self._get_route_data(0, 'network')
-        LOG.info("_get_port0gateway6 : %s", return_value)
-        return return_value
-
-    def _get_port1gateway6(self):
-        return_value = self._get_route_data(1, 'network')
-        LOG.info("_get_port1gateway6 : %s", return_value)
-        return return_value
-
     def _start_vnf(self):
         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
     def _start_vnf(self):
         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
@@ -735,10 +675,13 @@ class SampleVNF(GenericVNF):
         pass
 
     def instantiate(self, scenario_cfg, context_cfg):
         pass
 
     def instantiate(self, scenario_cfg, context_cfg):
+        self._update_collectd_options(scenario_cfg, context_cfg)
         self.scenario_helper.scenario_cfg = scenario_cfg
         self.context_cfg = context_cfg
         self.scenario_helper.scenario_cfg = scenario_cfg
         self.context_cfg = context_cfg
-        self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
-        # self.nfvi_context = None
+        self.resource_helper.update_from_context(
+            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
+            self.scenario_helper.nodes[self.name]
+        )
 
         # vnf deploy is unsupported, use ansible playbooks
         if self.scenario_helper.options.get("vnf_deploy", False):
 
         # vnf deploy is unsupported, use ansible playbooks
         if self.scenario_helper.options.get("vnf_deploy", False):
@@ -746,6 +689,54 @@ class SampleVNF(GenericVNF):
         self.resource_helper.setup()
         self._start_vnf()
 
         self.resource_helper.setup()
         self._start_vnf()
 
+    def _update_collectd_options(self, scenario_cfg, context_cfg):
+        """Update collectd configuration options
+        This function retrieves all collectd options contained in the test case
+
+        definition builds a single dictionary combining them. The following fragment
+        represents a test case with the collectd options and priorities (1 highest, 3 lowest):
+        ---
+        schema: yardstick:task:0.1
+        scenarios:
+        - type: NSPerf
+          nodes:
+            tg__0: trafficgen_1.yardstick
+            vnf__0: vnf.yardstick
+          options:
+            collectd:
+              <options>  # COLLECTD priority 3
+            vnf__0:
+              collectd:
+                plugins:
+                    load
+                <options> # COLLECTD priority 2
+        context:
+          type: Node
+          name: yardstick
+          nfvi_type: baremetal
+          file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
+        """
+        scenario_options = scenario_cfg.get('options', {})
+        generic_options = scenario_options.get('collectd', {})
+        scenario_node_options = scenario_options.get(self.name, {})\
+            .get('collectd', {})
+        context_node_options = context_cfg.get('nodes', {})\
+            .get(self.name, {}).get('collectd', {})
+
+        options = generic_options
+        self._update_options(options, scenario_node_options)
+        self._update_options(options, context_node_options)
+
+        self.setup_helper.collectd_options = options
+
+    def _update_options(self, options, additional_options):
+        """Update collectd options and plugins dictionary"""
+        for k, v in additional_options.items():
+            if isinstance(v, dict) and k in options:
+                options[k].update(v)
+            else:
+                options[k] = v
+
     def wait_for_instantiate(self):
         buf = []
         time.sleep(self.WAIT_TIME)  # Give some time for config to load
     def wait_for_instantiate(self):
         buf = []
         time.sleep(self.WAIT_TIME)  # Give some time for config to load
@@ -761,7 +752,6 @@ class SampleVNF(GenericVNF):
                     LOG.info("%s VNF is up and running.", self.APP_NAME)
                     self._vnf_up_post()
                     self.queue_wrapper.clear()
                     LOG.info("%s VNF is up and running.", self.APP_NAME)
                     self._vnf_up_post()
                     self.queue_wrapper.clear()
-                    self.resource_helper.start_collect()
                     return self._vnf_process.exitcode
 
                 if "PANIC" in message:
                     return self._vnf_process.exitcode
 
                 if "PANIC" in message:
@@ -774,6 +764,12 @@ class SampleVNF(GenericVNF):
             # by other VNF output
             self.q_in.put('\r\n')
 
             # by other VNF output
             self.q_in.put('\r\n')
 
+    def start_collect(self):
+        self.resource_helper.start_collect()
+
+    def stop_collect(self):
+        self.resource_helper.stop_collect()
+
     def _build_run_kwargs(self):
         self.run_kwargs = {
             'stdin': self.queue_wrapper,
     def _build_run_kwargs(self):
         self.run_kwargs = {
             'stdin': self.queue_wrapper,
@@ -819,7 +815,7 @@ class SampleVNF(GenericVNF):
         if self._vnf_process is not None:
             # be proper and join first before we kill
             LOG.debug("joining before terminate %s", self._vnf_process.name)
         if self._vnf_process is not None:
             # be proper and join first before we kill
             LOG.debug("joining before terminate %s", self._vnf_process.name)
-            self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+            self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
             self._vnf_process.terminate()
         # no terminate children here because we share processes with tg
 
             self._vnf_process.terminate()
         # no terminate children here because we share processes with tg
 
@@ -836,18 +832,21 @@ class SampleVNF(GenericVNF):
 
     def collect_kpi(self):
         # we can't get KPIs if the VNF is down
 
     def collect_kpi(self):
         # we can't get KPIs if the VNF is down
-        check_if_process_failed(self._vnf_process)
+        check_if_process_failed(self._vnf_process, 0.01)
         stats = self.get_stats()
         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
         stats = self.get_stats()
         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+        physical_node = Context.get_physical_node_from_server(
+            self.scenario_helper.nodes[self.name])
+
+        result = {"physical_node": physical_node}
         if m:
         if m:
-            result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+            result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
             result["collect_stats"] = self.resource_helper.collect_kpi()
         else:
             result["collect_stats"] = self.resource_helper.collect_kpi()
         else:
-            result = {
-                "packets_in": 0,
-                "packets_fwd": 0,
-                "packets_dropped": 0,
-            }
+            result.update({"packets_in": 0,
+                           "packets_fwd": 0,
+                           "packets_dropped": 0})
+
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
 
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
 
@@ -863,8 +862,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
     APP_NAME = 'Sample'
     RUN_WAIT = 1
 
     APP_NAME = 'Sample'
     RUN_WAIT = 1
 
-    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
-        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+                 resource_helper_type=None):
+        super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -893,6 +893,11 @@ class SampleVNFTrafficGen(GenericTrafficGen):
 
     def instantiate(self, scenario_cfg, context_cfg):
         self.scenario_helper.scenario_cfg = scenario_cfg
 
     def instantiate(self, scenario_cfg, context_cfg):
         self.scenario_helper.scenario_cfg = scenario_cfg
+        self.resource_helper.update_from_context(
+            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
+            self.scenario_helper.nodes[self.name]
+        )
+
         self.resource_helper.setup()
         # must generate_cfg after DPDK bind because we need port number
         self.resource_helper.generate_cfg()
         self.resource_helper.setup()
         # must generate_cfg after DPDK bind because we need port number
         self.resource_helper.generate_cfg()
@@ -916,12 +921,13 @@ class SampleVNFTrafficGen(GenericTrafficGen):
                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
                 return self._tg_process.exitcode
 
                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
                 return self._tg_process.exitcode
 
-    def _traffic_runner(self, traffic_profile):
+    def _traffic_runner(self, traffic_profile, mq_id):
         # always drop connections first thing in new processes
         # so we don't get paramiko errors
         self.ssh_helper.drop_connection()
         LOG.info("Starting %s client...", self.APP_NAME)
         # always drop connections first thing in new processes
         # so we don't get paramiko errors
         self.ssh_helper.drop_connection()
         LOG.info("Starting %s client...", self.APP_NAME)
-        self.resource_helper.run_traffic(traffic_profile)
+        self._mq_producer = self._setup_mq_producer(mq_id)
+        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
 
     def run_traffic(self, traffic_profile):
         """ Generate traffic on the wire according to the given params.
 
     def run_traffic(self, traffic_profile):
         """ Generate traffic on the wire according to the given params.
@@ -931,10 +937,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
         :param traffic_profile:
         :return: True/False
         """
-        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
                                     os.getpid())
                                     os.getpid())
-        self._traffic_process = Process(name=name, target=self._traffic_runner,
-                                        args=(traffic_profile,))
+        self._traffic_process = Process(
+            name=name, target=self._traffic_runner,
+            args=(traffic_profile, uuid.uuid1().int))
         self._traffic_process.start()
         # Wait for traffic process to start
         while self.resource_helper.client_started.value == 0:
         self._traffic_process.start()
         # Wait for traffic process to start
         while self.resource_helper.client_started.value == 0:
@@ -943,13 +951,16 @@ class SampleVNFTrafficGen(GenericTrafficGen):
             if not self._traffic_process.is_alive():
                 break
 
             if not self._traffic_process.is_alive():
                 break
 
-        return self._traffic_process.is_alive()
-
     def collect_kpi(self):
         # check if the tg processes have exited
     def collect_kpi(self):
         # check if the tg processes have exited
+        physical_node = Context.get_physical_node_from_server(
+            self.scenario_helper.nodes[self.name])
+
+        result = {"physical_node": physical_node}
         for proc in (self._tg_process, self._traffic_process):
             check_if_process_failed(proc)
         for proc in (self._tg_process, self._traffic_process):
             check_if_process_failed(proc)
-        result = self.resource_helper.collect_kpi()
+
+        result["collect_stats"] = self.resource_helper.collect_kpi()
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
 
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
 
@@ -963,12 +974,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         if self._traffic_process is not None:
             # be proper and try to join before terminating
             LOG.debug("joining before terminate %s", self._traffic_process.name)
         if self._traffic_process is not None:
             # be proper and try to join before terminating
             LOG.debug("joining before terminate %s", self._traffic_process.name)
-            self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
+            self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
             self._traffic_process.terminate()
         if self._tg_process is not None:
             # be proper and try to join before terminating
             LOG.debug("joining before terminate %s", self._tg_process.name)
             self._traffic_process.terminate()
         if self._tg_process is not None:
             # be proper and try to join before terminating
             LOG.debug("joining before terminate %s", self._tg_process.name)
-            self._tg_process.join(PROCESS_JOIN_TIMEOUT)
+            self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
             self._tg_process.terminate()
         # no terminate children here because we share processes with vnf
 
             self._tg_process.terminate()
         # no terminate children here because we share processes with vnf