Merge changes from topics 'YARDSTICK-1250', 'YARDSTICK-1249'
[yardstick.git] / yardstick / benchmark / runners / duration.py
index 1412c0c..60f1fa5 100644 (file)
 # yardstick comment: this is a modified copy of
 # rally/rally/benchmark/runners/constant.py
 
 # yardstick comment: this is a modified copy of
 # rally/rally/benchmark/runners/constant.py
 
-'''A runner that runs a specific time before it returns
-'''
+"""A runner that runs a specific time before it returns
+"""
 
 
+from __future__ import absolute_import
 import os
 import multiprocessing
 import logging
 import os
 import multiprocessing
 import logging
@@ -26,12 +27,16 @@ import traceback
 import time
 
 from yardstick.benchmark.runners import base
 import time
 
 from yardstick.benchmark.runners import base
+from yardstick.common import exceptions as y_exc
 
 LOG = logging.getLogger(__name__)
 
 
 
 LOG = logging.getLogger(__name__)
 
 
+QUEUE_PUT_TIMEOUT = 10
+
+
 def _worker_process(queue, cls, method_name, scenario_cfg,
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
 
     sequence = 1
 
@@ -39,7 +44,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     interval = runner_cfg.get("interval", 1)
     duration = runner_cfg.get("duration", 60)
 
     interval = runner_cfg.get("interval", 1)
     duration = runner_cfg.get("duration", 60)
-    LOG.info("worker START, duration %d sec, class %s", duration, cls)
+    LOG.info("Worker START, duration is %ds", duration)
+    LOG.debug("class is %s", cls)
 
     runner_cfg['runner_id'] = os.getpid()
 
 
     runner_cfg['runner_id'] = os.getpid()
 
@@ -51,11 +57,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
 
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     start = time.time()
     start = time.time()
+    timeout = start + duration
     while True:
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s START",
     while True:
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s START",
@@ -64,20 +67,29 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         data = {}
         errors = ""
 
         data = {}
         errors = ""
 
+        benchmark.pre_run_wait_time(interval)
+
         try:
         try:
-            method(data)
-        except AssertionError as assertion:
+            result = method(data)
+        except y_exc.SLAValidationError as error:
             # SLA validation failed in scenario, determine what to do now
             if sla_action == "assert":
                 raise
             elif sla_action == "monitor":
             # SLA validation failed in scenario, determine what to do now
             if sla_action == "assert":
                 raise
             elif sla_action == "monitor":
-                LOG.warning("SLA validation failed: %s", assertion.args)
-                errors = assertion.args
-        except Exception as e:
+                LOG.warning("SLA validation failed: %s", error.args)
+                errors = error.args
+        # catch all exceptions because with multiprocessing we can have un-picklable exception
+        # problems  https://bugs.python.org/issue9400
+        except Exception:  # pylint: disable=broad-except
             errors = traceback.format_exc()
             errors = traceback.format_exc()
-            LOG.exception(e)
+            LOG.exception("")
+        else:
+            if result:
+                # add timeout for put so we don't block test
+                # if we do timeout we don't care about dropping individual KPIs
+                output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
 
 
-        time.sleep(interval)
+        benchmark.post_run_wait_time(interval)
 
         benchmark_output = {
             'timestamp': time.time(),
 
         benchmark_output = {
             'timestamp': time.time(),
@@ -86,26 +98,33 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             'errors': errors
         }
 
             'errors': errors
         }
 
-        record = {'runner_id': runner_cfg['runner_id'],
-                  'benchmark': benchmark_output}
-
-        queue.put(record)
+        queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
 
         sequence += 1
 
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
 
         sequence += 1
 
-        if (errors and sla_action is None) or \
-                (time.time() - start > duration or aborted.is_set()):
-            LOG.info("worker END")
+        if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
+            LOG.info("Worker END")
             break
 
             break
 
-    benchmark.teardown()
+    try:
+        benchmark.teardown()
+    except Exception:
+        # catch any exception in teardown and convert to simple exception
+        # never pass exceptions back to multiprocessing, because some exceptions can
+        # be unpicklable
+        # https://bugs.python.org/issue9400
+        LOG.exception("")
+        raise SystemExit(1)
+
+    LOG.debug("queue.qsize() = %s", queue.qsize())
+    LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
 
 
 class DurationRunner(base.Runner):
 
 
 class DurationRunner(base.Runner):
-    '''Run a scenario for a certain amount of time
+    """Run a scenario for a certain amount of time
 
 If the scenario ends before the time has elapsed, it will be started again.
 
 
 If the scenario ends before the time has elapsed, it will be started again.
 
@@ -118,12 +137,14 @@ If the scenario ends before the time has elapsed, it will be started again.
         type:    int
         unit:    seconds
         default: 1 sec
         type:    int
         unit:    seconds
         default: 1 sec
-    '''
+    """
     __execution_type__ = 'Duration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
     __execution_type__ = 'Duration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()
         self.process.start()