Adding cpu set to enable affinity for given vcpu
[yardstick.git] / yardstick / benchmark / runners / iteration.py
index 3a839b6..cb04243 100644 (file)
 # yardstick comment: this is a modified copy of
 # rally/rally/benchmark/runners/constant.py
 
 # yardstick comment: this is a modified copy of
 # rally/rally/benchmark/runners/constant.py
 
-'''A runner that runs a configurable number of times before it returns
-'''
+"""A runner that runs a configurable number of times before it returns
+"""
+
+from __future__ import absolute_import
 
 
-import os
-import multiprocessing
 import logging
 import logging
-import traceback
+import multiprocessing
 import time
 import time
+import traceback
+
+import os
 
 from yardstick.benchmark.runners import base
 
 LOG = logging.getLogger(__name__)
 
 
 
 from yardstick.benchmark.runners import base
 
 LOG = logging.getLogger(__name__)
 
 
+QUEUE_PUT_TIMEOUT = 10
+
+
 def _worker_process(queue, cls, method_name, scenario_cfg,
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
 
     sequence = 1
 
@@ -40,6 +46,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
     interval = runner_cfg.get("interval", 1)
     iterations = runner_cfg.get("iterations", 1)
     run_step = runner_cfg.get("run_step", "setup,run,teardown")
     interval = runner_cfg.get("interval", 1)
     iterations = runner_cfg.get("iterations", 1)
     run_step = runner_cfg.get("run_step", "setup,run,teardown")
+
+    delta = runner_cfg.get("delta", 2)
     LOG.info("worker START, iterations %d times, class %s", iterations, cls)
 
     runner_cfg['runner_id'] = os.getpid()
     LOG.info("worker START, iterations %d times, class %s", iterations, cls)
 
     runner_cfg['runner_id'] = os.getpid()
@@ -50,10 +58,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     method = getattr(benchmark, method_name)
 
 
     method = getattr(benchmark, method_name)
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -68,7 +72,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             errors = ""
 
             try:
             errors = ""
 
             try:
-                method(data)
+                result = method(data)
             except AssertionError as assertion:
                 # SLA validation failed in scenario, determine what to do now
                 if sla_action == "assert":
             except AssertionError as assertion:
                 # SLA validation failed in scenario, determine what to do now
                 if sla_action == "assert":
@@ -76,9 +80,24 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                 elif sla_action == "monitor":
                     LOG.warning("SLA validation failed: %s", assertion.args)
                     errors = assertion.args
                 elif sla_action == "monitor":
                     LOG.warning("SLA validation failed: %s", assertion.args)
                     errors = assertion.args
-            except Exception as e:
+                elif sla_action == "rate-control":
+                    try:
+                        scenario_cfg['options']['rate']
+                    except KeyError:
+                        scenario_cfg.setdefault('options', {})
+                        scenario_cfg['options']['rate'] = 100
+
+                    scenario_cfg['options']['rate'] -= delta
+                    sequence = 1
+                    continue
+            except Exception:
                 errors = traceback.format_exc()
                 errors = traceback.format_exc()
-                LOG.exception(e)
+                LOG.exception("")
+            else:
+                if result:
+                    # add timeout for put so we don't block test
+                    # if we do timeout we don't care about dropping individual KPIs
+                    output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
 
             time.sleep(interval)
 
 
             time.sleep(interval)
 
@@ -89,10 +108,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                 'errors': errors
             }
 
                 'errors': errors
             }
 
-            record = {'runner_id': runner_cfg['runner_id'],
-                      'benchmark': benchmark_output}
-
-            queue.put(record)
+            queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                       {"runner": runner_cfg["runner_id"],
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                       {"runner": runner_cfg["runner_id"],
@@ -105,11 +121,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                 LOG.info("worker END")
                 break
     if "teardown" in run_step:
                 LOG.info("worker END")
                 break
     if "teardown" in run_step:
-        benchmark.teardown()
+        try:
+            benchmark.teardown()
+        except Exception:
+            # catch any exception in teardown and convert to simple exception
+            # never pass exceptions back to multiprocessing, because some exceptions can
+            # be unpicklable
+            # https://bugs.python.org/issue9400
+            LOG.exception("")
+            raise SystemExit(1)
+
+    LOG.debug("queue.qsize() = %s", queue.qsize())
+    LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
 
 
 class IterationRunner(base.Runner):
 
 
 class IterationRunner(base.Runner):
-    '''Run a scenario for a configurable number of times
+    """Run a scenario for a configurable number of times
 
 If the scenario ends before the time has elapsed, it will be started again.
 
 
 If the scenario ends before the time has elapsed, it will be started again.
 
@@ -122,12 +149,14 @@ If the scenario ends before the time has elapsed, it will be started again.
         type:    int
         unit:    seconds
         default: 1 sec
         type:    int
         unit:    seconds
         default: 1 sec
-    '''
+    """
     __execution_type__ = 'Iteration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
     __execution_type__ = 'Iteration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()
         self.process.start()