task: drain background runner queues 37/44837/1
authorRoss Brattain <ross.b.brattain@intel.com>
Wed, 11 Oct 2017 20:30:14 +0000 (13:30 -0700)
committerRoss Brattain <ross.b.brattain@intel.com>
Wed, 11 Oct 2017 20:34:12 +0000 (13:34 -0700)
We were not draining the queues in the background
runners.

Modify the main runner_join loop to loop over
all the runners and drain them.

The runner join method does extra work for periodic actions
so we can't change its behavior.

Instead create a new poll() method and use that
to check runner status

Change-Id: I9466ba40a6a4c45c82cedff279cbb4817c6b66ad
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
yardstick/benchmark/core/task.py
yardstick/benchmark/runners/base.py

index 53298d8..1512ca7 100644 (file)
@@ -39,7 +39,6 @@ output_file_default = "/tmp/yardstick.out"
 config_file = '/etc/yardstick/yardstick.conf'
 test_cases_dir_default = "tests/opnfv/test_cases/"
 LOG = logging.getLogger(__name__)
-JOIN_TIMEOUT = 60
 
 
 class Task(object):     # pragma: no cover
@@ -260,7 +259,7 @@ class Task(object):     # pragma: no cover
 
             # Wait for runners to finish
             for runner in runners:
-                status = runner_join(runner, self.outputs, result)
+                status = runner_join(runner, background_runners, self.outputs, result)
                 if status != 0:
                     raise RuntimeError(
                         "{0} runner status {1}".format(runner.__execution_type__, status))
@@ -270,7 +269,7 @@ class Task(object):     # pragma: no cover
             for scenario in scenarios:
                 if not _is_background_scenario(scenario):
                     runner = self.run_one_scenario(scenario, output_file)
-                    status = runner_join(runner, self.outputs, result)
+                    status = runner_join(runner, background_runners, self.outputs, result)
                     if status != 0:
                         LOG.error('Scenario NO.%s: "%s" ERROR!',
                                   scenarios.index(scenario) + 1,
@@ -285,11 +284,11 @@ class Task(object):     # pragma: no cover
 
         # Wait for background runners to finish
         for runner in background_runners:
-            status = runner.join(self.outputs, result, JOIN_TIMEOUT)
+            status = runner.join(self.outputs, result)
             if status is None:
                 # Nuke if it did not stop nicely
                 base_runner.Runner.terminate(runner)
-                runner.join(self.outputs, result, JOIN_TIMEOUT)
+                runner.join(self.outputs, result)
             base_runner.Runner.release(runner)
 
             print("Background task ended")
@@ -641,13 +640,22 @@ def get_networks_from_nodes(nodes):
     return networks
 
 
-def runner_join(runner, outputs, result):
+def runner_join(runner, background_runners, outputs, result):
     """join (wait for) a runner, exit process at runner failure
+    :param background_runners:
+    :type background_runners:
     :param outputs:
     :type outputs: dict
     :param result:
     :type result: list
     """
+    while runner.poll() is None:
+        outputs.update(runner.get_output())
+        result.extend(runner.get_result())
+        # drain all the background runner queues
+        for background in background_runners:
+            outputs.update(background.get_output())
+            result.extend(background.get_result())
     status = runner.join(outputs, result)
     base_runner.Runner.release(runner)
     return status
index 13718d7..a887fa5 100755 (executable)
@@ -210,6 +210,10 @@ class Runner(object):
 
     QUEUE_JOIN_INTERVAL = 5
 
+    def poll(self, timeout=QUEUE_JOIN_INTERVAL):
+        self.process.join(timeout)
+        return self.process.exitcode
+
     def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
         while self.process.exitcode is None:
             # drain the queue while we are running otherwise we won't terminate