Merge "Fix nsb_setup.sh script"
[yardstick.git] / yardstick / benchmark / runners / duration.py
index 7594276..60b0348 100644 (file)
@@ -31,6 +31,9 @@ from yardstick.benchmark.runners import base
 LOG = logging.getLogger(__name__)
 
 
+QUEUE_PUT_TIMEOUT = 10
+
+
 def _worker_process(queue, cls, method_name, scenario_cfg,
                     context_cfg, aborted, output_queue):
 
@@ -63,6 +66,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         data = {}
         errors = ""
 
+        benchmark.pre_run_wait_time(interval)
+
         try:
             result = method(data)
         except AssertionError as assertion:
@@ -74,14 +79,16 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                 errors = assertion.args
         # catch all exceptions because with multiprocessing we can have un-picklable exception
         # problems  https://bugs.python.org/issue9400
-        except Exception:
+        except Exception:  # pylint: disable=broad-except
             errors = traceback.format_exc()
             LOG.exception("")
         else:
             if result:
-                output_queue.put(result)
+                # add timeout for put so we don't block test
+                # if we do timeout we don't care about dropping individual KPIs
+                output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
 
-        time.sleep(interval)
+        benchmark.post_run_wait_time(interval)
 
         benchmark_output = {
             'timestamp': time.time(),
@@ -90,7 +97,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             'errors': errors
         }
 
-        queue.put(benchmark_output)
+        queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
@@ -133,7 +140,9 @@ If the scenario ends before the time has elapsed, it will be started again.
     __execution_type__ = 'Duration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
                   context_cfg, self.aborted, self.output_queue))