Merge "[workaround] Disable gui for aarch64"
[yardstick.git] / yardstick / benchmark / runners / base.py
index fbdf6c2..af25574 100755 (executable)
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
 
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
 import logging
 import multiprocessing
 import subprocess
 import time
 import traceback
-from subprocess import CalledProcessError
-
-import importlib
 
-from six.moves.queue import Empty
+from six import moves
 
-import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
 from yardstick.dispatcher.base import Base as DispatcherBase
 
+
 log = logging.getLogger(__name__)
 
 
@@ -41,7 +40,7 @@ def _execute_shell_command(command):
     exitcode = 0
     try:
         output = subprocess.check_output(command, shell=True)
-    except CalledProcessError:
+    except subprocess.CalledProcessError:
         exitcode = -1
         output = traceback.format_exc()
         log.error("exec command '%s' error:\n ", command)
@@ -245,7 +244,7 @@ class Runner(object):
             log.debug("output_queue size %s", self.output_queue.qsize())
             try:
                 result.update(self.output_queue.get(True, 1))
-            except Empty:
+            except moves.queue.Empty:
                 pass
         return result
 
@@ -259,7 +258,7 @@ class Runner(object):
             log.debug("result_queue size %s", self.result_queue.qsize())
             try:
                 one_record = self.result_queue.get(True, 1)
-            except Empty:
+            except moves.queue.Empty:
                 pass
             else:
                 if output_in_influxdb:
@@ -272,3 +271,22 @@ class Runner(object):
         dispatchers = DispatcherBase.get(self.config['output_config'])
         dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
         dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+    """Class implementing the message producer for runners"""
+
+    def __init__(self, _id):
+        super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+    def start_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_START_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))
+
+    def stop_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_STOP_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))