Enable the MQ producer in "SampleVNFTrafficGen" class 81/56281/23
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Mon, 23 Apr 2018 16:06:25 +0000 (17:06 +0100)
committerRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Thu, 5 Jul 2018 07:41:51 +0000 (08:41 +0100)
Now all traffic generators using the default implementation of
"ClientResourceHelper.run_traffic" will update the status of
the traffic injection using the MQ.

The available methods are listed in common.messsaging (VNF_METHOD_*):
- tg_method_started: VNF app started
- tg_method_finished: VNF app finished
- tg_method_iteration: VNF app execution loop started

JIRA: YARDSTICK-1127

Change-Id: I12829e1762fc20cc95da3b50767a66f031e25ee8
Signed-off-by: Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
yardstick/network_services/vnf_generic/vnf/base.py
yardstick/network_services/vnf_generic/vnf/prox_helpers.py
yardstick/network_services/vnf_generic/vnf/sample_vnf.py
yardstick/network_services/vnf_generic/vnf/tg_ping.py
yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py

index 804257f..c385eb6 100644 (file)
@@ -19,6 +19,7 @@ import logging
 import six
 
 from yardstick.common import messaging
+from yardstick.common.messaging import payloads
 from yardstick.common.messaging import producer
 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
 
@@ -150,6 +151,28 @@ class TrafficGeneratorProducer(producer.MessagingProducer):
         super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
                                                        pid=pid)
 
+    def tg_method_started(self, version=1):
+        """Send a message to inform the traffic generation has started"""
+        self.send_message(
+            messaging.TG_METHOD_STARTED,
+            payloads.TrafficGeneratorPayload(version=version, iteration=0,
+                                             kpi={}))
+
+    def tg_method_finished(self, version=1):
+        """Send a message to inform the traffic generation has finished"""
+        self.send_message(
+            messaging.TG_METHOD_FINISHED,
+            payloads.TrafficGeneratorPayload(version=version, iteration=0,
+                                             kpi={}))
+
+    def tg_method_iteration(self, iteration, version=1, kpi=None):
+        """Send a message, with KPI, once an iteration has finished"""
+        kpi = {} if kpi is None else kpi
+        self.send_message(
+            messaging.TG_METHOD_ITERATION,
+            payloads.TrafficGeneratorPayload(version=version,
+                                             iteration=iteration, kpi=kpi))
+
 
 @six.add_metaclass(abc.ABCMeta)
 class GenericVNF(object):
index 6d28f47..3241719 100644 (file)
@@ -969,7 +969,7 @@ class ProxResourceHelper(ClientResourceHelper):
             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
         return self._test_type
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, *args):
         self._queue.cancel_join_thread()
         self.lower = 0.0
         self.upper = 100.0
index 1ee71aa..a37f4f7 100644 (file)
 
 import logging
 from multiprocessing import Queue, Value, Process
-
 import os
 import posixpath
 import re
-import six
+import uuid
 import subprocess
 import time
 
+import six
+
 from trex_stl_lib.trex_stl_client import LoggerApi
 from trex_stl_lib.trex_stl_client import STLClient
 from trex_stl_lib.trex_stl_exceptions import STLError
@@ -408,12 +409,13 @@ class ClientResourceHelper(ResourceHelper):
         time.sleep(self.QUEUE_WAIT_TIME)
         self._queue.put(samples)
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, mq_producer):
         # if we don't do this we can hang waiting for the queue to drain
         # have to do this in the subprocess
         self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
+        mq_producer.tg_method_started()
         try:
             self._build_ports()
             self.client = self._connect()
@@ -421,8 +423,11 @@ class ClientResourceHelper(ResourceHelper):
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
 
+            iteration_index = 0
             while self._terminated.value == 0:
+                iteration_index += 1
                 self._run_traffic_once(traffic_profile)
+                mq_producer.tg_method_iteration(iteration_index)
 
             self.client.stop(self.all_ports)
             self.client.disconnect()
@@ -433,6 +438,8 @@ class ClientResourceHelper(ResourceHelper):
                 return  # return if trex/tg server is stopped.
             raise
 
+        mq_producer.tg_method_finished()
+
     def terminate(self):
         self._terminated.value = 1  # stop client
 
@@ -911,12 +918,13 @@ class SampleVNFTrafficGen(GenericTrafficGen):
                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
                 return self._tg_process.exitcode
 
-    def _traffic_runner(self, traffic_profile):
+    def _traffic_runner(self, traffic_profile, mq_pid):
         # always drop connections first thing in new processes
         # so we don't get paramiko errors
         self.ssh_helper.drop_connection()
         LOG.info("Starting %s client...", self.APP_NAME)
-        self.resource_helper.run_traffic(traffic_profile)
+        self._mq_producer = self._setup_mq_producer(mq_pid)
+        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
 
     def run_traffic(self, traffic_profile):
         """ Generate traffic on the wire according to the given params.
@@ -926,10 +934,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
-        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
                                     os.getpid())
+        mq_pid = uuid.uuid1().int
         self._traffic_process = Process(name=name, target=self._traffic_runner,
-                                        args=(traffic_profile,))
+                                        args=(traffic_profile, mq_pid))
         self._traffic_process.start()
         # Wait for traffic process to start
         while self.resource_helper.client_started.value == 0:
@@ -938,7 +948,7 @@ class SampleVNFTrafficGen(GenericTrafficGen):
             if not self._traffic_process.is_alive():
                 break
 
-        return self._traffic_process.is_alive()
+        return mq_pid
 
     def collect_kpi(self):
         # check if the tg processes have exited
index a989543..4050dc6 100644 (file)
@@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper):
         self._queue = Queue()
         self._parser = PingParser(self._queue)
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, *args):
         # drop the connection in order to force a new one
         self.ssh_helper.drop_connection()
 
index a1f9fbe..875ae93 100644 (file)
@@ -102,7 +102,7 @@ class IxiaResourceHelper(ClientResourceHelper):
         self.client.assign_ports()
         self.client.create_traffic_model()
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, *args):
         if self._terminated.value:
             return
 
index ca3831c..11cba4d 100644 (file)
@@ -23,6 +23,7 @@ import oslo_messaging
 import unittest
 
 from yardstick.common import messaging
+from yardstick.common.messaging import payloads
 from yardstick.network_services.vnf_generic.vnf import base
 from yardstick.ssh import SSH
 
@@ -259,3 +260,51 @@ class TrafficGeneratorProducerTestCase(unittest.TestCase):
         mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
         self.assertEqual(pid, tg_producer._pid)
         self.assertEqual(messaging.TOPIC_TG, tg_producer._topic)
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+                       return_value='tg_pload')
+    def test_tg_method_started(self, mock_tg_payload, *args):
+        tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+        with mock.patch.object(tg_producer, 'send_message') as mock_message:
+            tg_producer.tg_method_started(version=10)
+
+        mock_message.assert_called_once_with(messaging.TG_METHOD_STARTED,
+                                             'tg_pload')
+        mock_tg_payload.assert_called_once_with(version=10, iteration=0,
+                                                kpi={})
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+                       return_value='tg_pload')
+    def test_tg_method_finished(self, mock_tg_payload, *args):
+        tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+        with mock.patch.object(tg_producer, 'send_message') as mock_message:
+            tg_producer.tg_method_finished(version=20)
+
+        mock_message.assert_called_once_with(messaging.TG_METHOD_FINISHED,
+                                             'tg_pload')
+        mock_tg_payload.assert_called_once_with(version=20, iteration=0,
+                                                kpi={})
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+                       return_value='tg_pload')
+    def test_tg_method_iteration(self, mock_tg_payload, *args):
+        tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+        with mock.patch.object(tg_producer, 'send_message') as mock_message:
+            tg_producer.tg_method_iteration(100, version=30, kpi={'k': 'v'})
+
+        mock_message.assert_called_once_with(messaging.TG_METHOD_ITERATION,
+                                             'tg_pload')
+        mock_tg_payload.assert_called_once_with(version=30, iteration=100,
+                                                kpi={'k': 'v'})
index 331e80d..48ae3b5 100644 (file)
@@ -1090,6 +1090,57 @@ class TestClientResourceHelper(unittest.TestCase):
 
         self.assertIs(client_resource_helper._connect(client), client)
 
+    @mock.patch.object(ClientResourceHelper, '_build_ports')
+    @mock.patch.object(ClientResourceHelper, '_run_traffic_once')
+    def test_run_traffic(self, mock_run_traffic_once, mock_build_ports):
+        client_resource_helper = ClientResourceHelper(mock.Mock())
+        client = mock.Mock()
+        traffic_profile = mock.Mock()
+        mq_producer = mock.Mock()
+        with mock.patch.object(client_resource_helper, '_connect') \
+                as mock_connect, \
+                mock.patch.object(client_resource_helper, '_terminated') \
+                as mock_terminated:
+            mock_connect.return_value = client
+            type(mock_terminated).value = mock.PropertyMock(
+                side_effect=[0, 1, lambda x: x])
+            client_resource_helper.run_traffic(traffic_profile, mq_producer)
+
+        mock_build_ports.assert_called_once()
+        traffic_profile.register_generator.assert_called_once()
+        mq_producer.tg_method_started.assert_called_once()
+        mq_producer.tg_method_finished.assert_called_once()
+        mq_producer.tg_method_iteration.assert_called_once_with(1)
+        mock_run_traffic_once.assert_called_once_with(traffic_profile)
+
+    @mock.patch.object(ClientResourceHelper, '_build_ports')
+    @mock.patch.object(ClientResourceHelper, '_run_traffic_once',
+                       side_effect=Exception)
+    def test_run_traffic_exception(self, mock_run_traffic_once,
+                                   mock_build_ports):
+        client_resource_helper = ClientResourceHelper(mock.Mock())
+        client = mock.Mock()
+        traffic_profile = mock.Mock()
+        mq_producer = mock.Mock()
+        with mock.patch.object(client_resource_helper, '_connect') \
+                as mock_connect, \
+                mock.patch.object(client_resource_helper, '_terminated') \
+                as mock_terminated:
+            mock_connect.return_value = client
+            type(mock_terminated).value = mock.PropertyMock(return_value=0)
+            mq_producer.reset_mock()
+            # NOTE(ralonsoh): "trex_stl_exceptions.STLError" is mocked
+            with self.assertRaises(Exception):
+                client_resource_helper.run_traffic(traffic_profile,
+                                                   mq_producer)
+
+        mock_build_ports.assert_called_once()
+        traffic_profile.register_generator.assert_called_once()
+        mock_run_traffic_once.assert_called_once_with(traffic_profile)
+        mq_producer.tg_method_started.assert_called_once()
+        mq_producer.tg_method_finished.assert_not_called()
+        mq_producer.tg_method_iteration.assert_not_called()
+
 
 class TestRfc2544ResourceHelper(unittest.TestCase):
 
index 050aa4a..3e2f598 100644 (file)
@@ -406,7 +406,8 @@ class TestProxTrafficGen(unittest.TestCase):
         sut.setup_helper.prox_config_dict = {}
         sut._connect_client = mock.Mock(autospec=STLClient)
         sut._connect_client.get_stats = mock.Mock(return_value="0")
-        sut._traffic_runner(mock_traffic_profile)
+        sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+        sut._traffic_runner(mock_traffic_profile, mock.ANY)
 
     @mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.socket')
     @mock.patch(SSH_HELPER)
index 42ac40b..4ade157 100644 (file)
@@ -379,7 +379,8 @@ class TestIXIATrafficGen(unittest.TestCase):
                     mock.mock_open(), create=True)
         @mock.patch('yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.LOG.exception')
         def _traffic_runner(*args):
-            result = sut._traffic_runner(mock_traffic_profile)
+            sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+            result = sut._traffic_runner(mock_traffic_profile, mock.ANY)
             self.assertIsNone(result)
 
         _traffic_runner()
index 350ba84..08ae274 100644 (file)
@@ -387,8 +387,9 @@ class TestTrexTrafficGen(unittest.TestCase):
         # must generate cfg before we can run traffic so Trex port mapping is
         # created
         self.sut.resource_helper.generate_cfg()
+        self.sut._setup_mq_producer = mock.Mock()
         with mock.patch.object(self.sut.resource_helper, 'run_traffic'):
-            self.sut._traffic_runner(mock_traffic_profile)
+            self.sut._traffic_runner(mock_traffic_profile, mock.ANY)
 
     def test__generate_trex_cfg(self):
         vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]