change CI from pod-based to scenario-based (in progress)
[yardstick.git] / yardstick / cmd / commands / task.py
old mode 100755 (executable)
new mode 100644 (file)
index 8db6e77..587f624
@@ -14,6 +14,10 @@ import os
 import yaml
 import atexit
 import ipaddress
+import time
+import logging
+import uuid
+from itertools import ifilter
 
 from yardstick.benchmark.contexts.base import Context
 from yardstick.benchmark.runners import base as base_runner
@@ -22,6 +26,7 @@ from yardstick.common.utils import cliargs
 
 output_file_default = "/tmp/yardstick.out"
 test_cases_dir_default = "tests/opnfv/test_cases/"
+LOG = logging.getLogger(__name__)
 
 
 class TaskCommands(object):
@@ -51,6 +56,7 @@ class TaskCommands(object):
 
         atexit.register(atexit_handler)
 
+        total_start_time = time.time()
         parser = TaskParser(args.inputfile[0])
 
         suite_params = {}
@@ -75,9 +81,16 @@ class TaskCommands(object):
             os.remove(args.output_file)
 
         for i in range(0, len(task_files)):
+            one_task_start_time = time.time()
             parser.path = task_files[i]
-            scenarios, run_in_parallel = parser.parse_task(task_args[i],
-                                                           task_args_fnames[i])
+            task_name = os.path.splitext(os.path.basename(task_files[i]))[0]
+            scenarios, run_in_parallel, meet_precondition = parser.parse_task(
+                task_name, task_args[i], task_args_fnames[i])
+
+            if not meet_precondition:
+                LOG.info("meet_precondition is %s, please check envrionment",
+                         meet_precondition)
+                continue
 
             self._run(scenarios, run_in_parallel, args.output_file)
 
@@ -89,6 +102,13 @@ class TaskCommands(object):
                 for context in Context.list:
                     context.undeploy()
                 Context.list = []
+            one_task_end_time = time.time()
+            LOG.info("task %s finished in %d secs", task_files[i],
+                     one_task_end_time - one_task_start_time)
+
+        total_end_time = time.time()
+        LOG.info("total finished in %d secs",
+                 total_end_time - total_start_time)
 
         print "Done, exiting"
 
@@ -97,11 +117,20 @@ class TaskCommands(object):
         for context in Context.list:
             context.deploy()
 
+        background_runners = []
+
+        # Start all background scenarios
+        for scenario in ifilter(_is_background_scenario, scenarios):
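+            # force background scenarios onto an effectively infinite Duration
+            # runner; they keep running until they are aborted after the
+            # foreground scenarios have finished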
+            scenario["runner"] = dict(type="Duration", duration=1000000000)
+            runner = run_one_scenario(scenario, output_file)
+            background_runners.append(runner)
+
         runners = []
         if run_in_parallel:
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
-                runners.append(runner)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runners.append(runner)
 
             # Wait for runners to finish
             for runner in runners:
@@ -110,9 +139,25 @@ class TaskCommands(object):
         else:
             # run serially
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runner_join(runner)
+                    print "Runner ended, output in", output_file
+
+        # Abort background runners
+        for runner in background_runners:
+            runner.abort()
+
+        # Wait for background runners to finish
+        for runner in background_runners:
+            if runner.join(timeout=60) is None:
+                # Nuke if it did not stop nicely
+                base_runner.Runner.terminate(runner)
                 runner_join(runner)
-                print "Runner ended, output in", output_file
+            else:
+                base_runner.Runner.release(runner)
+            print "Background task ended"
+
 
 # TODO: Move stuff below into TaskCommands class !?
 
@@ -162,7 +207,7 @@ class TaskParser(object):
 
         return suite_params
 
-    def parse_task(self, task_args=None, task_args_file=None):
+    def parse_task(self, task_name, task_args=None, task_args_file=None):
         '''parses the task file and returns context and scenario instances'''
         print "Parsing task config:", self.path
 
@@ -191,13 +236,16 @@ class TaskParser(object):
             sys.exit(ioerror)
 
         self._check_schema(cfg["schema"], "task")
+        meet_precondition = self._check_precondition(cfg)
 
         # TODO: support one or many contexts? Many would be simpler and more precise
         # TODO: support hybrid context type
         if "context" in cfg:
             context_cfgs = [cfg["context"]]
-        else:
+        elif "contexts" in cfg:
             context_cfgs = cfg["contexts"]
+        else:
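+            # neither "context" nor "contexts" given: fall back to a Dummy
+            # context so scenarios that need no infrastructure can still run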
+            context_cfgs = [{"type": "Dummy"}]
 
         for cfg_attrs in context_cfgs:
             context_type = cfg_attrs.get("type", "Heat")
@@ -211,8 +259,14 @@ class TaskParser(object):
 
         run_in_parallel = cfg.get("run_in_parallel", False)
 
+        # add tc and task id for influxdb extended tags
+        task_id = str(uuid.uuid4())
+        for scenario in cfg["scenarios"]:
+            scenario["tc"] = task_name
+            scenario["task_id"] = task_id
+
         # TODO we need something better here, a class that represent the file
-        return cfg["scenarios"], run_in_parallel
+        return cfg["scenarios"], run_in_parallel, meet_precondition
 
     def _check_schema(self, cfg_schema, schema_type):
         '''Check if config file is using the correct schema type'''
@@ -221,6 +275,38 @@ class TaskParser(object):
             sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                                cfg_schema))
 
+    def _check_precondition(self, cfg):
+        '''Check if the environment meets the precondition'''
+
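+        # a task may carry a "precondition" section restricting the installer
+        # types, deploy scenarios and pod names it applies to; compare those
+        # against the CI environment variables to decide whether to run it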
+        if "precondition" in cfg:
+            precondition = cfg["precondition"]
+            installer_type = precondition.get("installer_type", None)
+            deploy_scenarios = precondition.get("deploy_scenarios", None)
+            tc_fit_pods = precondition.get("pod_name", None)
+            installer_type_env = os.environ.get('INSTALL_TYPE', None)
+            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
+            pod_name_env = os.environ.get('NODE_NAME', None)
+
+            LOG.info("installer_type: %s, installer_type_env: %s",
+                     installer_type, installer_type_env)
+            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
+                     deploy_scenarios, deploy_scenario_env)
+            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
+                     tc_fit_pods, pod_name_env)
+            if installer_type and installer_type_env:
+                if installer_type_env not in installer_type:
+                    return False
+            if deploy_scenarios and deploy_scenario_env:
+                deploy_scenarios_list = deploy_scenarios.split(',')
+                for deploy_scenario in deploy_scenarios_list:
+                    if deploy_scenario_env.startswith(deploy_scenario):
+                        return True
+                return False
+            if tc_fit_pods and pod_name_env:
+                if pod_name_env not in tc_fit_pods:
+                    return False
+        return True
+
 
 def atexit_handler():
     '''handler for process termination'''
@@ -269,6 +355,13 @@ def _is_same_heat_context(host_attr, target_attr):
     return False
 
 
+def _is_background_scenario(scenario):
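+    '''return True if the scenario is marked to run in the background'''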
+    if "run_in_background" in scenario:
+        return scenario["run_in_background"]
+    else:
+        return False
+
+
 def run_one_scenario(scenario_cfg, output_file):
     '''run one scenario using context'''
     runner_cfg = scenario_cfg["runner"]
@@ -276,7 +369,8 @@ def run_one_scenario(scenario_cfg, output_file):
 
     # TODO support get multi hosts/vms info
     context_cfg = {}
-    context_cfg['host'] = Context.get_server(scenario_cfg["host"])
+    if "host" in scenario_cfg:
+        context_cfg['host'] = Context.get_server(scenario_cfg["host"])
 
     if "target" in scenario_cfg:
         if is_ip_addr(scenario_cfg["target"]):
@@ -292,6 +386,22 @@ def run_one_scenario(scenario_cfg, output_file):
                 context_cfg["target"]["ipaddr"] = \
                     context_cfg["target"]["ip"]
 
+    if "targets" in scenario_cfg:
+        ip_list = []
+        for target in scenario_cfg["targets"]:
+            if is_ip_addr(target):
+                ip_list.append(target)
+                context_cfg['target'] = {}
+            else:
+                context_cfg['target'] = Context.get_server(target)
+                if _is_same_heat_context(scenario_cfg["host"], target):
+                    ip_list.append(context_cfg["target"]["private_ip"])
+                else:
+                    ip_list.append(context_cfg["target"]["ip"])
+        context_cfg['target']['ipaddr'] = ','.join(ip_list)
+
+    if "nodes" in scenario_cfg:
+        context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
     runner = base_runner.Runner.get(runner_cfg)
 
     print "Starting runner of type '%s'" % runner_cfg["type"]
@@ -300,6 +410,17 @@ def run_one_scenario(scenario_cfg, output_file):
     return runner
 
 
+def parse_nodes_with_context(scenario_cfg):
+    '''parse the 'nodes' fields in the scenario'''
+    nodes = scenario_cfg["nodes"]
+
+    nodes_cfg = {}
+    for nodename in nodes:
+        nodes_cfg[nodename] = Context.get_server(nodes[nodename])
+
+    return nodes_cfg
+
+
 def runner_join(runner):
     '''join (wait for) a runner, exit process at runner failure'''
     status = runner.join()