Merge "Pass parameters between scenarios"
authorRex Lee <limingjiang@huawei.com>
Mon, 5 Jun 2017 01:20:33 +0000 (01:20 +0000)
committerGerrit Code Review <gerrit@opnfv.org>
Mon, 5 Jun 2017 01:20:33 +0000 (01:20 +0000)
tests/unit/benchmark/core/test_task.py
tests/unit/benchmark/runner/__init__.py [new file with mode: 0644]
tests/unit/benchmark/runner/test_base.py [new file with mode: 0644]
yardstick/benchmark/core/task.py
yardstick/benchmark/runners/arithmetic.py
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/duration.py
yardstick/benchmark/runners/iteration.py
yardstick/benchmark/runners/sequence.py

index cd7ffde..8034392 100644 (file)
@@ -64,6 +64,7 @@ class TaskTestCase(unittest.TestCase):
         t = task.Task()
         runner = mock.Mock()
         runner.join.return_value = 0
+        runner.get_output.return_value = {}
         mock_base_runner.Runner.get.return_value = runner
         t._run([scenario], False, "yardstick.out")
         self.assertTrue(runner.run.called)
@@ -155,6 +156,33 @@ class TaskTestCase(unittest.TestCase):
         self.assertEqual(task_args_fnames[0], None)
         self.assertEqual(task_args_fnames[1], None)
 
+    def test_parse_options(self):
+        options = {
+            'openstack': {
+                'EXTERNAL_NETWORK': '$network'
+            },
+            'ndoes': ['node1', '$node'],
+            'host': '$host'
+        }
+
+        t = task.Task()
+        t.outputs = {
+            'network': 'ext-net',
+            'node': 'node2',
+            'host': 'server.yardstick'
+        }
+
+        idle_result = {
+            'openstack': {
+                'EXTERNAL_NETWORK': 'ext-net'
+            },
+            'ndoes': ['node1', 'node2'],
+            'host': 'server.yardstick'
+        }
+
+        actual_result = t._parse_options(options)
+        self.assertEqual(idle_result, actual_result)
+
     def test_change_server_name_host_str(self):
         scenario = {'host': 'demo'}
         suffix = '-8'
diff --git a/tests/unit/benchmark/runner/__init__.py b/tests/unit/benchmark/runner/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py
new file mode 100644 (file)
index 0000000..7880fe5
--- /dev/null
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+##############################################################################
+# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from __future__ import print_function
+from __future__ import absolute_import
+
+import unittest
+import multiprocessing
+import time
+
+from yardstick.benchmark.runners.iteration import IterationRunner
+
+
+class RunnerTestCase(unittest.TestCase):
+
+    def test_get_output(self):
+        queue = multiprocessing.Queue()
+        runner = IterationRunner({}, queue)
+        runner.output_queue.put({'case': 'opnfv_yardstick_tc002'})
+        runner.output_queue.put({'criteria': 'PASS'})
+
+        idle_result = {
+            'case': 'opnfv_yardstick_tc002',
+            'criteria': 'PASS'
+        }
+
+        time.sleep(1)
+        actual_result = runner.get_output()
+        self.assertEqual(idle_result, actual_result)
+
+
+def main():
+    unittest.main()
+
+
+if __name__ == '__main__':
+    main()
index 3a151db..c44081b 100644 (file)
@@ -44,6 +44,7 @@ class Task(object):     # pragma: no cover
     def __init__(self):
         self.config = {}
         self.contexts = []
+        self.outputs = {}
 
     def start(self, args, **kwargs):
         """Start a benchmark scenario."""
@@ -136,6 +137,7 @@ class Task(object):     # pragma: no cover
             # Wait for runners to finish
             for runner in runners:
                 runner_join(runner)
+                self.outputs.update(runner.get_output())
                 print("Runner ended, output in", output_file)
         else:
             # run serially
@@ -143,6 +145,7 @@ class Task(object):     # pragma: no cover
                 if not _is_background_scenario(scenario):
                     runner = self.run_one_scenario(scenario, output_file)
                     runner_join(runner)
+                    self.outputs.update(runner.get_output())
                     print("Runner ended, output in", output_file)
 
         # Abort background runners
@@ -155,6 +158,7 @@ class Task(object):     # pragma: no cover
                 # Nuke if it did not stop nicely
                 base_runner.Runner.terminate(runner)
                 runner_join(runner)
+                self.outputs.update(runner.get_output())
             else:
                 base_runner.Runner.release(runner)
             print("Background task ended")
@@ -168,11 +172,24 @@ class Task(object):     # pragma: no cover
             for context in self.contexts[::-1]:
                 context.undeploy()
 
+    def _parse_options(self, op):
+        if isinstance(op, dict):
+            return {k: self._parse_options(v) for k, v in op.items()}
+        elif isinstance(op, list):
+            return [self._parse_options(v) for v in op]
+        elif isinstance(op, str):
+            return self.outputs.get(op[1:]) if op.startswith('$') else op
+        else:
+            return op
+
     def run_one_scenario(self, scenario_cfg, output_file):
         """run one scenario using context"""
         runner_cfg = scenario_cfg["runner"]
         runner_cfg['output_filename'] = output_file
 
+        options = scenario_cfg.get('options', {})
+        scenario_cfg['options'] = self._parse_options(options)
+
         # TODO support get multi hosts/vms info
         context_cfg = {}
         if "host" in scenario_cfg:
index 65fdb9d..7ec5933 100755 (executable)
@@ -42,7 +42,7 @@ LOG = logging.getLogger(__name__)
 
 
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
@@ -108,7 +108,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         errors = ""
 
         try:
-            method(data)
+            result = method(data)
         except AssertionError as assertion:
             # SLA validation failed in scenario, determine what to do now
             if sla_action == "assert":
@@ -119,6 +119,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         except Exception as e:
             errors = traceback.format_exc()
             LOG.exception(e)
+        else:
+            if result:
+                output_queue.put(result)
 
         time.sleep(interval)
 
@@ -188,5 +191,5 @@ class ArithmeticRunner(base.Runner):
         self.process = multiprocessing.Process(
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()
index b48ed97..ebb9a91 100755 (executable)
@@ -197,6 +197,7 @@ class Runner(object):
         self.config = config
         self.periodic_action_process = None
         self.result_queue = queue
+        self.output_queue = multiprocessing.Queue()
         self.process = None
         self.aborted = multiprocessing.Event()
         Runner.runners.append(self)
@@ -269,3 +270,9 @@ class Runner(object):
 
         self.run_post_stop_action()
         return self.process.exitcode
+
+    def get_output(self):
+        result = {}
+        while not self.output_queue.empty():
+            result.update(self.output_queue.get())
+        return result
index 772983c..2bf2cd2 100644 (file)
@@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__)
 
 
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
@@ -66,7 +66,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         errors = ""
 
         try:
-            method(data)
+            result = method(data)
         except AssertionError as assertion:
             # SLA validation failed in scenario, determine what to do now
             if sla_action == "assert":
@@ -77,6 +77,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         except Exception as e:
             errors = traceback.format_exc()
             LOG.exception(e)
+        else:
+            if result:
+                output_queue.put(result)
 
         time.sleep(interval)
 
@@ -126,5 +129,5 @@ If the scenario ends before the time has elapsed, it will be started again.
         self.process = multiprocessing.Process(
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()
index 3963de8..973bb9a 100644 (file)
@@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__)
 
 
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
@@ -71,7 +71,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             errors = ""
 
             try:
-                method(data)
+                result = method(data)
             except AssertionError as assertion:
                 # SLA validation failed in scenario, determine what to do now
                 if sla_action == "assert":
@@ -92,6 +92,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             except Exception as e:
                 errors = traceback.format_exc()
                 LOG.exception(e)
+            else:
+                if result:
+                    output_queue.put(result)
 
             time.sleep(interval)
 
@@ -142,5 +145,5 @@ If the scenario ends before the time has elapsed, it will be started again.
         self.process = multiprocessing.Process(
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()
index af87c00..74ff822 100644 (file)
@@ -33,7 +33,7 @@ LOG = logging.getLogger(__name__)
 
 
 def _worker_process(queue, cls, method_name, scenario_cfg,
-                    context_cfg, aborted):
+                    context_cfg, aborted, output_queue):
 
     sequence = 1
 
@@ -75,7 +75,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         errors = ""
 
         try:
-            method(data)
+            result = method(data)
         except AssertionError as assertion:
             # SLA validation failed in scenario, determine what to do now
             if sla_action == "assert":
@@ -86,6 +86,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
         except Exception as e:
             errors = traceback.format_exc()
             LOG.exception(e)
+        else:
+            if result:
+                output_queue.put(result)
 
         time.sleep(interval)
 
@@ -137,5 +140,5 @@ class SequenceRunner(base.Runner):
         self.process = multiprocessing.Process(
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
-                  context_cfg, self.aborted))
+                  context_cfg, self.aborted, self.output_queue))
         self.process.start()