Merge "Create Dockerfile to create a yardstick-image of docker"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
index 8d02446..3ef7c33 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2016-2017 Intel Corporation
+# Copyright (c) 2016-2018 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-""" Base class implementation for generic vnf implementation """
 
-from collections import Mapping
 import logging
 from multiprocessing import Queue, Value, Process
 import os
 import posixpath
 import re
-from six.moves import cStringIO
+import uuid
 import subprocess
 import time
 
+import six
+
 from trex_stl_lib.trex_stl_client import LoggerApi
 from trex_stl_lib.trex_stl_client import STLClient
 from trex_stl_lib.trex_stl_exceptions import STLError
 from yardstick.benchmark.contexts.base import Context
-from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
 from yardstick.common import exceptions as y_exceptions
 from yardstick.common.process import check_if_process_failed
-from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
-from yardstick.network_services.helpers.samplevnf_helper import PortPairs
+from yardstick.common import utils
+from yardstick.common import yaml_loader
+from yardstick.network_services import constants
+from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
 from yardstick.network_services.nfvi.resource import ResourceProfile
 from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.ssh import AutoConnectSSH
-
-
-DPDK_VERSION = "dpdk-16.07"
+from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
+from yardstick.benchmark.contexts.node import NodeContext
 
 LOG = logging.getLogger(__name__)
 
 
-REMOTE_TMP = "/tmp"
-DEFAULT_VNF_TIMEOUT = 3600
-PROCESS_JOIN_TIMEOUT = 3
-
-
-class VnfSshHelper(AutoConnectSSH):
-
-    def __init__(self, node, bin_path, wait=None):
-        self.node = node
-        kwargs = self.args_from_node(self.node)
-        if wait:
-            kwargs.setdefault('wait', wait)
-
-        super(VnfSshHelper, self).__init__(**kwargs)
-        self.bin_path = bin_path
-
-    @staticmethod
-    def get_class():
-        # must return static class name, anything else refers to the calling class
-        # i.e. the subclass, not the superclass
-        return VnfSshHelper
-
-    def copy(self):
-        # this copy constructor is different from SSH classes, since it uses node
-        return self.get_class()(self.node, self.bin_path)
-
-    def upload_config_file(self, prefix, content):
-        cfg_file = os.path.join(REMOTE_TMP, prefix)
-        LOG.debug(content)
-        file_obj = cStringIO(content)
-        self.put_file_obj(file_obj, cfg_file)
-        return cfg_file
-
-    def join_bin_path(self, *args):
-        return os.path.join(self.bin_path, *args)
-
-    def provision_tool(self, tool_path=None, tool_file=None):
-        if tool_path is None:
-            tool_path = self.bin_path
-        return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
-
-
 class SetupEnvHelper(object):
 
-    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
-    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
+    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
     PIPELINE_COMMAND = ''
     VNF_TYPE = "SAMPLE"
@@ -101,6 +58,7 @@ class SetupEnvHelper(object):
         self.vnfd_helper = vnfd_helper
         self.ssh_helper = ssh_helper
         self.scenario_helper = scenario_helper
+        self.collectd_options = {}
 
     def build_config(self):
         raise NotImplementedError
@@ -119,6 +77,7 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
 
     APP_NAME = 'DpdkVnf'
     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
 
     @staticmethod
     def _update_packet_type(ip_pipeline_cfg, traffic_options):
@@ -155,24 +114,23 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
 
     def _setup_hugepages(self):
-        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
-        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
-
-        memory_path = \
-            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
-        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
-        if hugepages == "2048kB":
-            pages = 8192
-        else:
-            pages = 16
-
-        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+        meminfo = utils.read_meminfo(self.ssh_helper)
+        hp_size_kb = int(meminfo['Hugepagesize'])
+        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
+        nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
+        self.ssh_helper.execute('echo %s | sudo tee %s' %
+                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
+        hp = six.BytesIO()
+        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
+        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
+        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
+                 hp_size_kb, nr_hugepages, nr_hugepages_set)
 
     def build_config(self):
         vnf_cfg = self.scenario_helper.vnf_cfg
         task_path = self.scenario_helper.task_path
 
+        config_file = vnf_cfg.get('file')
         lb_count = vnf_cfg.get('lb_count', 3)
         lb_config = vnf_cfg.get('lb_config', 'SW')
         worker_config = vnf_cfg.get('worker_config', '1C/1T')
@@ -185,7 +143,15 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
             'vnf_type': self.VNF_TYPE,
         }
 
-        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+        # read actions/rules from file
+        acl_options = None
+        acl_file_name = self.scenario_helper.options.get('rules')
+        if acl_file_name:
+            with utils.open_relative_file(acl_file_name, task_path) as infile:
+                acl_options = yaml_loader.yaml_load(infile)
+
+        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
+                                                  task_path)
         config_basename = posixpath.basename(self.CFG_CONFIG)
         script_basename = posixpath.basename(self.CFG_SCRIPT)
         multiport = MultiPortConfig(self.scenario_helper.topology,
@@ -200,20 +166,33 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
                                     self.socket)
 
         multiport.generate_config()
-        with open(self.CFG_CONFIG) as handle:
-            new_config = handle.read()
-
-        new_config = self._update_traffic_type(new_config, traffic_options)
-        new_config = self._update_packet_type(new_config, traffic_options)
-
+        if config_file:
+            with utils.open_relative_file(config_file, task_path) as infile:
+                new_config = ['[EAL]']
+                vpci = []
+                for port in self.vnfd_helper.port_pairs.all_ports:
+                    interface = self.vnfd_helper.find_interface(name=port)
+                    vpci.append(interface['virtual-interface']["vpci"])
+                new_config.extend('w = {0}'.format(item) for item in vpci)
+                new_config = '\n'.join(new_config) + '\n' + infile.read()
+        else:
+            with open(self.CFG_CONFIG) as handle:
+                new_config = handle.read()
+            new_config = self._update_traffic_type(new_config, traffic_options)
+            new_config = self._update_packet_type(new_config, traffic_options)
         self.ssh_helper.upload_config_file(config_basename, new_config)
         self.ssh_helper.upload_config_file(script_basename,
-                                           multiport.generate_script(self.vnfd_helper))
+            multiport.generate_script(self.vnfd_helper,
+                                      self.get_flows_config(acl_options)))
 
         LOG.info("Provision and start the %s", self.APP_NAME)
         self._build_pipeline_kwargs()
         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
 
+    def get_flows_config(self, options=None): # pylint: disable=unused-argument
+        """No actions/rules (flows) by default"""
+        return None
+
     def _build_pipeline_kwargs(self):
         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
         # count the number of actual ports in the list of pairs
@@ -225,16 +204,24 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         port_nums = self.vnfd_helper.port_nums(ports)
         # create mask from all the dpdk port numbers
         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
+
+        vnf_cfg = self.scenario_helper.vnf_cfg
+        lb_config = vnf_cfg.get('lb_config', 'SW')
+        worker_threads = vnf_cfg.get('worker_threads', 3)
+        hwlb = ''
+        if lb_config == 'HW':
+            hwlb = ' --hwlb %s' % worker_threads
+
         self.pipeline_kwargs = {
             'cfg_file': self.CFG_CONFIG,
             'script': self.CFG_SCRIPT,
             'port_mask_hex': ports_mask_hex,
             'tool_path': tool_path,
+            'hwlb': hwlb,
         }
 
     def setup_vnf_environment(self):
         self._setup_dpdk()
-        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
         self.kill_vnf()
         # bind before _setup_resources so we can use dpdk_port_num
         self._detect_and_bind_drivers()
@@ -252,16 +239,11 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
     def _setup_dpdk(self):
         """Setup DPDK environment needed for VNF to run"""
         self._setup_hugepages()
-        self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
-        exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
-        if exit_status:
-            raise y_exceptions.DPDKSetupDriverError()
+        self.dpdk_bind_helper.load_dpdk_driver()
 
-    def get_collectd_options(self):
-        options = self.scenario_helper.all_options.get("collectd", {})
-        # override with specific node settings
-        options.update(self.scenario_helper.options.get("collectd", {}))
-        return options
+        exit_status = self.dpdk_bind_helper.check_dpdk_driver()
+        if exit_status == 0:
+            return
 
     def _setup_resources(self):
         # what is this magic?  how do we know which socket is for which port?
@@ -275,16 +257,29 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         # this won't work because we don't have DPDK port numbers yet
         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
         port_names = (intf["name"] for intf in ports)
-        collectd_options = self.get_collectd_options()
-        plugins = collectd_options.get("plugins", {})
+        plugins = self.collectd_options.get("plugins", {})
+        interval = self.collectd_options.get("interval")
         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
-                               plugins=plugins, interval=collectd_options.get("interval"),
+                               plugins=plugins, interval=interval,
                                timeout=self.scenario_helper.timeout)
 
+    def _check_interface_fields(self):
+        num_nodes = len(self.scenario_helper.nodes)
+        # OpenStack instance creation time is probably proportional to the number
+        # of instances
+        timeout = 120 * num_nodes
+        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
+                             self.ssh_helper, timeout)
+        dpdk_node.check()
+
     def _detect_and_bind_drivers(self):
         interfaces = self.vnfd_helper.interfaces
 
+        self._check_interface_fields()
+        # check for bound after probe
+        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+
         self.dpdk_bind_helper.read_status()
         self.dpdk_bind_helper.save_used_drivers()
 
@@ -325,6 +320,7 @@ class ResourceHelper(object):
         self.resource = None
         self.setup_helper = setup_helper
         self.ssh_helper = setup_helper.ssh_helper
+        self._enable = True
 
     def setup(self):
         self.resource = self.setup_helper.setup_vnf_environment()
@@ -332,22 +328,33 @@ class ResourceHelper(object):
     def generate_cfg(self):
         pass
 
+    def update_from_context(self, context, attr_name):
+        """Disable resource helper in case of baremetal context.
+
+        And update appropriate node collectd options in context
+        """
+        if isinstance(context, NodeContext):
+            self._enable = False
+            context.update_collectd_options_for_node(self.setup_helper.collectd_options,
+                                                     attr_name)
+
     def _collect_resource_kpi(self):
         result = {}
         status = self.resource.check_if_system_agent_running("collectd")[0]
-        if status == 0:
+        if status == 0 and self._enable:
             result = self.resource.amqp_collect_nfvi_kpi()
 
         result = {"core": result}
         return result
 
     def start_collect(self):
-        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
-        self.resource.start()
-        self.resource.amqp_process_for_nfvi_kpi()
+        if self._enable:
+            self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+            self.resource.start()
+            self.resource.amqp_process_for_nfvi_kpi()
 
     def stop_collect(self):
-        if self.resource:
+        if self.resource and self._enable:
             self.resource.stop()
 
     def collect_kpi(self):
@@ -388,51 +395,27 @@ class ClientResourceHelper(ResourceHelper):
         try:
             return self.client.get_stats(*args, **kwargs)
         except STLError:
-            LOG.exception("TRex client not connected")
-            return {}
-
-    def generate_samples(self, ports, key=None, default=None):
-        # needs to be used ports
-        last_result = self.get_stats(ports)
-        key_value = last_result.get(key, default)
-
-        if not isinstance(last_result, Mapping):  # added for mock unit test
-            self._terminated.value = 1
+            LOG.error('TRex client not connected')
             return {}
 
-        samples = {}
-        # recalculate port for interface and see if it matches ports provided
-        for intf in self.vnfd_helper.interfaces:
-            name = intf["name"]
-            port = self.vnfd_helper.port_num(name)
-            if port in ports:
-                xe_value = last_result.get(port, {})
-                samples[name] = {
-                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
-                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
-                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
-                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
-                    "in_packets": int(xe_value.get("ipackets", 0)),
-                    "out_packets": int(xe_value.get("opackets", 0)),
-                }
-                if key:
-                    samples[name][key] = key_value
-        return samples
+    def _get_samples(self, ports, port_pg_id=False):
+        raise NotImplementedError()
 
     def _run_traffic_once(self, traffic_profile):
         traffic_profile.execute_traffic(self)
         self.client_started.value = 1
         time.sleep(self.RUN_DURATION)
-        samples = self.generate_samples(traffic_profile.ports)
+        samples = self._get_samples(traffic_profile.ports)
         time.sleep(self.QUEUE_WAIT_TIME)
         self._queue.put(samples)
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, mq_producer):
         # if we don't do this we can hang waiting for the queue to drain
         # have to do this in the subprocess
         self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
+        mq_producer.tg_method_started()
         try:
             self._build_ports()
             self.client = self._connect()
@@ -440,8 +423,11 @@ class ClientResourceHelper(ResourceHelper):
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
 
+            iteration_index = 0
             while self._terminated.value == 0:
+                iteration_index += 1
                 self._run_traffic_once(traffic_profile)
+                mq_producer.tg_method_iteration(iteration_index)
 
             self.client.stop(self.all_ports)
             self.client.disconnect()
@@ -452,6 +438,8 @@ class ClientResourceHelper(ResourceHelper):
                 return  # return if trex/tg server is stopped.
             raise
 
+        mq_producer.tg_method_finished()
+
     def terminate(self):
         self._terminated.value = 1  # stop client
 
@@ -627,10 +615,11 @@ class ScenarioHelper(object):
     @property
     def timeout(self):
         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
-            self.options.get('timeout', DEFAULT_VNF_TIMEOUT))
-        test_timeout = self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
+            self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
+        test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
         return test_duration if test_duration > test_timeout else test_timeout
 
+
 class SampleVNF(GenericVNF):
     """ Class providing file-like API for generic VNF implementation """
 
@@ -640,8 +629,9 @@ class SampleVNF(GenericVNF):
     APP_NAME = "SampleVNF"
     # we run the VNF interactively, so the ssh command will timeout after this long
 
-    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
-        super(SampleVNF, self).__init__(name, vnfd)
+    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+                 resource_helper_type=None):
+        super(SampleVNF, self).__init__(name, vnfd, task_id)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -662,7 +652,6 @@ class SampleVNF(GenericVNF):
         self.resource_helper = resource_helper_type(self.setup_helper)
 
         self.context_cfg = None
-        self.nfvi_context = None
         self.pipeline_kwargs = {}
         self.uplink_ports = None
         self.downlink_ports = None
@@ -676,49 +665,6 @@ class SampleVNF(GenericVNF):
         self.vnf_port_pairs = None
         self._vnf_process = None
 
-    def _build_ports(self):
-        self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
-        self.networks = self._port_pairs.networks
-        self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
-        self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
-        self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
-
-    def _get_route_data(self, route_index, route_type):
-        route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
-        for _ in range(route_index):
-            next(route_iter, '')
-        return next(route_iter, {}).get(route_type, '')
-
-    def _get_port0localip6(self):
-        return_value = self._get_route_data(0, 'network')
-        LOG.info("_get_port0localip6 : %s", return_value)
-        return return_value
-
-    def _get_port1localip6(self):
-        return_value = self._get_route_data(1, 'network')
-        LOG.info("_get_port1localip6 : %s", return_value)
-        return return_value
-
-    def _get_port0prefixlen6(self):
-        return_value = self._get_route_data(0, 'netmask')
-        LOG.info("_get_port0prefixlen6 : %s", return_value)
-        return return_value
-
-    def _get_port1prefixlen6(self):
-        return_value = self._get_route_data(1, 'netmask')
-        LOG.info("_get_port1prefixlen6 : %s", return_value)
-        return return_value
-
-    def _get_port0gateway6(self):
-        return_value = self._get_route_data(0, 'network')
-        LOG.info("_get_port0gateway6 : %s", return_value)
-        return return_value
-
-    def _get_port1gateway6(self):
-        return_value = self._get_route_data(1, 'network')
-        LOG.info("_get_port1gateway6 : %s", return_value)
-        return return_value
-
     def _start_vnf(self):
         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
@@ -729,10 +675,13 @@ class SampleVNF(GenericVNF):
         pass
 
     def instantiate(self, scenario_cfg, context_cfg):
+        self._update_collectd_options(scenario_cfg, context_cfg)
         self.scenario_helper.scenario_cfg = scenario_cfg
         self.context_cfg = context_cfg
-        self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
-        # self.nfvi_context = None
+        self.resource_helper.update_from_context(
+            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
+            self.scenario_helper.nodes[self.name]
+        )
 
         # vnf deploy is unsupported, use ansible playbooks
         if self.scenario_helper.options.get("vnf_deploy", False):
@@ -740,6 +689,54 @@ class SampleVNF(GenericVNF):
         self.resource_helper.setup()
         self._start_vnf()
 
+    def _update_collectd_options(self, scenario_cfg, context_cfg):
+        """Update collectd configuration options
+        This function retrieves all collectd options contained in the test case
+
+        definition builds a single dictionary combining them. The following fragment
+        represents a test case with the collectd options and priorities (1 highest, 3 lowest):
+        ---
+        schema: yardstick:task:0.1
+        scenarios:
+        - type: NSPerf
+          nodes:
+            tg__0: trafficgen_1.yardstick
+            vnf__0: vnf.yardstick
+          options:
+            collectd:
+              <options>  # COLLECTD priority 3
+            vnf__0:
+              collectd:
+                plugins:
+                    load
+                <options> # COLLECTD priority 2
+        context:
+          type: Node
+          name: yardstick
+          nfvi_type: baremetal
+          file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
+        """
+        scenario_options = scenario_cfg.get('options', {})
+        generic_options = scenario_options.get('collectd', {})
+        scenario_node_options = scenario_options.get(self.name, {})\
+            .get('collectd', {})
+        context_node_options = context_cfg.get('nodes', {})\
+            .get(self.name, {}).get('collectd', {})
+
+        options = generic_options
+        self._update_options(options, scenario_node_options)
+        self._update_options(options, context_node_options)
+
+        self.setup_helper.collectd_options = options
+
+    def _update_options(self, options, additional_options):
+        """Update collectd options and plugins dictionary"""
+        for k, v in additional_options.items():
+            if isinstance(v, dict) and k in options:
+                options[k].update(v)
+            else:
+                options[k] = v
+
     def wait_for_instantiate(self):
         buf = []
         time.sleep(self.WAIT_TIME)  # Give some time for config to load
@@ -755,7 +752,6 @@ class SampleVNF(GenericVNF):
                     LOG.info("%s VNF is up and running.", self.APP_NAME)
                     self._vnf_up_post()
                     self.queue_wrapper.clear()
-                    self.resource_helper.start_collect()
                     return self._vnf_process.exitcode
 
                 if "PANIC" in message:
@@ -768,6 +764,12 @@ class SampleVNF(GenericVNF):
             # by other VNF output
             self.q_in.put('\r\n')
 
+    def start_collect(self):
+        self.resource_helper.start_collect()
+
+    def stop_collect(self):
+        self.resource_helper.stop_collect()
+
     def _build_run_kwargs(self):
         self.run_kwargs = {
             'stdin': self.queue_wrapper,
@@ -813,7 +815,7 @@ class SampleVNF(GenericVNF):
         if self._vnf_process is not None:
             # be proper and join first before we kill
             LOG.debug("joining before terminate %s", self._vnf_process.name)
-            self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+            self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
             self._vnf_process.terminate()
         # no terminate children here because we share processes with tg
 
@@ -830,18 +832,21 @@ class SampleVNF(GenericVNF):
 
     def collect_kpi(self):
         # we can't get KPIs if the VNF is down
-        check_if_process_failed(self._vnf_process)
+        check_if_process_failed(self._vnf_process, 0.01)
         stats = self.get_stats()
         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+        physical_node = Context.get_physical_node_from_server(
+            self.scenario_helper.nodes[self.name])
+
+        result = {"physical_node": physical_node}
         if m:
-            result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+            result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
             result["collect_stats"] = self.resource_helper.collect_kpi()
         else:
-            result = {
-                "packets_in": 0,
-                "packets_fwd": 0,
-                "packets_dropped": 0,
-            }
+            result.update({"packets_in": 0,
+                           "packets_fwd": 0,
+                           "packets_dropped": 0})
+
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
 
@@ -857,8 +862,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
     APP_NAME = 'Sample'
     RUN_WAIT = 1
 
-    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
-        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+                 resource_helper_type=None):
+        super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -887,6 +893,11 @@ class SampleVNFTrafficGen(GenericTrafficGen):
 
     def instantiate(self, scenario_cfg, context_cfg):
         self.scenario_helper.scenario_cfg = scenario_cfg
+        self.resource_helper.update_from_context(
+            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
+            self.scenario_helper.nodes[self.name]
+        )
+
         self.resource_helper.setup()
         # must generate_cfg after DPDK bind because we need port number
         self.resource_helper.generate_cfg()
@@ -910,12 +921,13 @@ class SampleVNFTrafficGen(GenericTrafficGen):
                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
                 return self._tg_process.exitcode
 
-    def _traffic_runner(self, traffic_profile):
+    def _traffic_runner(self, traffic_profile, mq_id):
         # always drop connections first thing in new processes
         # so we don't get paramiko errors
         self.ssh_helper.drop_connection()
         LOG.info("Starting %s client...", self.APP_NAME)
-        self.resource_helper.run_traffic(traffic_profile)
+        self._mq_producer = self._setup_mq_producer(mq_id)
+        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
 
     def run_traffic(self, traffic_profile):
         """ Generate traffic on the wire according to the given params.
@@ -925,10 +937,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
-        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
                                     os.getpid())
-        self._traffic_process = Process(name=name, target=self._traffic_runner,
-                                        args=(traffic_profile,))
+        self._traffic_process = Process(
+            name=name, target=self._traffic_runner,
+            args=(traffic_profile, uuid.uuid1().int))
         self._traffic_process.start()
         # Wait for traffic process to start
         while self.resource_helper.client_started.value == 0:
@@ -937,13 +951,16 @@ class SampleVNFTrafficGen(GenericTrafficGen):
             if not self._traffic_process.is_alive():
                 break
 
-        return self._traffic_process.is_alive()
-
     def collect_kpi(self):
         # check if the tg processes have exited
+        physical_node = Context.get_physical_node_from_server(
+            self.scenario_helper.nodes[self.name])
+
+        result = {"physical_node": physical_node}
         for proc in (self._tg_process, self._traffic_process):
             check_if_process_failed(proc)
-        result = self.resource_helper.collect_kpi()
+
+        result["collect_stats"] = self.resource_helper.collect_kpi()
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
 
@@ -957,12 +974,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         if self._traffic_process is not None:
             # be proper and try to join before terminating
             LOG.debug("joining before terminate %s", self._traffic_process.name)
-            self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
+            self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
             self._traffic_process.terminate()
         if self._tg_process is not None:
             # be proper and try to join before terminating
             LOG.debug("joining before terminate %s", self._tg_process.name)
-            self._tg_process.join(PROCESS_JOIN_TIMEOUT)
+            self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
             self._tg_process.terminate()
         # no terminate children here because we share processes with vnf