Make "IterationIPC" MQ producer for VNF control messages 89/59489/6
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Tue, 3 Jul 2018 08:14:35 +0000 (09:14 +0100)
committerRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Thu, 5 Jul 2018 07:59:45 +0000 (07:59 +0000)
"IterationIPC" runner class is a consumer for MQ aware VNFs. A MQ aware
traffic generator can send "started", "finished" and "iteration"
messages.

This feature implements a MQ producer in the runner in order to send
messages to the VNFs. The messages implemented are:
  - "start_iteration"
  - "stop_iteration"

JIRA: YARDSTICK-1286

Change-Id: I706f9a9dda5e5beed52231be7d71452945a7dbed
Signed-off-by: Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/iteration_ipc.py
yardstick/common/messaging/__init__.py
yardstick/common/messaging/payloads.py
yardstick/tests/unit/benchmark/runner/test_base.py
yardstick/tests/unit/common/messaging/test_payloads.py

index fbdf6c2..af25574 100755 (executable)
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
 
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
 import logging
 import multiprocessing
 import subprocess
 import time
 import traceback
-from subprocess import CalledProcessError
-
-import importlib
 
-from six.moves.queue import Empty
+from six import moves
 
-import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
 from yardstick.dispatcher.base import Base as DispatcherBase
 
+
 log = logging.getLogger(__name__)
 
 
@@ -41,7 +40,7 @@ def _execute_shell_command(command):
     exitcode = 0
     try:
         output = subprocess.check_output(command, shell=True)
-    except CalledProcessError:
+    except subprocess.CalledProcessError:
         exitcode = -1
         output = traceback.format_exc()
         log.error("exec command '%s' error:\n ", command)
@@ -245,7 +244,7 @@ class Runner(object):
             log.debug("output_queue size %s", self.output_queue.qsize())
             try:
                 result.update(self.output_queue.get(True, 1))
-            except Empty:
+            except moves.queue.Empty:
                 pass
         return result
 
@@ -259,7 +258,7 @@ class Runner(object):
             log.debug("result_queue size %s", self.result_queue.qsize())
             try:
                 one_record = self.result_queue.get(True, 1)
-            except Empty:
+            except moves.queue.Empty:
                 pass
             else:
                 if output_in_influxdb:
@@ -272,3 +271,22 @@ class Runner(object):
         dispatchers = DispatcherBase.get(self.config['output_config'])
         dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
         dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+    """Class implementing the message producer for runners"""
+
+    def __init__(self, _id):
+        super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+    def start_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_START_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))
+
+    def stop_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_STOP_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))
index 43aa7f4..a0335fd 100644 (file)
@@ -26,7 +26,7 @@ import traceback
 
 import os
 
-from yardstick.benchmark.runners import base
+from yardstick.benchmark.runners import base as base_runner
 from yardstick.common import exceptions
 from yardstick.common import messaging
 from yardstick.common import utils
@@ -131,6 +131,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
     mq_consumer.start_rpc_server()
+    mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
 
     iteration_index = 1
     while 'run' in run_step:
@@ -141,6 +142,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         result = None
         errors = ''
         mq_consumer.iteration_index = iteration_index
+        mq_producer.start_iteration()
 
         try:
             utils.wait_until_true(
@@ -151,6 +153,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             errors = traceback.format_exc()
             LOG.exception(errors)
 
+        mq_producer.stop_iteration()
+
         if result:
             output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
         benchmark_output = {'timestamp': time.time(),
@@ -181,7 +185,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
     mq_consumer.stop_rpc_server()
 
 
-class IterationIPCRunner(base.Runner):
+class IterationIPCRunner(base_runner.Runner):
     """Run a scenario for a configurable number of times.
 
     Each iteration has a configurable timeout. The loop control depends on the
index dc8c51b..bd700d9 100644 (file)
@@ -1,14 +1,3 @@
-# Copyright (c) 2018 Intel Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
@@ -29,10 +18,16 @@ RPC_SERVER_EXECUTOR = 'threading'
 
 # Topics.
 TOPIC_TG = 'topic_traffic_generator'
+TOPIC_RUNNER = 'topic_runner'
 
 # Methods.
 # Traffic generator consumers methods. Names must match the methods implemented
-# in the consumer endpoint class, ``RunnerIterationIPCEndpoint``.
+# in the consumer endpoint class.
 TG_METHOD_STARTED = 'tg_method_started'
 TG_METHOD_FINISHED = 'tg_method_finished'
 TG_METHOD_ITERATION = 'tg_method_iteration'
+
+# Runner consumers methods. Names must match the methods implemented in the
+# consumer endpoint class.
+RUNNER_METHOD_START_ITERATION = "runner_method_start_iteration"
+RUNNER_METHOD_STOP_ITERATION = "runner_method_stop_iteration"
index c59c875..8ede1e5 100644 (file)
@@ -63,3 +63,11 @@ class TrafficGeneratorPayload(Payload):
                # injection. The content will depend on the generator and the
                # traffic type.
     }
+
+
+class RunnerPayload(Payload):
+    """Base runner payload class"""
+    REQUIRED_FIELDS = {
+        'version',  # (str) version of the payload transmitted.
+        'data'  # (dict) generic container of data to be used if needed.
+    }
index 559c991..49ba1ef 100644 (file)
@@ -8,12 +8,17 @@
 ##############################################################################
 
 import time
+import uuid
 
 import mock
+from oslo_config import cfg
+import oslo_messaging
 import subprocess
 
 from yardstick.benchmark.runners import base as runner_base
 from yardstick.benchmark.runners import iteration
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
 from yardstick.tests.unit import base as ut_base
 
 
@@ -94,3 +99,54 @@ class RunnerTestCase(ut_base.BaseUnitTestCase):
 
         with self.assertRaises(NotImplementedError):
             runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
+
+
+class RunnerProducerTestCase(ut_base.BaseUnitTestCase):
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(cfg, 'CONF')
+    def test__init(self, mock_config, mock_transport, mock_rpcclient,
+                   mock_target):
+        _id = uuid.uuid1().int
+        runner_producer = runner_base.RunnerProducer(_id)
+        mock_transport.assert_called_once_with(
+            mock_config, url='rabbit://yardstick:yardstick@localhost:5672/')
+        mock_target.assert_called_once_with(topic=messaging.TOPIC_RUNNER,
+                                            fanout=True,
+                                            server=messaging.SERVER)
+        mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
+        self.assertEqual(_id, runner_producer._id)
+        self.assertEqual(messaging.TOPIC_RUNNER, runner_producer._topic)
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+    def test_start_iteration(self, mock_runner_payload, *args):
+        runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+        with mock.patch.object(runner_producer,
+                               'send_message') as mock_message:
+            runner_producer.start_iteration(version=10)
+
+        mock_message.assert_called_once_with(
+            messaging.RUNNER_METHOD_START_ITERATION, 'runner_pload')
+        mock_runner_payload.assert_called_once_with(version=10, data={})
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+    def test_stop_iteration(self, mock_runner_payload, *args):
+        runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+        with mock.patch.object(runner_producer,
+                               'send_message') as mock_message:
+            runner_producer.stop_iteration(version=15)
+
+        mock_message.assert_called_once_with(
+            messaging.RUNNER_METHOD_STOP_ITERATION, 'runner_pload')
+        mock_runner_payload.assert_called_once_with(version=15, data={})
index 7930b8d..37b1f19 100644 (file)
@@ -63,3 +63,20 @@ class TrafficGeneratorPayloadTestCase(ut_base.BaseUnitTestCase):
             payloads.TrafficGeneratorPayload(iteration=10, kpi={})
         with self.assertRaises(exceptions.PayloadMissingAttributes):
             payloads.TrafficGeneratorPayload(iteration=10)
+
+
+class RunnerPayloadTestCase(ut_base.BaseUnitTestCase):
+
+    def test_init(self):
+        runner_payload = payloads.RunnerPayload(version=5,
+                                                data={'key1': 'value1'})
+        self.assertEqual(5, runner_payload.version)
+        self.assertEqual({'key1': 'value1'}, runner_payload.data)
+
+    def test__init_missing_required_fields(self):
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload(version=1)
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload(data=None)
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload()