# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
import logging
import multiprocessing
import subprocess
import time
import traceback
-from subprocess import CalledProcessError
-import importlib
-
-from six.moves.queue import Empty
+from six import moves
-import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
+
log = logging.getLogger(__name__)
exitcode = 0
try:
output = subprocess.check_output(command, shell=True)
- except CalledProcessError:
+ except subprocess.CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
queue.put({'periodic-action-data': data})
+class ScenarioOutput(dict):
+
+ QUEUE_PUT_TIMEOUT = 10
+
+ def __init__(self, queue, **kwargs):
+ super(ScenarioOutput, self).__init__()
+ self._queue = queue
+ self.result_ext = dict()
+ for key, val in kwargs.items():
+ self.result_ext[key] = val
+ setattr(self, key, val)
+
+ def push(self, data=None, add_timestamp=True):
+ if data is None:
+ data = dict(self)
+
+ if add_timestamp:
+ result = {'timestamp': time.time(), 'data': data}
+ else:
+ result = data
+
+ for key in self.result_ext.keys():
+ result[key] = getattr(self, key)
+
+ self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT)
+
+
class Runner(object):
runners = []
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners", exc_info=True)
+ log.debug("Terminating all runners")
# release dumper process as some errors before any runner is created
if not Runner.runners:
log.debug("output_queue size %s", self.output_queue.qsize())
try:
result.update(self.output_queue.get(True, 1))
- except Empty:
+ except moves.queue.Empty:
pass
return result
log.debug("result_queue size %s", self.result_queue.qsize())
try:
one_record = self.result_queue.get(True, 1)
- except Empty:
+ except moves.queue.Empty:
pass
else:
if output_in_influxdb: