Sometimes the runners can hang. Initial debugging
pointed to the queue join thread, so I thought we could
cancel all the join threads and everything would be okay.
But it turns out canceling the queue join threads can
corrupt the queues, so the task hangs when we go to
drain them.
It also turns out that we were not properly draining
the queues in the task process: we waited for all the
runners to exit and only then drained the queues.
This is bad and can cause the queues to fill up and hang,
drop data, or corrupt the queues.
The proper fix seems to be to drain the queues in a loop
before calling join with a timeout.
Also modified the queue drain loops so they do not block
indefinitely on queue.get().
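For illustration, the drain-while-joining pattern boils down to
something like the minimal sketch below (simplified and
self-contained; the worker and drain helper here are illustrative
only, not part of this patch):

    # Minimal sketch of the drain-while-joining pattern (illustrative only).
    import multiprocessing
    import time

    from six.moves.queue import Empty


    def drain(q):
        """Drain whatever is currently queued without blocking indefinitely."""
        items = []
        while not q.empty():
            try:
                # block at most 1 second so a racing consumer cannot hang us
                items.append(q.get(True, 1))
            except Empty:
                pass
        return items


    def worker(q):
        for i in range(100):
            q.put({"iteration": i}, True, 1)
            time.sleep(0.01)


    if __name__ == "__main__":
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=worker, args=(queue,))
        proc.start()

        results = []
        # Drain in a loop while the worker is alive; draining only after
        # join() lets a full queue keep the worker from exiting (deadlock).
        while proc.exitcode is None:
            results.extend(drain(queue))
            proc.join(5)              # join with a timeout, then drain again
        results.extend(drain(queue))  # final drain after the process exits

Draining before each timed join keeps the worker's queue feeder
thread from blocking on a full pipe, which is the hang the original
code ran into.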
Revert "cancel all queue join threads"
This reverts commit
75c0e3a54b8f6e8fd77c7d9d95decab830159929.
Revert "duration runner: add teardown and cancel all queue join threads"
This reverts commit
7eb6abb6931b24e085b139cc3500f4497cdde57d.
Change-Id: Ic4f8e814cf23615621c1250535967716b425ac18
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
-from yardstick.benchmark.runners.base import Runner
+from yardstick.benchmark.runners import base
from yardstick.benchmark.runners.iteration import IterationRunner
+class ActionTestCase(unittest.TestCase):
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__execute_shell_command(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ self.assertEqual(base._execute_shell_command("")[0], -1)
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__single_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._single_action(0, "echo", mock.MagicMock())
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__periodic_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._periodic_action(0, "echo", mock.MagicMock())
+
+
class RunnerTestCase(unittest.TestCase):
@mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
actual_result = runner.get_output()
self.assertEqual(idle_result, actual_result)
+ @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
+ def test_get_result(self, mock_process):
+ runner = IterationRunner({})
+ runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
+ runner.result_queue.put({'criteria': 'PASS'})
+
+ idle_result = [
+ {'case': 'opnfv_yardstick_tc002'},
+ {'criteria': 'PASS'}
+ ]
+
+ for retries in range(1000):
+ time.sleep(0.01)
+ if not runner.result_queue.empty():
+ break
+ actual_result = runner.get_result()
+ self.assertEqual(idle_result, actual_result)
+
def test__run_benchmark(self):
- runner = Runner(mock.Mock())
+ runner = base.Runner(mock.Mock())
with self.assertRaises(NotImplementedError):
runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
# Wait for runners to finish
for runner in runners:
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
LOG.info("Runner ended, output in %s", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
LOG.error('Scenario NO.%s: "%s" ERROR!',
scenarios.index(scenario) + 1,
scenario.get('type'))
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
# Abort background runners
LOG.info("Runner ended, output in %s", output_file)
# Abort background runners
# Wait for background runners to finish
for runner in background_runners:
- status = runner.join(JOIN_TIMEOUT)
+ status = runner.join(self.outputs, result, JOIN_TIMEOUT)
if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner.join(JOIN_TIMEOUT)
+ runner.join(self.outputs, result, JOIN_TIMEOUT)
base_runner.Runner.release(runner)
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
print("Background task ended")
return result
print("Background task ended")
return result
-def runner_join(runner):
- """join (wait for) a runner, exit process at runner failure"""
- status = runner.join()
+def runner_join(runner, outputs, result):
+ """join (wait for) a runner, exit process at runner failure
+ :param outputs:
+ :type outputs: dict
+ :param result:
+ :type result: list
+ """
+ status = runner.join(outputs, result)
base_runner.Runner.release(runner)
return status
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
if errors and sla_action is None:
break
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class ArithmeticRunner(base.Runner):
import time
import traceback
+
+from six.moves.queue import Empty
+
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
def _single_action(seconds, command, queue):
"""entrypoint for the single action process"""
- queue.cancel_join_thread()
log.debug("single action, fires after %d seconds (from now)", seconds)
time.sleep(seconds)
log.debug("single action: executing command: '%s'", command)
log.debug("single action, fires after %d seconds (from now)", seconds)
time.sleep(seconds)
log.debug("single action: executing command: '%s'", command)
def _periodic_action(interval, command, queue):
"""entrypoint for the periodic action process"""
- queue.cancel_join_thread()
log.debug("periodic action, fires every: %d seconds", interval)
time_spent = 0
while True:
log.debug("periodic action, fires every: %d seconds", interval)
time_spent = 0
while True:
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners")
+ log.debug("Terminating all runners", exc_info=True)
# release dumper process as some errors before any runner is created
if not Runner.runners:
self.config = config
self.periodic_action_process = None
self.output_queue = multiprocessing.Queue()
- self.output_queue.cancel_join_thread()
self.result_queue = multiprocessing.Queue()
- self.result_queue.cancel_join_thread()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
"""Abort the execution of a scenario"""
self.aborted.set()
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
- self.process.join(timeout)
+ QUEUE_JOIN_INTERVAL = 5
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
def get_output(self):
result = {}
while not self.output_queue.empty():
- result.update(self.output_queue.get())
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
return result
def get_result(self):
result = []
while not self.result_queue.empty():
- result.append(self.result_queue.get())
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
LOG.exception("")
raise SystemExit(1)
LOG.exception("")
raise SystemExit(1)
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+
class DurationRunner(base.Runner):
"""Run a scenario for a certain amount of time
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted): # pragma: no cover
- queue.cancel_join_thread()
runner_cfg = scenario_cfg['runner']
iterations = runner_cfg.get("iterations", 1)
interval = runner_cfg.get("interval", 1)
LOG.debug("iterator: %s iterations: %s", iterator, iterations)
if "teardown" in run_step:
LOG.debug("iterator: %s iterations: %s", iterator, iterations)
if "teardown" in run_step:
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
class IterationRunner(base.Runner):
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
LOG.exception(e)
else:
if result:
- output_queue.put(result)
+ LOG.debug("output_queue.put %s", result)
+ output_queue.put(result, True, 1)
- queue.put(benchmark_output)
+ LOG.debug("queue.put, %s", benchmark_output)
+ queue.put(benchmark_output, True, 1)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
LOG.info("worker END")
break
if "teardown" in run_step:
LOG.info("worker END")
break
if "teardown" in run_step:
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class IterationRunner(base.Runner):
if (errors and sla_action is None) or aborted.is_set():
break
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class SequenceRunner(base.Runner):
self._config = config
self._context = context
self._queue = multiprocessing.Queue()
- self._queue.cancel_join_thread()
self._event = multiprocessing.Event()
self.monitor_data = data
self.setup_done = False