X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Frunners%2Fbase.py;h=94de45d1eae98a002983c7af23a7fdec826e4e7f;hb=56962db107f852a55101e8a9f3b93bd6982485de;hp=99386a440151ec94c5e8d5b16ad366c5752f4c10;hpb=ecfb54a60eb8e4395b582f114c92f69f547c3e45;p=yardstick.git diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index 99386a440..94de45d1e 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -12,27 +12,23 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +# +# This is a modified copy of ``rally/rally/benchmark/runners/base.py`` -# yardstick comment: this is a modified copy of -# rally/rally/benchmark/runners/base.py - -from __future__ import absolute_import - +import importlib import logging import multiprocessing import subprocess import time import traceback -from subprocess import CalledProcessError -import importlib - -from six.moves.queue import Empty +from six import moves -import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario +from yardstick.common import utils from yardstick.dispatcher.base import Base as DispatcherBase + log = logging.getLogger(__name__) @@ -41,7 +37,7 @@ def _execute_shell_command(command): exitcode = 0 try: output = subprocess.check_output(command, shell=True) - except CalledProcessError: + except subprocess.CalledProcessError: exitcode = -1 output = traceback.format_exc() log.error("exec command '%s' error:\n ", command) @@ -81,6 +77,33 @@ def _periodic_action(interval, command, queue): queue.put({'periodic-action-data': data}) +class ScenarioOutput(dict): + + QUEUE_PUT_TIMEOUT = 10 + + def __init__(self, queue, **kwargs): + super(ScenarioOutput, self).__init__() + self._queue = queue + self.result_ext = dict() + for key, val in kwargs.items(): + self.result_ext[key] = val + setattr(self, key, val) + + def push(self, data=None, add_timestamp=True): + if data is None: + data = dict(self) + + if add_timestamp: + result = {'timestamp': time.time(), 'data': data} + else: + result = data + + for key in self.result_ext.keys(): + result[key] = getattr(self, key) + + self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT) + + class Runner(object): runners = [] @@ -121,7 +144,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners", exc_info=True) + log.debug("Terminating all runners") # release dumper process as some errors before any runner is created if not Runner.runners: @@ -245,7 +268,7 @@ class Runner(object): log.debug("output_queue size %s", self.output_queue.qsize()) try: result.update(self.output_queue.get(True, 1)) - except Empty: + except moves.queue.Empty: pass return result @@ -259,7 +282,7 @@ class Runner(object): log.debug("result_queue size %s", self.result_queue.qsize()) try: one_record = self.result_queue.get(True, 1) - except Empty: + except moves.queue.Empty: pass else: if output_in_influxdb: