from mock import mock
 
-from yardstick.benchmark.runners.base import Runner
+from yardstick.benchmark.runners import base
 from yardstick.benchmark.runners.iteration import IterationRunner
 
 
+class ActionTestCase(unittest.TestCase):
+
+    @mock.patch("yardstick.benchmark.runners.base.subprocess")
+    def test__execute_shell_command(self, mock_subprocess):
+        mock_subprocess.check_output.side_effect = Exception()
+
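+        # a failing check_output should be swallowed and reported as exit status -1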
+        self.assertEqual(base._execute_shell_command("")[0], -1)
+
+    @mock.patch("yardstick.benchmark.runners.base.subprocess")
+    def test__single_action(self, mock_subprocess):
+        mock_subprocess.check_output.side_effect = Exception()
+
+        base._single_action(0, "echo", mock.MagicMock())
+
+    @mock.patch("yardstick.benchmark.runners.base.subprocess")
+    def test__periodic_action(self, mock_subprocess):
+        mock_subprocess.check_output.side_effect = Exception()
+
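+        # the mocked failure is expected to make the periodic action return
+        # instead of looping forever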
+        base._periodic_action(0, "echo", mock.MagicMock())
+
+
 class RunnerTestCase(unittest.TestCase):
 
     @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
         actual_result = runner.get_output()
         self.assertEqual(idle_result, actual_result)
 
+    @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
+    def test_get_result(self, mock_process):
+        runner = IterationRunner({})
+        runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
+        runner.result_queue.put({'criteria': 'PASS'})
+
+        idle_result = [
+            {'case': 'opnfv_yardstick_tc002'},
+            {'criteria': 'PASS'}
+        ]
+
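+        # multiprocessing queues are filled by a background feeder thread,
+        # so poll briefly until the items become visible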
+        for retries in range(1000):
+            time.sleep(0.01)
+            if not runner.result_queue.empty():
+                break
+        actual_result = runner.get_result()
+        self.assertEqual(idle_result, actual_result)
+
     def test__run_benchmark(self):
-        runner = Runner(mock.Mock())
+        runner = base.Runner(mock.Mock())
 
         with self.assertRaises(NotImplementedError):
             runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
 
         }
 
         profile = ProxProfile(tp_config)
-        profile.init(234)
-        self.assertEqual(profile.queue, 234)
+        queue = mock.Mock()
+        profile.init(queue)
+        self.assertIs(profile.queue, queue)
 
     def test_execute_traffic(self):
         packet_sizes = [
 
 
             # Wait for runners to finish
             for runner in runners:
-                status = runner_join(runner)
+                status = runner_join(runner, self.outputs, result)
                 if status != 0:
-                    raise RuntimeError
-                self.outputs.update(runner.get_output())
-                result.extend(runner.get_result())
+                    raise RuntimeError(
+                        "{0} runner status {1}".format(runner.__execution_type__, status))
                 LOG.info("Runner ended, output in %s", output_file)
         else:
             # run serially
             for scenario in scenarios:
                 if not _is_background_scenario(scenario):
                     runner = self.run_one_scenario(scenario, output_file)
-                    status = runner_join(runner)
+                    status = runner_join(runner, self.outputs, result)
                     if status != 0:
                         LOG.error('Scenario NO.%s: "%s" ERROR!',
                                   scenarios.index(scenario) + 1,
                                   scenario.get('type'))
-                        raise RuntimeError
-                    self.outputs.update(runner.get_output())
-                    result.extend(runner.get_result())
+                        raise RuntimeError(
+                            "{0} runner status {1}".format(runner.__execution_type__, status))
                     LOG.info("Runner ended, output in %s", output_file)
 
         # Abort background runners
 
         # Wait for background runners to finish
         for runner in background_runners:
-            status = runner.join(JOIN_TIMEOUT)
+            status = runner.join(self.outputs, result, JOIN_TIMEOUT)
             if status is None:
                 # Nuke if it did not stop nicely
                 base_runner.Runner.terminate(runner)
-                runner.join(JOIN_TIMEOUT)
+                runner.join(self.outputs, result, JOIN_TIMEOUT)
             base_runner.Runner.release(runner)
 
-            self.outputs.update(runner.get_output())
-            result.extend(runner.get_result())
             print("Background task ended")
         return result
 
     return networks
 
 
-def runner_join(runner):
-    """join (wait for) a runner, exit process at runner failure"""
-    status = runner.join()
+def runner_join(runner, outputs, result):
+    """join (wait for) a runner, exit process at runner failure
+    :param outputs:
+    :type outputs: dict
+    :param result:
+    :type result: list
+    """
+    status = runner.join(outputs, result)
     base_runner.Runner.release(runner)
     return status
 
 
         if errors and sla_action is None:
             break
 
-    benchmark.teardown()
+    try:
+        benchmark.teardown()
+    except Exception:
+        # catch any exception in teardown and convert it to a simple exception;
+        # never pass exceptions back to multiprocessing, because some exceptions
+        # can be unpicklable
+        # https://bugs.python.org/issue9400
+        LOG.exception("")
+        raise SystemExit(1)
     LOG.info("worker END")
+    LOG.debug("queue.qsize() = %s", queue.qsize())
+    LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
 
 
 class ArithmeticRunner(base.Runner):
 
 import time
 import traceback
 
+
+from six.moves.queue import Empty
+
 import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
 
     @staticmethod
     def terminate_all():
         """Terminate all runners (subprocesses)"""
-        log.debug("Terminating all runners")
+        log.debug("Terminating all runners", exc_info=True)
 
         # release dumper process as some errors before any runner is created
         if not Runner.runners:
         """Abort the execution of a scenario"""
         self.aborted.set()
 
-    def join(self, timeout=None):
-        self.process.join(timeout)
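+    # seconds to wait between queue drains while the worker process is still running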
+    QUEUE_JOIN_INTERVAL = 5
+
+    def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+        while self.process.exitcode is None:
+            # drain the queue while we are running otherwise we won't terminate
+            outputs.update(self.get_output())
+            result.extend(self.get_result())
+            self.process.join(interval)
+        # drain after the process has exited
+        outputs.update(self.get_output())
+        result.extend(self.get_result())
+
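+        # defensive: terminate() is harmless on a process that has already exited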
+        self.process.terminate()
         if self.periodic_action_process:
+            self.periodic_action_process.join(1)
             self.periodic_action_process.terminate()
             self.periodic_action_process = None
 
     def get_output(self):
         result = {}
         while not self.output_queue.empty():
-            result.update(self.output_queue.get())
+            log.debug("output_queue size %s", self.output_queue.qsize())
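+            # empty() is only a hint, so the bounded get() below may still time out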
+            try:
+                result.update(self.output_queue.get(True, 1))
+            except Empty:
+                pass
         return result
 
     def get_result(self):
         result = []
         while not self.result_queue.empty():
-            result.append(self.result_queue.get())
+            log.debug("result_queue size %s", self.result_queue.qsize())
+            try:
+                result.append(self.result_queue.get(True, 1))
+            except Empty:
+                pass
         return result
 
         sla_action = scenario_cfg["sla"].get("action", "assert")
 
     start = time.time()
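+    # compute the absolute deadline once; the exit check below compares against it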
+    timeout = start + duration
     while True:
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s START",
             elif sla_action == "monitor":
                 LOG.warning("SLA validation failed: %s", assertion.args)
                 errors = assertion.args
-        except Exception as e:
+        # catch all exceptions because with multiprocessing we can have un-picklable
+        # exception problems (https://bugs.python.org/issue9400)
+        except Exception:
             errors = traceback.format_exc()
-            LOG.exception(e)
+            LOG.exception("")
         else:
             if result:
                 output_queue.put(result)
 
         sequence += 1
 
-        if (errors and sla_action is None) or \
-                (time.time() - start > duration or aborted.is_set()):
+        if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
             LOG.info("Worker END")
             break
 
-    benchmark.teardown()
+    try:
+        benchmark.teardown()
+    except Exception:
+        # catch any exception in teardown and convert it to a simple exception;
+        # never pass exceptions back to multiprocessing, because some exceptions
+        # can be unpicklable
+        # https://bugs.python.org/issue9400
+        LOG.exception("")
+        raise SystemExit(1)
+
+    LOG.debug("queue.qsize() = %s", queue.qsize())
+    LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
 
 
 class DurationRunner(base.Runner):
 
             LOG.debug("iterator: %s iterations: %s", iterator, iterations)
 
     if "teardown" in run_step:
-        benchmark.teardown()
+        try:
+            benchmark.teardown()
+        except Exception:
+            # catch any exception in teardown and convert it to a simple exception;
+            # never pass exceptions back to multiprocessing, because some exceptions
+            # can be unpicklable
+            # https://bugs.python.org/issue9400
+            LOG.exception("")
+            raise SystemExit(1)
+
+    LOG.debug("queue.qsize() = %s", queue.qsize())
 
 
 class IterationRunner(base.Runner):
 
                 LOG.exception(e)
             else:
                 if result:
-                    output_queue.put(result)
+                    LOG.debug("output_queue.put %s", result)
+                    output_queue.put(result, True, 1)
 
             time.sleep(interval)
 
                 'errors': errors
             }
 
-            queue.put(benchmark_output)
+            LOG.debug("queue.put, %s", benchmark_output)
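+            # block for at most one second so a full queue cannot hang the worker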
+            queue.put(benchmark_output, True, 1)
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                       {"runner": runner_cfg["runner_id"],
                 LOG.info("worker END")
                 break
     if "teardown" in run_step:
-        benchmark.teardown()
+        try:
+            benchmark.teardown()
+        except Exception:
+            # catch any exception in teardown and convert it to a simple exception;
+            # never pass exceptions back to multiprocessing, because some exceptions
+            # can be unpicklable
+            # https://bugs.python.org/issue9400
+            LOG.exception("")
+            raise SystemExit(1)
+
+    LOG.debug("queue.qsize() = %s", queue.qsize())
+    LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
 
 
 class IterationRunner(base.Runner):
 
         if (errors and sla_action is None) or aborted.is_set():
             break
 
-    benchmark.teardown()
+    try:
+        benchmark.teardown()
+    except Exception:
+        # catch any exception in teardown and convert it to a simple exception;
+        # never pass exceptions back to multiprocessing, because some exceptions
+        # can be unpicklable
+        # https://bugs.python.org/issue9400
+        LOG.exception("")
+        raise SystemExit(1)
     LOG.info("worker END")
+    LOG.debug("queue.qsize() = %s", queue.qsize())
+    LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
 
 
 class SequenceRunner(base.Runner):
 
         self._consumer_tag = None
         self._url = amqp_url
         self._queue = queue
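+        # don't block process exit waiting for buffered queue data to be flushed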
+        self._queue.cancel_join_thread()
 
     def connect(self):
         """ connect to amqp url """
 
     def init(self, queue):
         self.pkt_size_iterator = iter(self.pkt_sizes)
         self.queue = queue
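+        # as elsewhere, don't block process exit on the queue's feeder thread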
+        self.queue.cancel_join_thread()
 
     def bounds_iterator(self, logger=None):
         if logger:
 
         return self._test_type
 
     def run_traffic(self, traffic_profile):
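+        # avoid hanging on the queue's feeder thread when this subprocess exits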
+        self._queue.cancel_join_thread()
         self.lower = 0.0
         self.upper = 100.0
 
 
         self._queue.put(samples)
 
     def run_traffic(self, traffic_profile):
+        # if we don't do this, process exit can hang waiting for the queue to drain;
+        # it has to be done in the subprocess
+        self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
         try: