Add run_in_background attribute to scenarios
author Jörgen Karlsson <jorgen.w.karlsson@ericsson.com>
Fri, 11 Dec 2015 14:50:22 +0000 (15:50 +0100)
committer Jörgen Karlsson <jorgen.w.karlsson@ericsson.com>
Tue, 15 Dec 2015 17:12:55 +0000 (17:12 +0000)
This change adds the possibility to run scenarios as "background
tasks".

Background scenarios/tasks:
  - are started before all "normal scenarios"
  - runs in parallel with "normal scenarios"
  - terminates when all "normal scenarios" have completed
    their tasks

They are intended as a way to perform background tasks, e.g. collect
data such as cpuload etc, in parallel with the execution of normal
benchmarking scenarios.

Note that we already have the 'run_in_parallel' attribute, but
this attribute has a couple of issues and does not solve all the
use cases.

Change-Id: I9c5230bfdbbb66030f57b658ce1db87ff2c2d62b
Signed-off-by: Jörgen Karlsson <jorgen.w.karlsson@ericsson.com>
samples/background-task.yaml [new file with mode: 0644]
tests/unit/cmd/commands/test_task.py
yardstick/benchmark/runners/arithmetic.py
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/duration.py
yardstick/benchmark/runners/iteration.py
yardstick/benchmark/runners/sequence.py
yardstick/cmd/commands/task.py

diff --git a/samples/background-task.yaml b/samples/background-task.yaml
new file mode 100644 (file)
index 0000000..f81867b
--- /dev/null
@@ -0,0 +1,60 @@
+---
+# Sample benchmark task config file
+# Demonstrate use of background scenarios
+
+schema: "yardstick:task:0.1"
+
+scenarios:
+-
+  type: CPUload
+  options:
+    interval: 1
+
+  host: zeus.demo
+
+  # This scenario is run as a background scenario and runs
+  # in parallel with other scenarios.
+  #
+  # Background scenarios are started before normal scenarios
+  # and are terminated when all normal scenarios have ended.
+  #
+  # A background scenario does not need a runner section as it
+  # will always use an infinite duration runner that are terminated
+  # when all normal scenarios have completed.
+  #
+  run_in_background: true   # default: false
+-
+  type: Iperf3
+  options:
+  host: zeus.demo
+  target: hera.demo
+
+  runner:
+    type: Duration
+    duration: 60
+
+  sla:
+    bytes_per_second: 2900000000
+    action: monitor
+
+context:
+  name: demo
+  image: yardstick-trusty-server
+  flavor: yardstick-flavor
+  user: ec2-user
+
+  placement_groups:
+    pgrp1:
+      policy: "availability"
+
+  servers:
+    zeus:
+      floating_ip: true
+      placement: "pgrp1"
+    hera:
+      floating_ip: true
+      placement: "pgrp1"
+
+  networks:
+    test:
+      cidr: '10.0.1.0/24'
index 89813cb..e785e99 100644 (file)
@@ -36,3 +36,23 @@ class TaskCommandsTestCase(unittest.TestCase):
 
         self.assertEqual(context_cfg["host"], server_info)
         self.assertEqual(context_cfg["target"], server_info)
+
+    @mock.patch('yardstick.cmd.commands.task.Context')
+    @mock.patch('yardstick.cmd.commands.task.base_runner')
+    def test_run(self, mock_base_runner, mock_ctx):
+        scenario = \
+            {'host': 'athena.demo',
+             'target': 'ares.demo',
+             'runner':
+                 {'duration': 60,
+                  'interval': 1,
+                  'type': 'Duration'
+                 },
+                 'type': 'Ping'}
+
+        t = task.TaskCommands()
+        runner = mock.Mock()
+        runner.join.return_value = 0
+        mock_base_runner.Runner.get.return_value = runner
+        t._run([scenario], False, "yardstick.out")
+        self.assertTrue(runner.run.called)
index af23034..4eab664 100755 (executable)
@@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base
 LOG = logging.getLogger(__name__)
 
 
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+                    context_cfg, aborted):
 
     sequence = 1
 
@@ -55,6 +56,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
 
     for value in range(start, stop+margin, step):
 
+        if aborted.is_set():
+            break
+
         options[arg_name] = value
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s START" %
@@ -133,5 +137,6 @@ class ArithmeticRunner(base.Runner):
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
         self.process = multiprocessing.Process(
             target=_worker_process,
-            args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+            args=(self.result_queue, cls, method, scenario_cfg,
+                  context_cfg, self.aborted))
         self.process.start()
index d443806..9925ace 100755 (executable)
@@ -141,12 +141,19 @@ class Runner(object):
     @staticmethod
     def release(runner):
         '''Release the runner'''
-        Runner.runners.remove(runner)
+        if runner in Runner.runners:
+            Runner.runners.remove(runner)
 
         # if this was the last runner, stop the output serializer subprocess
         if len(Runner.runners) == 0:
             Runner.release_dump_process()
 
+    @staticmethod
+    def terminate(runner):
+        '''Terminate the runner'''
+        if runner.process and runner.process.is_alive():
+            runner.process.terminate()
+
     @staticmethod
     def terminate_all():
         '''Terminate all runners (subprocesses)'''
@@ -173,6 +180,7 @@ class Runner(object):
         self.periodic_action_process = None
         self.result_queue = queue
         self.process = None
+        self.aborted = multiprocessing.Event()
         Runner.runners.append(self)
 
     def run_post_stop_action(self):
@@ -197,6 +205,7 @@ class Runner(object):
         cls = getattr(module, path_split[-1])
 
         self.config['object'] = class_name
+        self.aborted.clear()
 
         # run a potentially configured pre-start action
         if "pre-start-action" in self.config:
@@ -230,8 +239,12 @@ class Runner(object):
 
         self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
 
-    def join(self):
-        self.process.join()
+    def abort(self):
+        '''Abort the execution of a scenario'''
+        self.aborted.set()
+
+    def join(self, timeout=None):
+        self.process.join(timeout)
         if self.periodic_action_process:
             self.periodic_action_process.terminate()
             self.periodic_action_process = None
index 40e0aa7..e2a21c2 100644 (file)
@@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base
 LOG = logging.getLogger(__name__)
 
 
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+                    context_cfg, aborted):
 
     sequence = 1
 
@@ -86,7 +87,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
 
         sequence += 1
 
-        if (errors and sla_action is None) or (time.time() - start > duration):
+        if (errors and sla_action is None) or \
+                (time.time() - start > duration or aborted.is_set()):
             LOG.info("worker END")
             break
 
@@ -113,5 +115,6 @@ If the scenario ends before the time has elapsed, it will be started again.
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
         self.process = multiprocessing.Process(
             target=_worker_process,
-            args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+            args=(self.result_queue, cls, method, scenario_cfg,
+                  context_cfg, self.aborted))
         self.process.start()
index 077e0e8..e38ed37 100755 (executable)
@@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base
 LOG = logging.getLogger(__name__)
 
 
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+                    context_cfg, aborted):
 
     sequence = 1
 
@@ -85,7 +86,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
 
         sequence += 1
 
-        if (errors and sla_action is None) or (sequence > iterations):
+        if (errors and sla_action is None) or \
+                (sequence > iterations or aborted.is_set()):
             LOG.info("worker END")
             break
 
@@ -112,5 +114,6 @@ If the scenario ends before the time has elapsed, it will be started again.
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
         self.process = multiprocessing.Process(
             target=_worker_process,
-            args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+            args=(self.result_queue, cls, method, scenario_cfg,
+                  context_cfg, self.aborted))
         self.process.start()
index a410eea..47708fc 100644 (file)
@@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base
 LOG = logging.getLogger(__name__)
 
 
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+                    context_cfg, aborted):
 
     sequence = 1
 
@@ -95,7 +96,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
 
         sequence += 1
 
-        if errors:
+        if errors or aborted.is_set():
             break
 
     benchmark.teardown()
@@ -125,5 +126,6 @@ class SequenceRunner(base.Runner):
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
         self.process = multiprocessing.Process(
             target=_worker_process,
-            args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+            args=(self.result_queue, cls, method, scenario_cfg,
+                  context_cfg, self.aborted))
         self.process.start()
index d6cd698..a56824a 100755 (executable)
@@ -16,6 +16,7 @@ import atexit
 import ipaddress
 import time
 import logging
+from itertools import ifilter
 from yardstick.benchmark.contexts.base import Context
 from yardstick.benchmark.runners import base as base_runner
 from yardstick.common.task_template import TaskTemplate
@@ -108,11 +109,20 @@ class TaskCommands(object):
         for context in Context.list:
             context.deploy()
 
+        background_runners = []
+
+        # Start all background scenarios
+        for scenario in ifilter(_is_background_scenario, scenarios):
+            scenario["runner"] = dict(type="Duration", duration=1000000000)
+            runner = run_one_scenario(scenario, output_file)
+            background_runners.append(runner)
+
         runners = []
         if run_in_parallel:
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
-                runners.append(runner)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runners.append(runner)
 
             # Wait for runners to finish
             for runner in runners:
@@ -121,9 +131,25 @@ class TaskCommands(object):
         else:
             # run serially
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runner_join(runner)
+                    print "Runner ended, output in", output_file
+
+        # Abort background runners
+        for runner in background_runners:
+            runner.abort()
+
+        # Wait for background runners to finish
+        for runner in background_runners:
+            if runner.join(timeout=60) is None:
+                # Nuke if it did not stop nicely
+                base_runner.Runner.terminate(runner)
                 runner_join(runner)
-                print "Runner ended, output in", output_file
+            else:
+                base_runner.Runner.release(runner)
+            print "Background task ended"
+
 
 # TODO: Move stuff below into TaskCommands class !?
 
@@ -280,6 +306,13 @@ def _is_same_heat_context(host_attr, target_attr):
     return False
 
 
+def _is_background_scenario(scenario):
+    if "run_in_background" in scenario:
+        return scenario["run_in_background"]
+    else:
+        return False
+
+
 def run_one_scenario(scenario_cfg, output_file):
     '''run one scenario using context'''
     runner_cfg = scenario_cfg["runner"]