Add new scenario NSPerf-RFC2544
[yardstick.git] / yardstick / benchmark / runners / base.py
index 99386a4..94de45d 100755 (executable)
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
 
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
 import logging
 import multiprocessing
 import subprocess
 import time
 import traceback
-from subprocess import CalledProcessError
 
-import importlib
-
-from six.moves.queue import Empty
+from six import moves
 
-import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import utils
 from yardstick.dispatcher.base import Base as DispatcherBase
 
+
 log = logging.getLogger(__name__)
 
 
@@ -41,7 +37,7 @@ def _execute_shell_command(command):
     exitcode = 0
     try:
         output = subprocess.check_output(command, shell=True)
-    except CalledProcessError:
+    except subprocess.CalledProcessError:
         exitcode = -1
         output = traceback.format_exc()
         log.error("exec command '%s' error:\n ", command)
@@ -81,6 +77,33 @@ def _periodic_action(interval, command, queue):
         queue.put({'periodic-action-data': data})
 
 
+class ScenarioOutput(dict):
+
+    QUEUE_PUT_TIMEOUT = 10
+
+    def __init__(self, queue, **kwargs):
+        super(ScenarioOutput, self).__init__()
+        self._queue = queue
+        self.result_ext = dict()
+        for key, val in kwargs.items():
+            self.result_ext[key] = val
+            setattr(self, key, val)
+
+    def push(self, data=None, add_timestamp=True):
+        if data is None:
+            data = dict(self)
+
+        if add_timestamp:
+            result = {'timestamp': time.time(), 'data': data}
+        else:
+            result = data
+
+        for key in self.result_ext.keys():
+            result[key] = getattr(self, key)
+
+        self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT)
+
+
 class Runner(object):
     runners = []
 
@@ -121,7 +144,7 @@ class Runner(object):
     @staticmethod
     def terminate_all():
         """Terminate all runners (subprocesses)"""
-        log.debug("Terminating all runners", exc_info=True)
+        log.debug("Terminating all runners")
 
         # release dumper process as some errors before any runner is created
         if not Runner.runners:
@@ -245,7 +268,7 @@ class Runner(object):
             log.debug("output_queue size %s", self.output_queue.qsize())
             try:
                 result.update(self.output_queue.get(True, 1))
-            except Empty:
+            except moves.queue.Empty:
                 pass
         return result
 
@@ -259,7 +282,7 @@ class Runner(object):
             log.debug("result_queue size %s", self.result_queue.qsize())
             try:
                 one_record = self.result_queue.get(True, 1)
-            except Empty:
+            except moves.queue.Empty:
                 pass
             else:
                 if output_in_influxdb: