from mock import mock
-from yardstick.benchmark.runners.base import Runner
+from yardstick.benchmark.runners import base
from yardstick.benchmark.runners.iteration import IterationRunner
+class ActionTestCase(unittest.TestCase):
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__execute_shell_command(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ self.assertEqual(base._execute_shell_command("")[0], -1)
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__single_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._single_action(0, "echo", mock.MagicMock())
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__periodic_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._periodic_action(0, "echo", mock.MagicMock())
+
+
class RunnerTestCase(unittest.TestCase):
@mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
actual_result = runner.get_output()
self.assertEqual(idle_result, actual_result)
+ @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
+ def test_get_result(self, mock_process):
+ runner = IterationRunner({})
+ runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
+ runner.result_queue.put({'criteria': 'PASS'})
+
+ idle_result = [
+ {'case': 'opnfv_yardstick_tc002'},
+ {'criteria': 'PASS'}
+ ]
+
+ for retries in range(1000):
+ time.sleep(0.01)
+ if not runner.result_queue.empty():
+ break
+ actual_result = runner.get_result()
+ self.assertEqual(idle_result, actual_result)
+
def test__run_benchmark(self):
- runner = Runner(mock.Mock())
+ runner = base.Runner(mock.Mock())
with self.assertRaises(NotImplementedError):
runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
# Wait for runners to finish
for runner in runners:
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
LOG.error('Scenario NO.%s: "%s" ERROR!',
scenarios.index(scenario) + 1,
scenario.get('type'))
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
# Abort background runners
# Wait for background runners to finish
for runner in background_runners:
- status = runner.join(JOIN_TIMEOUT)
+ status = runner.join(self.outputs, result, JOIN_TIMEOUT)
if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner.join(JOIN_TIMEOUT)
+ runner.join(self.outputs, result, JOIN_TIMEOUT)
base_runner.Runner.release(runner)
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
print("Background task ended")
return result
return networks
-def runner_join(runner):
- """join (wait for) a runner, exit process at runner failure"""
- status = runner.join()
+def runner_join(runner, outputs, result):
+ """join (wait for) a runner, exit process at runner failure
+ :param outputs:
+ :type outputs: dict
+ :param result:
+ :type result: list
+ """
+ status = runner.join(outputs, result)
base_runner.Runner.release(runner)
return status
sequence = 1
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
if errors and sla_action is None:
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class ArithmeticRunner(base.Runner):
import time
import traceback
+
+from six.moves.queue import Empty
+
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
def _single_action(seconds, command, queue):
"""entrypoint for the single action process"""
- queue.cancel_join_thread()
log.debug("single action, fires after %d seconds (from now)", seconds)
time.sleep(seconds)
log.debug("single action: executing command: '%s'", command)
def _periodic_action(interval, command, queue):
"""entrypoint for the periodic action process"""
- queue.cancel_join_thread()
log.debug("periodic action, fires every: %d seconds", interval)
time_spent = 0
while True:
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners")
+ log.debug("Terminating all runners", exc_info=True)
# release dumper process as some errors before any runner is created
if not Runner.runners:
self.config = config
self.periodic_action_process = None
self.output_queue = multiprocessing.Queue()
- self.output_queue.cancel_join_thread()
self.result_queue = multiprocessing.Queue()
- self.result_queue.cancel_join_thread()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
- self.process.join(timeout)
+ QUEUE_JOIN_INTERVAL = 5
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
def get_output(self):
result = {}
while not self.output_queue.empty():
- result.update(self.output_queue.get())
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
return result
def get_result(self):
result = []
while not self.result_queue.empty():
- result.append(self.result_queue.get())
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
return result
sequence = 1
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
LOG.exception("")
raise SystemExit(1)
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+
class DurationRunner(base.Runner):
"""Run a scenario for a certain amount of time
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted): # pragma: no cover
- queue.cancel_join_thread()
runner_cfg = scenario_cfg['runner']
iterations = runner_cfg.get("iterations", 1)
interval = runner_cfg.get("interval", 1)
LOG.debug("iterator: %s iterations: %s", iterator, iterations)
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
class IterationRunner(base.Runner):
sequence = 1
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
LOG.exception(e)
else:
if result:
- output_queue.put(result)
+ LOG.debug("output_queue.put %s", result)
+ output_queue.put(result, True, 1)
time.sleep(interval)
'errors': errors
}
- queue.put(benchmark_output)
+ LOG.debug("queue.put, %s", benchmark_output)
+ queue.put(benchmark_output, True, 1)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
LOG.info("worker END")
break
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class IterationRunner(base.Runner):
if (errors and sla_action is None) or aborted.is_set():
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class SequenceRunner(base.Runner):
self._config = config
self._context = context
self._queue = multiprocessing.Queue()
- self._queue.cancel_join_thread()
self._event = multiprocessing.Event()
self.monitor_data = data
self.setup_done = False