Merge "code inspection fixes: test_pktgen"
[yardstick.git] / yardstick / benchmark / runners / iteration.py
index 930f883..822e677 100644 (file)
@@ -16,8 +16,8 @@
 # yardstick comment: this is a modified copy of
 # rally/rally/benchmark/runners/constant.py
 
 # yardstick comment: this is a modified copy of
 # rally/rally/benchmark/runners/constant.py
 
-'''A runner that runs a configurable number of times before it returns
-'''
+"""A runner that runs a configurable number of times before it returns
+"""
 
 from __future__ import absolute_import
 import os
 
 from __future__ import absolute_import
 import os
@@ -32,15 +32,22 @@ LOG = logging.getLogger(__name__)
 
 
 def _worker_process(queue, cls, method_name, scenario_cfg,
 
 
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
 
     sequence = 1
 
+    # if we don't do this we can hang waiting for the queue to drain
+    # have to do this in the subprocess
+    queue.cancel_join_thread()
+    output_queue.cancel_join_thread()
+
     runner_cfg = scenario_cfg['runner']
 
     interval = runner_cfg.get("interval", 1)
     iterations = runner_cfg.get("iterations", 1)
     run_step = runner_cfg.get("run_step", "setup,run,teardown")
     runner_cfg = scenario_cfg['runner']
 
     interval = runner_cfg.get("interval", 1)
     iterations = runner_cfg.get("iterations", 1)
     run_step = runner_cfg.get("run_step", "setup,run,teardown")
+
+    delta = runner_cfg.get("delta", 2)
     LOG.info("worker START, iterations %d times, class %s", iterations, cls)
 
     runner_cfg['runner_id'] = os.getpid()
     LOG.info("worker START, iterations %d times, class %s", iterations, cls)
 
     runner_cfg['runner_id'] = os.getpid()
@@ -51,10 +58,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     method = getattr(benchmark, method_name)
 
 
     method = getattr(benchmark, method_name)
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -69,7 +72,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             errors = ""
 
             try:
             errors = ""
 
             try:
-                method(data)
+                result = method(data)
             except AssertionError as assertion:
                 # SLA validation failed in scenario, determine what to do now
                 if sla_action == "assert":
             except AssertionError as assertion:
                 # SLA validation failed in scenario, determine what to do now
                 if sla_action == "assert":
@@ -77,9 +80,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                 elif sla_action == "monitor":
                     LOG.warning("SLA validation failed: %s", assertion.args)
                     errors = assertion.args
                 elif sla_action == "monitor":
                     LOG.warning("SLA validation failed: %s", assertion.args)
                     errors = assertion.args
+                elif sla_action == "rate-control":
+                    try:
+                        scenario_cfg['options']['rate']
+                    except KeyError:
+                        scenario_cfg.setdefault('options', {})
+                        scenario_cfg['options']['rate'] = 100
+
+                    scenario_cfg['options']['rate'] -= delta
+                    sequence = 1
+                    continue
             except Exception as e:
                 errors = traceback.format_exc()
                 LOG.exception(e)
             except Exception as e:
                 errors = traceback.format_exc()
                 LOG.exception(e)
+            else:
+                if result:
+                    output_queue.put(result)
 
             time.sleep(interval)
 
 
             time.sleep(interval)
 
@@ -90,10 +106,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                 'errors': errors
             }
 
                 'errors': errors
             }
 
-            record = {'runner_id': runner_cfg['runner_id'],
-                      'benchmark': benchmark_output}
-
-            queue.put(record)
+            queue.put(benchmark_output)
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                       {"runner": runner_cfg["runner_id"],
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                       {"runner": runner_cfg["runner_id"],
@@ -110,7 +123,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
 
 class IterationRunner(base.Runner):
 
 
 class IterationRunner(base.Runner):
-    '''Run a scenario for a configurable number of times
+    """Run a scenario for a configurable number of times
 
 If the scenario ends before the time has elapsed, it will be started again.
 
 
 If the scenario ends before the time has elapsed, it will be started again.
 
@@ -123,12 +136,12 @@ If the scenario ends before the time has elapsed, it will be started again.
         type:    int
         unit:    seconds
         default: 1 sec
         type:    int
         unit:    seconds
         default: 1 sec
-    '''
+    """
     __execution_type__ = 'Iteration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
         self.process = multiprocessing.Process(
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
     __execution_type__ = 'Iteration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
         self.process = multiprocessing.Process(
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()
         self.process.start()