#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
 
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
 import logging
 import multiprocessing
 import subprocess
 import time
 import traceback
-from subprocess import CalledProcessError
-
-import importlib
 
-from six.moves.queue import Empty
+from six import moves
 
-import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
 from yardstick.dispatcher.base import Base as DispatcherBase
 
+
 log = logging.getLogger(__name__)
 
 
     exitcode = 0
     try:
         output = subprocess.check_output(command, shell=True)
-    except CalledProcessError:
+    except subprocess.CalledProcessError:
         exitcode = -1
         output = traceback.format_exc()
         log.error("exec command '%s' error:\n ", command)
             log.debug("output_queue size %s", self.output_queue.qsize())
             try:
                 result.update(self.output_queue.get(True, 1))
-            except Empty:
+            except moves.queue.Empty:
                 pass
         return result
 
             log.debug("result_queue size %s", self.result_queue.qsize())
             try:
                 one_record = self.result_queue.get(True, 1)
-            except Empty:
+            except moves.queue.Empty:
                 pass
             else:
                 if output_in_influxdb:
         dispatchers = DispatcherBase.get(self.config['output_config'])
         dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
         dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+    """Class implementing the message producer for runners"""
+
+    def __init__(self, _id):
+        super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+    def start_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_START_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))
+
+    def stop_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_STOP_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))
 
 
 import os
 
-from yardstick.benchmark.runners import base
+from yardstick.benchmark.runners import base as base_runner
 from yardstick.common import exceptions
 from yardstick.common import messaging
 from yardstick.common import utils
 
     mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
     mq_consumer.start_rpc_server()
+    mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
 
     iteration_index = 1
     while 'run' in run_step:
         result = None
         errors = ''
         mq_consumer.iteration_index = iteration_index
+        mq_producer.start_iteration()
 
         try:
             utils.wait_until_true(
             errors = traceback.format_exc()
             LOG.exception(errors)
 
+        mq_producer.stop_iteration()
+
         if result:
             output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
         benchmark_output = {'timestamp': time.time(),
     mq_consumer.stop_rpc_server()
 
 
-class IterationIPCRunner(base.Runner):
+class IterationIPCRunner(base_runner.Runner):
     """Run a scenario for a configurable number of times.
 
     Each iteration has a configurable timeout. The loop control depends on the
 
-# Copyright (c) 2018 Intel Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 
 # Topics.
 TOPIC_TG = 'topic_traffic_generator'
+TOPIC_RUNNER = 'topic_runner'
 
 # Methods.
 # Traffic generator consumers methods. Names must match the methods implemented
-# in the consumer endpoint class, ``RunnerIterationIPCEndpoint``.
+# in the consumer endpoint class.
 TG_METHOD_STARTED = 'tg_method_started'
 TG_METHOD_FINISHED = 'tg_method_finished'
 TG_METHOD_ITERATION = 'tg_method_iteration'
+
+# Runner consumers methods. Names must match the methods implemented in the
+# consumer endpoint class.
+RUNNER_METHOD_START_ITERATION = "runner_method_start_iteration"
+RUNNER_METHOD_STOP_ITERATION = "runner_method_stop_iteration"
 
                # injection. The content will depend on the generator and the
                # traffic type.
     }
+
+
+class RunnerPayload(Payload):
+    """Base runner payload class"""
+    REQUIRED_FIELDS = {
+        'version',  # (str) version of the payload transmitted.
+        'data'  # (dict) generic container of data to be used if needed.
+    }
 
 ##############################################################################
 
 import time
+import uuid
 
 import mock
+from oslo_config import cfg
+import oslo_messaging
 import subprocess
 
 from yardstick.benchmark.runners import base as runner_base
 from yardstick.benchmark.runners import iteration
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
 from yardstick.tests.unit import base as ut_base
 
 
 
         with self.assertRaises(NotImplementedError):
             runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
+
+
+class RunnerProducerTestCase(ut_base.BaseUnitTestCase):
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(cfg, 'CONF')
+    def test__init(self, mock_config, mock_transport, mock_rpcclient,
+                   mock_target):
+        _id = uuid.uuid1().int
+        runner_producer = runner_base.RunnerProducer(_id)
+        mock_transport.assert_called_once_with(
+            mock_config, url='rabbit://yardstick:yardstick@localhost:5672/')
+        mock_target.assert_called_once_with(topic=messaging.TOPIC_RUNNER,
+                                            fanout=True,
+                                            server=messaging.SERVER)
+        mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
+        self.assertEqual(_id, runner_producer._id)
+        self.assertEqual(messaging.TOPIC_RUNNER, runner_producer._topic)
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+    def test_start_iteration(self, mock_runner_payload, *args):
+        runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+        with mock.patch.object(runner_producer,
+                               'send_message') as mock_message:
+            runner_producer.start_iteration(version=10)
+
+        mock_message.assert_called_once_with(
+            messaging.RUNNER_METHOD_START_ITERATION, 'runner_pload')
+        mock_runner_payload.assert_called_once_with(version=10, data={})
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+    def test_stop_iteration(self, mock_runner_payload, *args):
+        runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+        with mock.patch.object(runner_producer,
+                               'send_message') as mock_message:
+            runner_producer.stop_iteration(version=15)
+
+        mock_message.assert_called_once_with(
+            messaging.RUNNER_METHOD_STOP_ITERATION, 'runner_pload')
+        mock_runner_payload.assert_called_once_with(version=15, data={})
 
             payloads.TrafficGeneratorPayload(iteration=10, kpi={})
         with self.assertRaises(exceptions.PayloadMissingAttributes):
             payloads.TrafficGeneratorPayload(iteration=10)
+
+
+class RunnerPayloadTestCase(ut_base.BaseUnitTestCase):
+
+    def test_init(self):
+        runner_payload = payloads.RunnerPayload(version=5,
+                                                data={'key1': 'value1'})
+        self.assertEqual(5, runner_payload.version)
+        self.assertEqual({'key1': 'value1'}, runner_payload.data)
+
+    def test__init_missing_required_fields(self):
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload(version=1)
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload(data=None)
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload()