cancel all queue join threads 63/43663/9
authorRoss Brattain <ross.b.brattain@intel.com>
Fri, 29 Sep 2017 22:39:04 +0000 (15:39 -0700)
committerRoss Brattain <ross.b.brattain@intel.com>
Sun, 1 Oct 2017 20:18:08 +0000 (13:18 -0700)
In some cases we are blocking in base.Runner join() because the
queues are not empty

call cancel_join_thread to prevent the Queue from blocking the
Process exit

https://docs.python.org/3.3/library/multiprocessing.html#all-platforms

Joining processes that use queues

  Bear in mind that a process that has put items in a queue will wait
  before terminating until all the buffered items are fed by the
  "feeder" thread to the underlying pipe. (The child process can call
  the cancel_join_thread() method of the queue to avoid this behaviour.)

  This means that whenever you use a queue you need to make sure that
  all items which have been put on the queue will eventually be removed
  before the process is joined. Otherwise you cannot be sure that
  processes which have put items on the queue will terminate. Remember
  also that non-daemonic processes will be joined automatically.

Warning

  As mentioned above, if a child process has put items on a queue (and
  it has not used JoinableQueue.cancel_join_thread), then that process
  will not terminate until all buffered items have been flushed to the
  pipe.

  This means that if you try joining that process you may get a deadlock
  unless you are sure that all items which have been put on the queue
  have been consumed. Similarly, if the child process is non-daemonic
  then the parent process may hang on exit when it tries to join all its
  non-daemonic children.

cancel_join_thread()

  Prevent join_thread() from blocking. In particular, this prevents the
  background thread from being joined automatically when the process
  exits – see join_thread().

  A better name for this method might be allow_exit_without_flush(). It
  is likely to cause enqueued data to lost, and you almost certainly
  will not need to use it. It is really only there if you need the
  current process to exit immediately without waiting to flush enqueued
  data to the underlying pipe, and you don’t care about lost data.

Change-Id: I345c722a752bddf9f0824a11cdf52ae9f04669af
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
yardstick/benchmark/runners/arithmetic.py
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/dynamictp.py
yardstick/benchmark/runners/iteration.py
yardstick/benchmark/scenarios/availability/monitor/basemonitor.py

index 7898ae2..974fb21 100755 (executable)
@@ -46,6 +46,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     sequence = 1
 
+    # if we don't do this we can hang waiting for the queue to drain
+    # have to do this in the subprocess
+    queue.cancel_join_thread()
+    output_queue.cancel_join_thread()
+
     runner_cfg = scenario_cfg['runner']
 
     interval = runner_cfg.get("interval", 1)
index a69811f..57903eb 100755 (executable)
@@ -47,6 +47,7 @@ def _execute_shell_command(command):
 
 def _single_action(seconds, command, queue):
     """entrypoint for the single action process"""
+    queue.cancel_join_thread()
     log.debug("single action, fires after %d seconds (from now)", seconds)
     time.sleep(seconds)
     log.debug("single action: executing command: '%s'", command)
@@ -61,6 +62,7 @@ def _single_action(seconds, command, queue):
 
 def _periodic_action(interval, command, queue):
     """entrypoint for the periodic action process"""
+    queue.cancel_join_thread()
     log.debug("periodic action, fires every: %d seconds", interval)
     time_spent = 0
     while True:
@@ -137,7 +139,9 @@ class Runner(object):
         self.config = config
         self.periodic_action_process = None
         self.output_queue = multiprocessing.Queue()
+        self.output_queue.cancel_join_thread()
         self.result_queue = multiprocessing.Queue()
+        self.result_queue.cancel_join_thread()
         self.process = None
         self.aborted = multiprocessing.Event()
         Runner.runners.append(self)
index afff27d..2f5f7e4 100755 (executable)
@@ -33,6 +33,7 @@ LOG = logging.getLogger(__name__)
 def _worker_process(queue, cls, method_name, scenario_cfg,
                     context_cfg, aborted):  # pragma: no cover
 
+    queue.cancel_join_thread()
     runner_cfg = scenario_cfg['runner']
     iterations = runner_cfg.get("iterations", 1)
     interval = runner_cfg.get("interval", 1)
index 50fe106..822e677 100644 (file)
@@ -36,6 +36,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     sequence = 1
 
+    # if we don't do this we can hang waiting for the queue to drain
+    # have to do this in the subprocess
+    queue.cancel_join_thread()
+    output_queue.cancel_join_thread()
+
     runner_cfg = scenario_cfg['runner']
 
     interval = runner_cfg.get("interval", 1)
index a6c1a28..871f13f 100644 (file)
@@ -90,6 +90,7 @@ class BaseMonitor(multiprocessing.Process):
         self._config = config
         self._context = context
         self._queue = multiprocessing.Queue()
+        self._queue.cancel_join_thread()
         self._event = multiprocessing.Event()
         self.monitor_data = data
         self.setup_done = False