--- /dev/null
+---
+# Sample benchmark task config file
+# Demonstrate use of background scenarios
+
+schema: "yardstick:task:0.1"
+
+scenarios:
+-
+ type: CPUload
+ options:
+ interval: 1
+
+ host: zeus.demo
+
+ # This scenario is run as a background scenario and runs
+ # in parallel with other scenarios.
+ #
+ # Background scenarios are started before normal scenarios
+ # and are terminated when all normal scenarios have ended.
+ #
+ # A background scenario does not need a runner section as it
+ # will always use an infinite duration runner that are terminated
+ # when all normal scenarios have completed.
+ #
+ run_in_background: true # default: false
+-
+ type: Iperf3
+ options:
+ host: zeus.demo
+ target: hera.demo
+
+ runner:
+ type: Duration
+ duration: 60
+
+ sla:
+ bytes_per_second: 2900000000
+ action: monitor
+
+context:
+ name: demo
+ image: yardstick-trusty-server
+ flavor: yardstick-flavor
+ user: ec2-user
+
+ placement_groups:
+ pgrp1:
+ policy: "availability"
+
+ servers:
+ zeus:
+ floating_ip: true
+ placement: "pgrp1"
+ hera:
+ floating_ip: true
+ placement: "pgrp1"
+
+ networks:
+ test:
+ cidr: '10.0.1.0/24'
self.assertEqual(context_cfg["host"], server_info)
self.assertEqual(context_cfg["target"], server_info)
+
+ @mock.patch('yardstick.cmd.commands.task.Context')
+ @mock.patch('yardstick.cmd.commands.task.base_runner')
+ def test_run(self, mock_base_runner, mock_ctx):
+ scenario = \
+ {'host': 'athena.demo',
+ 'target': 'ares.demo',
+ 'runner':
+ {'duration': 60,
+ 'interval': 1,
+ 'type': 'Duration'
+ },
+ 'type': 'Ping'}
+
+ t = task.TaskCommands()
+ runner = mock.Mock()
+ runner.join.return_value = 0
+ mock_base_runner.Runner.get.return_value = runner
+ t._run([scenario], False, "yardstick.out")
+ self.assertTrue(runner.run.called)
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
for value in range(start, stop+margin, step):
+ if aborted.is_set():
+ break
+
options[arg_name] = value
LOG.debug("runner=%(runner)s seq=%(sequence)s START" %
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
@staticmethod
def release(runner):
'''Release the runner'''
- Runner.runners.remove(runner)
+ if runner in Runner.runners:
+ Runner.runners.remove(runner)
# if this was the last runner, stop the output serializer subprocess
if len(Runner.runners) == 0:
Runner.release_dump_process()
+ @staticmethod
+ def terminate(runner):
+ '''Terminate the runner'''
+ if runner.process and runner.process.is_alive():
+ runner.process.terminate()
+
@staticmethod
def terminate_all():
'''Terminate all runners (subprocesses)'''
self.periodic_action_process = None
self.result_queue = queue
self.process = None
+ self.aborted = multiprocessing.Event()
Runner.runners.append(self)
def run_post_stop_action(self):
cls = getattr(module, path_split[-1])
self.config['object'] = class_name
+ self.aborted.clear()
# run a potentially configured pre-start action
if "pre-start-action" in self.config:
self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
- def join(self):
- self.process.join()
+ def abort(self):
+ '''Abort the execution of a scenario'''
+ self.aborted.set()
+
+ def join(self, timeout=None):
+ self.process.join(timeout)
if self.periodic_action_process:
self.periodic_action_process.terminate()
self.periodic_action_process = None
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
sequence += 1
- if (errors and sla_action is None) or (time.time() - start > duration):
+ if (errors and sla_action is None) or \
+ (time.time() - start > duration or aborted.is_set()):
LOG.info("worker END")
break
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
sequence += 1
- if (errors and sla_action is None) or (sequence > iterations):
+ if (errors and sla_action is None) or \
+ (sequence > iterations or aborted.is_set()):
LOG.info("worker END")
break
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
sequence += 1
- if errors:
+ if errors or aborted.is_set():
break
benchmark.teardown()
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
import ipaddress
import time
import logging
+from itertools import ifilter
from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.runners import base as base_runner
from yardstick.common.task_template import TaskTemplate
for context in Context.list:
context.deploy()
+ background_runners = []
+
+ # Start all background scenarios
+ for scenario in ifilter(_is_background_scenario, scenarios):
+ scenario["runner"] = dict(type="Duration", duration=1000000000)
+ runner = run_one_scenario(scenario, output_file)
+ background_runners.append(runner)
+
runners = []
if run_in_parallel:
for scenario in scenarios:
- runner = run_one_scenario(scenario, output_file)
- runners.append(runner)
+ if not _is_background_scenario(scenario):
+ runner = run_one_scenario(scenario, output_file)
+ runners.append(runner)
# Wait for runners to finish
for runner in runners:
else:
# run serially
for scenario in scenarios:
- runner = run_one_scenario(scenario, output_file)
+ if not _is_background_scenario(scenario):
+ runner = run_one_scenario(scenario, output_file)
+ runner_join(runner)
+ print "Runner ended, output in", output_file
+
+ # Abort background runners
+ for runner in background_runners:
+ runner.abort()
+
+ # Wait for background runners to finish
+ for runner in background_runners:
+ if runner.join(timeout=60) is None:
+ # Nuke if it did not stop nicely
+ base_runner.Runner.terminate(runner)
runner_join(runner)
- print "Runner ended, output in", output_file
+ else:
+ base_runner.Runner.release(runner)
+ print "Background task ended"
+
# TODO: Move stuff below into TaskCommands class !?
return False
+def _is_background_scenario(scenario):
+ if "run_in_background" in scenario:
+ return scenario["run_in_background"]
+ else:
+ return False
+
+
def run_one_scenario(scenario_cfg, output_file):
'''run one scenario using context'''
runner_cfg = scenario_cfg["runner"]