X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Frunners%2Fiteration.py;h=cb042437776b5c29c877061ccfeffc7e7c20a1e3;hb=2ed5cc6c71db4cc3ca5f9c3f1e2494f562faa79a;hp=03dcfae03e605b3e77b3e83a66901c130a78f106;hpb=18ffc52220f585e158e2ed4aee3d7d98c6814a2a;p=yardstick.git diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py old mode 100755 new mode 100644 index 03dcfae03..cb0424377 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -1,94 +1,142 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. # -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. -'''A runner that runs a configurable number of times before it returns -''' +# yardstick comment: this is a modified copy of +# rally/rally/benchmark/runners/constant.py + +"""A runner that runs a configurable number of times before it returns +""" + +from __future__ import absolute_import -import os -import multiprocessing import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, context, scenario_args): +QUEUE_PUT_TIMEOUT = 10 - sequence = 1 - interval = context.get("interval", 1) - iterations = context.get("iterations", 1) - LOG.info("worker START, iterations %d times, class %s", iterations, cls) +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted, output_queue): - context['runner'] = os.getpid() + sequence = 1 - benchmark = cls(context) - benchmark.setup() - method = getattr(benchmark, method_name) + runner_cfg = scenario_cfg['runner'] - record_context = {"runner": context["runner"], - "host": context["host"]} + interval = runner_cfg.get("interval", 1) + iterations = runner_cfg.get("iterations", 1) + run_step = runner_cfg.get("run_step", "setup,run,teardown") - sla_action = None - if "sla" in scenario_args: - sla_action = scenario_args["sla"].get("action", "assert") + delta = runner_cfg.get("delta", 2) + LOG.info("worker START, iterations %d times, class %s", iterations, cls) - while True: + runner_cfg['runner_id'] = os.getpid() - LOG.debug("runner=%(runner)s seq=%(sequence)s START" % - {"runner": context["runner"], "sequence": sequence}) + benchmark = cls(scenario_cfg, context_cfg) + if "setup" in run_step: + benchmark.setup() - data = {} - errors = "" + method = getattr(benchmark, method_name) + sla_action = None + if "sla" in scenario_cfg: + sla_action = scenario_cfg["sla"].get("action", "assert") + if "run" in run_step: + while True: + + LOG.debug("runner=%(runner)s seq=%(sequence)s START", + {"runner": runner_cfg["runner_id"], + "sequence": sequence}) + + data = {} + errors = "" + + try: + result = method(data) + except AssertionError as assertion: + # SLA validation failed in scenario, determine what to do now + if sla_action == "assert": + raise + elif sla_action == "monitor": + LOG.warning("SLA validation failed: %s", assertion.args) + errors = assertion.args + elif sla_action == "rate-control": + try: + scenario_cfg['options']['rate'] + except KeyError: + scenario_cfg.setdefault('options', {}) + scenario_cfg['options']['rate'] = 100 + + scenario_cfg['options']['rate'] -= delta + sequence = 1 + continue + except Exception: + errors = traceback.format_exc() + LOG.exception("") + else: + if result: + # add timeout for put so we don't block test + # if we do timeout we don't care about dropping individual KPIs + output_queue.put(result, True, QUEUE_PUT_TIMEOUT) + + time.sleep(interval) + + benchmark_output = { + 'timestamp': time.time(), + 'sequence': sequence, + 'data': data, + 'errors': errors + } + + queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT) + + LOG.debug("runner=%(runner)s seq=%(sequence)s END", + {"runner": runner_cfg["runner_id"], + "sequence": sequence}) + + sequence += 1 + + if (errors and sla_action is None) or \ + (sequence > iterations or aborted.is_set()): + LOG.info("worker END") + break + if "teardown" in run_step: try: - data = method(scenario_args) - except AssertionError as assertion: - # SLA validation failed in scenario, determine what to do now - if sla_action == "assert": - raise - elif sla_action == "monitor": - LOG.warning("SLA validation failed: %s" % assertion.args) - errors = assertion.args - except Exception as e: - errors = traceback.format_exc() - LOG.exception(e) - - time.sleep(interval) - - benchmark_output = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors - } - - queue.put({'context': record_context, 'sargs': scenario_args, - 'benchmark': benchmark_output}) - - LOG.debug("runner=%(runner)s seq=%(sequence)s END" % - {"runner": context["runner"], "sequence": sequence}) - - sequence += 1 - - if (errors and sla_action is None) or (sequence > iterations): - LOG.info("worker END") - break + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) - benchmark.teardown() + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class IterationRunner(base.Runner): - '''Run a scenario for a configurable number of times + """Run a scenario for a configurable number of times If the scenario ends before the time has elapsed, it will be started again. @@ -101,11 +149,14 @@ If the scenario ends before the time has elapsed, it will be started again. type: int unit: seconds default: 1 sec - ''' + """ __execution_type__ = 'Iteration' - def _run_benchmark(self, cls, method, scenario_args): + def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, - args=(self.result_queue, cls, method, self.config, scenario_args)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted, self.output_queue)) self.process.start()