Merge "Add send socket commands function"
[yardstick.git] / yardstick / benchmark / runners / dynamictp.py
index 2f5f7e4..88d3c57 100755 (executable)
 """A runner that searches for the max throughput with binary search
 """
 
-import os
-import multiprocessing
 import logging
-import traceback
+import multiprocessing
 import time
+import traceback
+
+import os
 
 from yardstick.benchmark.runners import base
+from yardstick.common import exceptions as y_exc
 
 LOG = logging.getLogger(__name__)
 
@@ -33,7 +35,6 @@ LOG = logging.getLogger(__name__)
 def _worker_process(queue, cls, method_name, scenario_cfg,
                     context_cfg, aborted):  # pragma: no cover
 
-    queue.cancel_join_thread()
     runner_cfg = scenario_cfg['runner']
     iterations = runner_cfg.get("iterations", 1)
     interval = runner_cfg.get("interval", 1)
@@ -66,8 +67,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             max_throuput_found = False
             sequence = 0
 
-            last_min_data = {}
-            last_min_data['packets_per_second'] = 0
+            last_min_data = {'packets_per_second': 0}
 
             while True:
                 sequence += 1
@@ -81,10 +81,10 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
                 try:
                     method(data)
-                except AssertionError as assertion:
-                    LOG.warning("SLA validation failed: %s" % assertion.args)
+                except y_exc.SLAValidationError as error:
+                    LOG.warning("SLA validation failed: %s", error.args)
                     too_high = True
-                except Exception as e:
+                except Exception as e:  # pylint: disable=broad-except
                     errors = traceback.format_exc()
                     LOG.exception(e)
 
@@ -126,7 +126,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                     queue.put(record)
                     max_throuput_found = True
 
-                if (errors) or aborted.is_set() or max_throuput_found:
+                if errors or aborted.is_set() or max_throuput_found:
                     LOG.info("worker END")
                     break
 
@@ -142,11 +142,21 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             LOG.debug("iterator: %s iterations: %s", iterator, iterations)
 
     if "teardown" in run_step:
-        benchmark.teardown()
+        try:
+            benchmark.teardown()
+        except Exception:
+            # catch any exception in teardown and convert to simple exception
+            # never pass exceptions back to multiprocessing, because some exceptions can
+            # be unpicklable
+            # https://bugs.python.org/issue9400
+            LOG.exception("")
+            raise SystemExit(1)
+
+    LOG.debug("queue.qsize() = %s", queue.qsize())
 
 
 class IterationRunner(base.Runner):
-    '''Run a scenario to find the max throughput
+    """Run a scenario to find the max throughput
 
 If the scenario ends before the time has elapsed, it will be started again.
 
@@ -159,11 +169,13 @@ If the scenario ends before the time has elapsed, it will be started again.
         type:   int
         unit:   pps
         default: 1000 pps
-    '''
+    """
     __execution_type__ = 'Dynamictp'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
                   context_cfg, self.aborted))