Merge "ping bottlenecks failed when security group rule do not support ipv6 - dovetai...
[yardstick.git] / yardstick / benchmark / runners / sequence.py
index 3b06e2a..0148a45 100644 (file)
 # yardstick comment: this is a modified copy of
 # rally/rally/benchmark/runners/constant.py
 
-'''A runner that every run changes a specified input value to the scenario.
+"""A runner that every run changes a specified input value to the scenario.
 The input value in the sequence is specified in a list in the input file.
-'''
+"""
+
+from __future__ import absolute_import
 
-import os
-import multiprocessing
 import logging
-import traceback
+import multiprocessing
 import time
+import traceback
+
+import os
 
 from yardstick.benchmark.runners import base
+from yardstick.common import exceptions as y_exc
 
 LOG = logging.getLogger(__name__)
 
 
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
@@ -56,10 +60,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
     benchmark.setup()
     method = getattr(benchmark, method_name)
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -74,17 +74,20 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         errors = ""
 
         try:
-            method(data)
-        except AssertionError as assertion:
+            result = method(data)
+        except y_exc.SLAValidationError as error:
             # SLA validation failed in scenario, determine what to do now
             if sla_action == "assert":
                 raise
             elif sla_action == "monitor":
-                LOG.warning("SLA validation failed: %s", assertion.args)
-                errors = assertion.args
-        except Exception as e:
+                LOG.warning("SLA validation failed: %s", error.args)
+                errors = error.args
+        except Exception as e:  # pylint: disable=broad-except
             errors = traceback.format_exc()
             LOG.exception(e)
+        else:
+            if result:
+                output_queue.put(result)
 
         time.sleep(interval)
 
@@ -95,10 +98,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             'errors': errors
         }
 
-        record = {'runner_id': runner_cfg['runner_id'],
-                  'benchmark': benchmark_output}
-
-        queue.put(record)
+        queue.put(benchmark_output)
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
@@ -108,12 +108,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         if (errors and sla_action is None) or aborted.is_set():
             break
 
-    benchmark.teardown()
+    try:
+        benchmark.teardown()
+    except Exception:
+        # catch any exception in teardown and convert to simple exception
+        # never pass exceptions back to multiprocessing, because some exceptions can
+        # be unpicklable
+        # https://bugs.python.org/issue9400
+        LOG.exception("")
+        raise SystemExit(1)
     LOG.info("worker END")
+    LOG.debug("queue.qsize() = %s", queue.qsize())
+    LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
 
 
 class SequenceRunner(base.Runner):
-    '''Run a scenario by changing an input value defined in a list
+    """Run a scenario by changing an input value defined in a list
 
   Parameters
     interval - time to wait between each scenario invocation
@@ -128,13 +138,15 @@ class SequenceRunner(base.Runner):
         type:    [int]
         unit:    na
         default: none
-    '''
+    """
 
     __execution_type__ = 'Sequence'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()