X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Frunners%2Fiteration.py;h=822e67723555b9c20da7230a02380b0db2b0383f;hb=c26a006b489fcf61948a97bb18d39b7275c68d27;hp=3a6b2e1d6975383bdb8ccf4d1e7e9eddaa8e5589;hpb=52fbce20e29b6dc7c5637b57b71de102c198b05a;p=yardstick.git diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py old mode 100755 new mode 100644 index 3a6b2e1d6..822e67723 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -1,15 +1,25 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. # -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# yardstick comment: this is a modified copy of +# rally/rally/benchmark/runners/constant.py -'''A runner that runs a configurable number of times before it returns -''' +"""A runner that runs a configurable number of times before it returns +""" +from __future__ import absolute_import import os import multiprocessing import logging @@ -21,78 +31,99 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted, output_queue): sequence = 1 + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + queue.cancel_join_thread() + output_queue.cancel_join_thread() + runner_cfg = scenario_cfg['runner'] interval = runner_cfg.get("interval", 1) iterations = runner_cfg.get("iterations", 1) + run_step = runner_cfg.get("run_step", "setup,run,teardown") + + delta = runner_cfg.get("delta", 2) LOG.info("worker START, iterations %d times, class %s", iterations, cls) runner_cfg['runner_id'] = os.getpid() - benchmark = cls(runner_cfg) - benchmark.setup() - method = getattr(benchmark, method_name) + benchmark = cls(scenario_cfg, context_cfg) + if "setup" in run_step: + benchmark.setup() - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg}) + method = getattr(benchmark, method_name) sla_action = None if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") - - while True: - - LOG.debug("runner=%(runner)s seq=%(sequence)s START" % - {"runner": runner_cfg["runner_id"], "sequence": sequence}) - - data = {} - errors = "" - - try: - data = method(scenario_cfg) - except AssertionError as assertion: - # SLA validation failed in scenario, determine what to do now - if sla_action == "assert": - raise - elif sla_action == "monitor": - LOG.warning("SLA validation failed: %s" % assertion.args) - errors = assertion.args - except Exception as e: - errors = traceback.format_exc() - LOG.exception(e) - - time.sleep(interval) - - benchmark_output = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors - } - - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) - - LOG.debug("runner=%(runner)s seq=%(sequence)s END" % - {"runner": runner_cfg["runner_id"], "sequence": sequence}) - - sequence += 1 - - if (errors and sla_action is None) or (sequence > iterations): - LOG.info("worker END") - break - - benchmark.teardown() + if "run" in run_step: + while True: + + LOG.debug("runner=%(runner)s seq=%(sequence)s START", + {"runner": runner_cfg["runner_id"], + "sequence": sequence}) + + data = {} + errors = "" + + try: + result = method(data) + except AssertionError as assertion: + # SLA validation failed in scenario, determine what to do now + if sla_action == "assert": + raise + elif sla_action == "monitor": + LOG.warning("SLA validation failed: %s", assertion.args) + errors = assertion.args + elif sla_action == "rate-control": + try: + scenario_cfg['options']['rate'] + except KeyError: + scenario_cfg.setdefault('options', {}) + scenario_cfg['options']['rate'] = 100 + + scenario_cfg['options']['rate'] -= delta + sequence = 1 + continue + except Exception as e: + errors = traceback.format_exc() + LOG.exception(e) + else: + if result: + output_queue.put(result) + + time.sleep(interval) + + benchmark_output = { + 'timestamp': time.time(), + 'sequence': sequence, + 'data': data, + 'errors': errors + } + + queue.put(benchmark_output) + + LOG.debug("runner=%(runner)s seq=%(sequence)s END", + {"runner": runner_cfg["runner_id"], + "sequence": sequence}) + + sequence += 1 + + if (errors and sla_action is None) or \ + (sequence > iterations or aborted.is_set()): + LOG.info("worker END") + break + if "teardown" in run_step: + benchmark.teardown() class IterationRunner(base.Runner): - '''Run a scenario for a configurable number of times + """Run a scenario for a configurable number of times If the scenario ends before the time has elapsed, it will be started again. @@ -105,11 +136,12 @@ If the scenario ends before the time has elapsed, it will be started again. type: int unit: seconds default: 1 sec - ''' + """ __execution_type__ = 'Iteration' - def _run_benchmark(self, cls, method, scenario_cfg): + def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted, self.output_queue)) self.process.start()