Add new scenario NSPerf-RFC2544 61/66561/18
author Serhiy Pshyk <serhiyx.pshyk@intel.com>
Tue, 22 Jan 2019 13:31:58 +0000 (13:31 +0000)
committer Serhiy Pshyk <serhiyx.pshyk@intel.com>
Thu, 31 Jan 2019 09:47:30 +0000 (09:47 +0000)
List of changes:
 - Added new scenario NSPerf-RFC2544 that runs the complete test in each
   runner iteration
 - NSPerf is still present for backward compatibility
 - Added ScenarioOutput class, which allows a scenario to report data
   (KPIs, ...) at any point in time, not only once per run (see the usage
   sketch below)
 - The new output mechanism is used only by the new NSPerf-RFC2544 scenario
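
A minimal usage sketch (not part of the patch), assuming yardstick is installed
so the new class can be imported from yardstick.benchmark.runners.base; the KPI
key and values are purely illustrative:

    from multiprocessing import Queue

    from yardstick.benchmark.runners.base import ScenarioOutput

    # The queue stands in for the result queue the runner normally owns.
    queue = Queue()
    output = ScenarioOutput(queue, sequence=1, errors="")

    # A scenario handed this object can push data whenever it becomes
    # available, not only once at the end of run(); push() wraps the data
    # with a timestamp and appends the extra fields given above
    # ('sequence', 'errors').
    output.push({'throughput_kpps': 120.5})
    output.push({'throughput_kpps': 118.9})

    print(queue.get())  # {'timestamp': ..., 'data': {'throughput_kpps': 120.5},
                        #  'sequence': 1, 'errors': ''}
    print(queue.get())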

JIRA: YARDSTICK-1592

Change-Id: Ica8078f61d895f74df90e8eaad7a675b28bd19c6
Signed-off-by: Serhiy Pshyk <serhiyx.pshyk@intel.com>
samples/vnf_samples/nsut/agnostic/tc_baremetal_rfc2544_ipv4_ixia.yaml [new file with mode: 0644]
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/iteration.py
yardstick/benchmark/scenarios/networking/vnf_generic.py
yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
yardstick/tests/unit/benchmark/runner/test_base.py
yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py

diff --git a/samples/vnf_samples/nsut/agnostic/tc_baremetal_rfc2544_ipv4_ixia.yaml b/samples/vnf_samples/nsut/agnostic/tc_baremetal_rfc2544_ipv4_ixia.yaml
new file mode 100644 (file)
index 0000000..edf36b8
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (c) 2019 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+schema: yardstick:task:0.1
+scenarios:
+- type: NSPerf-RFC2544
+  traffic_profile: ../../traffic_profiles/ixia_ipv4_latency.yaml
+  topology: agnostic_vnf_topology_ixia_2ports.yaml
+  nodes:
+    tg__0: tg_0.yardstick
+    vnf__0: vnf_0.yardstick
+  options:
+    framesize:
+      uplink: {64B: 100}
+      downlink: {64B: 100}
+    flow:
+      src_ip: [{'tg__0': 'xe0'}]
+      dst_ip: [{'tg__0': 'xe1'}]
+      count: 1
+    traffic_type: 4
+    rfc2544:
+      allowed_drop_rate: 0.0001 - 0.0001
+    vnf__0:
+        []
+  runner:
+    type: Iteration
+    iterations: 2
+    interval: 5
+context:
+  type: Node
+  name: yardstick
+  nfvi_type: baremetal
+  file: /etc/yardstick/nodes/pod_ixia.yaml
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index 3878f20..94de45d 100755 (executable)
@@ -77,6 +77,33 @@ def _periodic_action(interval, command, queue):
         queue.put({'periodic-action-data': data})
 
 
+class ScenarioOutput(dict):
+
+    QUEUE_PUT_TIMEOUT = 10
+
+    def __init__(self, queue, **kwargs):
+        super(ScenarioOutput, self).__init__()
+        self._queue = queue
+        self.result_ext = dict()
+        for key, val in kwargs.items():
+            self.result_ext[key] = val
+            setattr(self, key, val)
+
+    def push(self, data=None, add_timestamp=True):
+        if data is None:
+            data = dict(self)
+
+        if add_timestamp:
+            result = {'timestamp': time.time(), 'data': data}
+        else:
+            result = data
+
+        for key in self.result_ext.keys():
+            result[key] = getattr(self, key)
+
+        self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT)
+
+
 class Runner(object):
     runners = []
 
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 58ab06a..15dad2c 100644 (file)
@@ -23,7 +23,6 @@ from __future__ import absolute_import
 
 import logging
 import multiprocessing
-import time
 import traceback
 
 import os
@@ -40,8 +39,6 @@ QUEUE_PUT_TIMEOUT = 10
 def _worker_process(queue, cls, method_name, scenario_cfg,
                     context_cfg, aborted, output_queue):
 
-    sequence = 1
-
     runner_cfg = scenario_cfg['runner']
 
     interval = runner_cfg.get("interval", 1)
@@ -53,6 +50,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     runner_cfg['runner_id'] = os.getpid()
 
+    scenario_output = base.ScenarioOutput(queue, sequence=1, errors="")
     benchmark = cls(scenario_cfg, context_cfg)
     if "setup" in run_step:
         benchmark.setup()
@@ -67,22 +65,21 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s START",
                       {"runner": runner_cfg["runner_id"],
-                       "sequence": sequence})
-
-            data = {}
-            errors = ""
+                       "sequence": scenario_output.sequence})
 
+            scenario_output.clear()
+            scenario_output.errors = ""
             benchmark.pre_run_wait_time(interval)
 
             try:
-                result = method(data)
+                result = method(scenario_output)
             except y_exc.SLAValidationError as error:
                 # SLA validation failed in scenario, determine what to do now
                 if sla_action == "assert":
                     raise
                 elif sla_action == "monitor":
                     LOG.warning("SLA validation failed: %s", error.args)
-                    errors = error.args
+                    scenario_output.errors = error.args
                 elif sla_action == "rate-control":
                     try:
                         scenario_cfg['options']['rate']
@@ -91,10 +88,10 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                         scenario_cfg['options']['rate'] = 100
 
                     scenario_cfg['options']['rate'] -= delta
-                    sequence = 1
+                    scenario_output.sequence = 1
                     continue
             except Exception:  # pylint: disable=broad-except
-                errors = traceback.format_exc()
+                scenario_output.errors = traceback.format_exc()
                 LOG.exception("")
                 raise
             else:
@@ -105,23 +102,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
             benchmark.post_run_wait_time(interval)
 
-            benchmark_output = {
-                'timestamp': time.time(),
-                'sequence': sequence,
-                'data': data,
-                'errors': errors
-            }
-
-            queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
+            if scenario_output:
+                scenario_output.push()
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                       {"runner": runner_cfg["runner_id"],
-                       "sequence": sequence})
+                       "sequence": scenario_output.sequence})
 
-            sequence += 1
+            scenario_output.sequence += 1
 
-            if (errors and sla_action is None) or \
-                    (sequence > iterations or aborted.is_set()):
+            if (scenario_output.errors and sla_action is None) or \
+                    (scenario_output.sequence > iterations or aborted.is_set()):
                 LOG.info("worker END")
                 break
     if "teardown" in run_step:
diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py
index daca9ba..3fd1845 100644 (file)
@@ -44,14 +44,13 @@ traffic_profile.register_modules()
 LOG = logging.getLogger(__name__)
 
 
-class NetworkServiceTestCase(scenario_base.Scenario):
-    """Class handles Generic framework to do pre-deployment VNF &
-       Network service testing  """
+class NetworkServiceBase(scenario_base.Scenario):
+    """Base class for Network service testing scenarios"""
 
-    __scenario_type__ = "NSPerf"
+    __scenario_type__ = ""
 
     def __init__(self, scenario_cfg, context_cfg):  # pragma: no cover
-        super(NetworkServiceTestCase, self).__init__()
+        super(NetworkServiceBase, self).__init__()
         self.scenario_cfg = scenario_cfg
         self.context_cfg = context_cfg
 
@@ -62,6 +61,32 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         self.node_netdevs = {}
         self.bin_path = get_nsb_option('bin_path', '')
 
+    def run(self, *args):
+        pass
+
+    def teardown(self):
+        """ Stop the collector and terminate VNF & TG instance
+
+        :return
+        """
+
+        try:
+            try:
+                self.collector.stop()
+                for vnf in self.vnfs:
+                    LOG.info("Stopping %s", vnf.name)
+                    vnf.terminate()
+                LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
+            finally:
+                terminate_children()
+        except Exception:
+            # catch any exception in teardown and convert to simple exception
+            # never pass exceptions back to multiprocessing, because some exceptions can
+            # be unpicklable
+            # https://bugs.python.org/issue9400
+            LOG.exception("")
+            raise RuntimeError("Error in teardown")
+
     def is_ended(self):
         return self.traffic_profile is not None and self.traffic_profile.is_ended()
 
@@ -451,6 +476,24 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         self.vnfs = vnfs
         return vnfs
 
+    def pre_run_wait_time(self, time_seconds):  # pragma: no cover
+        """Time waited before executing the run method"""
+        time.sleep(time_seconds)
+
+    def post_run_wait_time(self, time_seconds):  # pragma: no cover
+        """Time waited after executing the run method"""
+        pass
+
+
+class NetworkServiceTestCase(NetworkServiceBase):
+    """Class handles Generic framework to do pre-deployment VNF &
+       Network service testing  """
+
+    __scenario_type__ = "NSPerf"
+
+    def __init__(self, scenario_cfg, context_cfg):  # pragma: no cover
+        super(NetworkServiceTestCase, self).__init__(scenario_cfg, context_cfg)
+
     def setup(self):
         """Setup infrastructure, provission VNFs & start traffic"""
         # 1. Verify if infrastructure mapping can meet topology
@@ -509,33 +552,70 @@ class NetworkServiceTestCase(scenario_base.Scenario):
 
         result.update(self.collector.get_kpi())
 
-    def teardown(self):
-        """ Stop the collector and terminate VNF & TG instance
 
-        :return
-        """
+class NetworkServiceRFC2544(NetworkServiceBase):
+    """Class handles RFC2544 Network service testing"""
+
+    __scenario_type__ = "NSPerf-RFC2544"
+
+    def __init__(self, scenario_cfg, context_cfg):  # pragma: no cover
+        super(NetworkServiceRFC2544, self).__init__(scenario_cfg, context_cfg)
 
+    def setup(self):
+        """Setup infrastructure, provision VNFs"""
+        self.map_topology_to_infrastructure()
+        self.load_vnf_models()
+
+        traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
+        non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
         try:
-            try:
-                self.collector.stop()
-                for vnf in self.vnfs:
-                    LOG.info("Stopping %s", vnf.name)
-                    vnf.terminate()
-                LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
-            finally:
-                terminate_children()
-        except Exception:
-            # catch any exception in teardown and convert to simple exception
-            # never pass exceptions back to multiprocessing, because some exceptions can
-            # be unpicklable
-            # https://bugs.python.org/issue9400
+            for vnf in chain(traffic_runners, non_traffic_runners):
+                LOG.info("Instantiating %s", vnf.name)
+                vnf.instantiate(self.scenario_cfg, self.context_cfg)
+                LOG.info("Waiting for %s to instantiate", vnf.name)
+                vnf.wait_for_instantiate()
+        except:
             LOG.exception("")
-            raise RuntimeError("Error in teardown")
+            for vnf in self.vnfs:
+                vnf.terminate()
+            raise
 
-    def pre_run_wait_time(self, time_seconds):  # pragma: no cover
-        """Time waited before executing the run method"""
-        time.sleep(time_seconds)
+        self._generate_pod_yaml()
 
-    def post_run_wait_time(self, time_seconds):  # pragma: no cover
-        """Time waited after executing the run method"""
-        pass
+    def run(self, output):
+        """ Run experiment
+
+        :param output: scenario output to push results
+        :return: None
+        """
+
+        self._fill_traffic_profile()
+
+        traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
+
+        for traffic_gen in traffic_runners:
+            traffic_gen.listen_traffic(self.traffic_profile)
+
+        self.collector = Collector(self.vnfs,
+                                   context_base.Context.get_physical_nodes())
+        self.collector.start()
+
+        test_completed = False
+        while not test_completed:
+            for traffic_gen in traffic_runners:
+                LOG.info("Run traffic on %s", traffic_gen.name)
+                traffic_gen.run_traffic_once(self.traffic_profile)
+
+            test_completed = True
+            for traffic_gen in traffic_runners:
+                # wait for all tg to complete running traffic
+                status = traffic_gen.wait_on_traffic()
+                LOG.info("Run traffic on %s complete status=%s",
+                         traffic_gen.name, status)
+                if status == 'CONTINUE':
+                    # continue running if at least one tg is running
+                    test_completed = False
+
+            output.push(self.collector.get_kpi())
+
+        self.collector.stop()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
index 8f3698c..c232773 100644 (file)
 import ipaddress
 import logging
 import six
+import os
+import time
+
+from six import moves
+
+from multiprocessing import Queue, Process, JoinableQueue
 
 from yardstick.common import utils
 from yardstick.common import exceptions
@@ -577,8 +583,76 @@ class IxiaResourceHelper(ClientResourceHelper):
             LOG.exception('Run Traffic terminated')
 
         self._ix_scenario.stop_protocols()
+        self.client_started.value = 0
         self._terminated.value = 1
 
+    def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover
+        LOG.info("Ixia resource_helper run_test")
+        if self._terminated.value:
+            return
+
+        min_tol = self.rfc_helper.tolerance_low
+        max_tol = self.rfc_helper.tolerance_high
+        precision = self.rfc_helper.tolerance_precision
+        default = "00:00:00:00:00:00"
+
+        self._build_ports()
+        traffic_profile.update_traffic_profile(self)
+        self._initialize_client(traffic_profile)
+
+        mac = {}
+        for port_name in self.vnfd_helper.port_pairs.all_ports:
+            intf = self.vnfd_helper.find_interface(name=port_name)
+            virt_intf = intf["virtual-interface"]
+            # we only know static traffic id by reading the json
+            # this is used by _get_ixia_traffic_profile
+            port_num = self.vnfd_helper.port_num(intf)
+            mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
+            mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
+
+        self._ix_scenario.run_protocols()
+
+        try:
+            completed = False
+            self.client_started.value = 1
+            while completed is False and not self._terminated.value:
+                LOG.info("Wait for task ...")
+
+                try:
+                    task = tasks_queue.get(True, 5)
+                except moves.queue.Empty:
+                    continue
+                else:
+                    if task != 'RUN_TRAFFIC':
+                        continue
+
+                LOG.info("Got %s task", task)
+                first_run = traffic_profile.execute_traffic(
+                    self, self.client, mac)
+                # pylint: disable=unnecessary-lambda
+                utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
+                                      timeout=traffic_profile.config.duration * 2)
+                samples = self.generate_samples(traffic_profile.ports,
+                                                traffic_profile.config.duration)
+
+                completed, samples = traffic_profile.get_drop_percentage(
+                    samples, min_tol, max_tol, precision, first_run=first_run)
+                self._queue.put(samples)
+
+                if completed:
+                    LOG.debug("IxiaResourceHelper::run_test - test completed")
+                    results_queue.put('COMPLETE')
+                else:
+                    results_queue.put('CONTINUE')
+                tasks_queue.task_done()
+
+        except Exception:  # pylint: disable=broad-except
+            LOG.exception('Run Traffic terminated')
+
+        self._ix_scenario.stop_protocols()
+        self.client_started.value = 0
+        LOG.debug("IxiaResourceHelper::run_test done")
+
     def collect_kpi(self):
         self.rfc_helper.iteration.value += 1
         return super(IxiaResourceHelper, self).collect_kpi()
@@ -597,6 +671,9 @@ class IxiaTrafficGen(SampleVNFTrafficGen):
         self._ixia_traffic_gen = None
         self.ixia_file_name = ''
         self.vnf_port_pairs = []
+        self._traffic_process = None
+        self._tasks_queue = JoinableQueue()
+        self._result_queue = Queue()
 
     def _check_status(self):
         pass
@@ -604,3 +681,34 @@ class IxiaTrafficGen(SampleVNFTrafficGen):
     def terminate(self):
         self.resource_helper.stop_collect()
         super(IxiaTrafficGen, self).terminate()
+
+    def _test_runner(self, traffic_profile, tasks, results):
+        self.resource_helper.run_test(traffic_profile, tasks, results)
+
+    def _init_traffic_process(self, traffic_profile):
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
+                                    os.getpid())
+        self._traffic_process = Process(name=name, target=self._test_runner,
+                                        args=(
+                                        traffic_profile, self._tasks_queue,
+                                        self._result_queue))
+
+        self._traffic_process.start()
+        while self.resource_helper.client_started.value == 0:
+            time.sleep(1)
+            if not self._traffic_process.is_alive():
+                break
+
+    def run_traffic_once(self, traffic_profile):
+        if self.resource_helper.client_started.value == 0:
+            self._init_traffic_process(traffic_profile)
+
+        # continue test - run next iteration
+        LOG.info("Run next iteration ...")
+        self._tasks_queue.put('RUN_TRAFFIC')
+
+    def wait_on_traffic(self):
+        self._tasks_queue.join()
+        result = self._result_queue.get()
+        return result
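
A minimal sketch (not part of the patch) of the RUN_TRAFFIC / CONTINUE /
COMPLETE handshake introduced above between IxiaTrafficGen.run_traffic_once()
/ wait_on_traffic() and IxiaResourceHelper.run_test(); the worker below stands
in for run_test(), with the IxNetwork specifics replaced by a simple counter:

    from multiprocessing import JoinableQueue, Process, Queue


    def worker(tasks_queue, results_queue, iterations_to_converge=2):
        # Stand-in for IxiaResourceHelper.run_test(): consume tasks until the
        # (fake) RFC2544 search converges, reporting progress after each pass.
        completed = 0
        while completed < iterations_to_converge:
            task = tasks_queue.get()      # blocks until run_traffic_once() posts a task
            if task != 'RUN_TRAFFIC':
                tasks_queue.task_done()
                continue
            completed += 1                # here run_test() would execute one traffic pass
            results_queue.put(
                'COMPLETE' if completed >= iterations_to_converge else 'CONTINUE')
            tasks_queue.task_done()


    if __name__ == '__main__':
        tasks, results = JoinableQueue(), Queue()
        proc = Process(target=worker, args=(tasks, results))
        proc.start()

        status = 'CONTINUE'
        while status == 'CONTINUE':       # mirrors the loop in NetworkServiceRFC2544.run()
            tasks.put('RUN_TRAFFIC')      # run_traffic_once()
            tasks.join()                  # wait_on_traffic(): wait for the pass to finish
            status = results.get()
            print('iteration finished, status=%s' % status)

        proc.join()
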
diff --git a/yardstick/tests/unit/benchmark/runner/test_base.py b/yardstick/tests/unit/benchmark/runner/test_base.py
index 559c991..07d6f18 100644 (file)
@@ -43,6 +43,29 @@ class ActionTestCase(ut_base.BaseUnitTestCase):
         runner_base._periodic_action(0, 'echo', mock.Mock())
 
 
+class ScenarioOutputTestCase(ut_base.BaseUnitTestCase):
+
+    def setUp(self):
+        self.output_queue = mock.Mock()
+        self.scenario_output = runner_base.ScenarioOutput(self.output_queue,
+                                                          sequence=1)
+
+    @mock.patch.object(time, 'time')
+    def test_push(self, mock_time):
+        mock_time.return_value = 2
+        data = {"value1": 1}
+        self.scenario_output.push(data)
+        self.output_queue.put.assert_called_once_with({'timestamp': 2,
+                                                       'sequence': 1,
+                                                       'data': data}, True, 10)
+
+    def test_push_no_timestamp(self):
+        self.scenario_output["value1"] = 1
+        self.scenario_output.push(None, False)
+        self.output_queue.put.assert_called_once_with({'sequence': 1,
+                                                       'value1': 1}, True, 10)
+
+
 class RunnerTestCase(ut_base.BaseUnitTestCase):
 
     def setUp(self):
diff --git a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
index addd72b..8ad35aa 100644 (file)
@@ -727,3 +727,87 @@ class TestNetworkServiceTestCase(unittest.TestCase):
             mock.Mock(return_value=True)
         with self.assertRaises(RuntimeError):
             self.s.teardown()
+
+
+class TestNetworkServiceRFC2544TestCase(TestNetworkServiceTestCase):
+
+    def setUp(self):
+        super(TestNetworkServiceRFC2544TestCase, self).setUp()
+        self.s = vnf_generic.NetworkServiceRFC2544(self.scenario_cfg,
+                                                    self.context_cfg)
+
+    def test_run(self):
+        tgen = mock.Mock(autospec=GenericTrafficGen)
+        tgen.traffic_finished = True
+        verified_dict = {"verified": True}
+        tgen.verify_traffic = lambda x: verified_dict
+        tgen.name = "tgen__1"
+        tgen.wait_on_traffic.return_value = 'COMPLETE'
+        vnf = mock.Mock(autospec=GenericVNF)
+        vnf.runs_traffic = False
+        self.s.vnfs = [tgen, vnf]
+        self.s.traffic_profile = mock.Mock()
+        self.s._fill_traffic_profile = mock.Mock()
+        self.s.collector = mock.Mock(autospec=Collector)
+        self.s.collector.get_kpi = mock.Mock(
+            return_value={tgen.name: verified_dict})
+        result = mock.Mock()
+        self.s.run(result)
+        self.s._fill_traffic_profile.assert_called_once()
+        result.push.assert_called_once()
+
+    def test_setup(self):
+        with mock.patch("yardstick.ssh.SSH") as ssh:
+            ssh_mock = mock.Mock(autospec=ssh.SSH)
+            ssh_mock.execute = \
+                mock.Mock(return_value=(0, SYS_CLASS_NET + IP_ADDR_SHOW, ""))
+            ssh.from_node.return_value = ssh_mock
+
+            tgen = mock.Mock(autospec=GenericTrafficGen)
+            tgen.traffic_finished = True
+            verified_dict = {"verified": True}
+            tgen.verify_traffic = lambda x: verified_dict
+            tgen.terminate = mock.Mock(return_value=True)
+            tgen.name = "tgen__1"
+            tgen.run_traffic.return_value = 'tg_id'
+            vnf = mock.Mock(autospec=GenericVNF)
+            vnf.runs_traffic = False
+            vnf.terminate = mock.Mock(return_value=True)
+            self.s.vnfs = [tgen, vnf]
+            self.s.traffic_profile = mock.Mock()
+            self.s.collector = mock.Mock(autospec=Collector)
+            self.s.collector.get_kpi = \
+                mock.Mock(return_value={tgen.name: verified_dict})
+            self.s.map_topology_to_infrastructure = mock.Mock(return_value=0)
+            self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs)
+            self.s.setup()
+
+    def test_setup_exception(self):
+        with mock.patch("yardstick.ssh.SSH") as ssh:
+            ssh_mock = mock.Mock(autospec=ssh.SSH)
+            ssh_mock.execute = \
+                mock.Mock(return_value=(0, SYS_CLASS_NET + IP_ADDR_SHOW, ""))
+            ssh.from_node.return_value = ssh_mock
+
+            tgen = mock.Mock(autospec=GenericTrafficGen)
+            tgen.traffic_finished = True
+            verified_dict = {"verified": True}
+            tgen.verify_traffic = lambda x: verified_dict
+            tgen.terminate = mock.Mock(return_value=True)
+            tgen.name = "tgen__1"
+            vnf = mock.Mock(autospec=GenericVNF)
+            vnf.runs_traffic = False
+            vnf.instantiate.side_effect = RuntimeError(
+                "error during instantiate")
+            vnf.terminate = mock.Mock(return_value=True)
+            self.s.vnfs = [tgen, vnf]
+            self.s.traffic_profile = mock.Mock()
+            self.s.collector = mock.Mock(autospec=Collector)
+            self.s.collector.get_kpi = \
+                mock.Mock(return_value={tgen.name: verified_dict})
+            self.s.map_topology_to_infrastructure = mock.Mock(return_value=0)
+            self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs)
+            self.s._fill_traffic_profile = \
+                mock.Mock(return_value=TRAFFIC_PROFILE)
+            with self.assertRaises(RuntimeError):
+                self.s.setup()
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py
index 494e2c2..a8f697d 100644 (file)
@@ -18,6 +18,7 @@ import mock
 import six
 import unittest
 import ipaddress
+import time
 from collections import OrderedDict
 
 from yardstick.common import utils
@@ -108,6 +109,29 @@ class TestIxiaResourceHelper(unittest.TestCase):
         self.assertEqual('fake_samples', ixia_rhelper._queue.get())
         mock_tprofile.update_traffic_profile.assert_called_once()
 
+    def test_run_test(self):
+        mock_tprofile = mock.Mock()
+        mock_tprofile.config.duration = 10
+        mock_tprofile.get_drop_percentage.return_value = True, 'fake_samples'
+        ixia_rhelper = tg_rfc2544_ixia.IxiaResourceHelper(mock.Mock())
+        tasks_queue = mock.Mock()
+        tasks_queue.get.return_value = 'RUN_TRAFFIC'
+        results_queue = mock.Mock()
+        ixia_rhelper.rfc_helper = mock.Mock()
+        ixia_rhelper.vnfd_helper = mock.Mock()
+        ixia_rhelper._ix_scenario = mock.Mock()
+        ixia_rhelper.vnfd_helper.port_pairs.all_ports = []
+        with mock.patch.object(ixia_rhelper, 'generate_samples'), \
+                mock.patch.object(ixia_rhelper, '_build_ports'), \
+                mock.patch.object(ixia_rhelper, '_initialize_client'), \
+                mock.patch.object(utils, 'wait_until_true'):
+            ixia_rhelper.run_test(mock_tprofile, tasks_queue, results_queue)
+
+        self.assertEqual('fake_samples', ixia_rhelper._queue.get())
+        mock_tprofile.update_traffic_profile.assert_called_once()
+        tasks_queue.task_done.assert_called_once()
+        results_queue.put.assert_called_once_with('COMPLETE')
+
 
 @mock.patch.object(tg_rfc2544_ixia, 'ixnet_api')
 class TestIXIATrafficGen(unittest.TestCase):
@@ -426,6 +450,45 @@ class TestIXIATrafficGen(unittest.TestCase):
 
         _traffic_runner()
 
+    def test_run_traffic_once(self, *args):
+        vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+        sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd)
+        sut._init_traffic_process = mock.Mock()
+        sut._tasks_queue.put = mock.Mock()
+        sut.resource_helper.client_started.value = 0
+        sut.run_traffic_once(self.TRAFFIC_PROFILE)
+        sut._tasks_queue.put.assert_called_once_with("RUN_TRAFFIC")
+        sut._init_traffic_process.assert_called_once_with(self.TRAFFIC_PROFILE)
+
+    def test__test_runner(self, *args):
+        vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+        sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd)
+        tasks = 'tasks'
+        results = 'results'
+        sut.resource_helper = mock.Mock()
+        sut._test_runner(self.TRAFFIC_PROFILE, tasks, results)
+        sut.resource_helper.run_test.assert_called_once_with(self.TRAFFIC_PROFILE,
+                                                             tasks, results)
+
+    @mock.patch.object(time, 'sleep', return_value=0)
+    def test__init_traffic_process(self, *args):
+        vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+        sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd)
+        sut._test_runner = mock.Mock(return_value=0)
+        sut.resource_helper = mock.Mock()
+        sut.resource_helper.client_started.value = 0
+        sut._init_traffic_process(self.TRAFFIC_PROFILE)
+
+    def test_wait_on_traffic(self, *args):
+        vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+        sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd)
+        sut._tasks_queue.join = mock.Mock(return_value=0)
+        sut._result_queue.get = mock.Mock(return_value='COMPLETE')
+        result = sut.wait_on_traffic()
+        sut._tasks_queue.join.assert_called_once()
+        sut._result_queue.get.assert_called_once()
+        self.assertEqual(result, 'COMPLETE')
+
 
 class TestIxiaBasicScenario(unittest.TestCase):
     def setUp(self):