Code Review
/
yardstick.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "Separate out test_parse_to_value_exception()"
[yardstick.git]
/
yardstick
/
benchmark
/
runners
/
duration.py
diff --git
a/yardstick/benchmark/runners/duration.py
b/yardstick/benchmark/runners/duration.py
index
6a09131
..
14fd8bb
100644
(file)
--- a/
yardstick/benchmark/runners/duration.py
+++ b/
yardstick/benchmark/runners/duration.py
@@
-27,20
+27,19
@@
import traceback
import time
from yardstick.benchmark.runners import base
import time
from yardstick.benchmark.runners import base
+from yardstick.common import exceptions as y_exc
LOG = logging.getLogger(__name__)
LOG = logging.getLogger(__name__)
+QUEUE_PUT_TIMEOUT = 10
+
+
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted, output_queue):
sequence = 1
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted, output_queue):
sequence = 1
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@
-68,25
+67,30
@@
def _worker_process(queue, cls, method_name, scenario_cfg,
data = {}
errors = ""
data = {}
errors = ""
+ benchmark.pre_run_wait_time(interval)
+
try:
result = method(data)
try:
result = method(data)
- except
AssertionError as assertion
:
+ except
y_exc.SLAValidationError as error
:
# SLA validation failed in scenario, determine what to do now
if sla_action == "assert":
# SLA validation failed in scenario, determine what to do now
if sla_action == "assert":
+ benchmark.teardown()
raise
elif sla_action == "monitor":
raise
elif sla_action == "monitor":
- LOG.warning("SLA validation failed: %s",
assertion
.args)
- errors =
assertion
.args
+ LOG.warning("SLA validation failed: %s",
error
.args)
+ errors =
error
.args
# catch all exceptions because with multiprocessing we can have un-picklable exception
# problems https://bugs.python.org/issue9400
# catch all exceptions because with multiprocessing we can have un-picklable exception
# problems https://bugs.python.org/issue9400
- except Exception:
+ except Exception:
# pylint: disable=broad-except
errors = traceback.format_exc()
LOG.exception("")
else:
if result:
errors = traceback.format_exc()
LOG.exception("")
else:
if result:
- output_queue.put(result)
+ # add timeout for put so we don't block test
+ # if we do timeout we don't care about dropping individual KPIs
+ output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
-
time.sleep
(interval)
+
benchmark.post_run_wait_time
(interval)
benchmark_output = {
'timestamp': time.time(),
benchmark_output = {
'timestamp': time.time(),
@@
-95,7
+99,7
@@
def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
'errors': errors
}
- queue.put(benchmark_output)
+ queue.put(benchmark_output
, True, QUEUE_PUT_TIMEOUT
)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"], "sequence": sequence})
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"], "sequence": sequence})
@@
-116,6
+120,9
@@
def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.exception("")
raise SystemExit(1)
LOG.exception("")
raise SystemExit(1)
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+
class DurationRunner(base.Runner):
"""Run a scenario for a certain amount of time
class DurationRunner(base.Runner):
"""Run a scenario for a certain amount of time
@@
-135,7
+142,9
@@
If the scenario ends before the time has elapsed, it will be started again.
__execution_type__ = 'Duration'
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
__execution_type__ = 'Duration'
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
self.process = multiprocessing.Process(
+ name=name,
target=_worker_process,
args=(self.result_queue, cls, method, scenario_cfg,
context_cfg, self.aborted, self.output_queue))
target=_worker_process,
args=(self.result_queue, cls, method, scenario_cfg,
context_cfg, self.aborted, self.output_queue))