Merge "convert networkcapacity to LF"
[yardstick.git] / yardstick / cmd / commands / task.py
old mode 100755 (executable)
new mode 100644 (file)
index d6cd698..a10a2a8
@@ -16,6 +16,9 @@ import atexit
 import ipaddress
 import time
 import logging
+import uuid
+from itertools import ifilter
+
 from yardstick.benchmark.contexts.base import Context
 from yardstick.benchmark.runners import base as base_runner
 from yardstick.common.task_template import TaskTemplate
@@ -48,7 +51,7 @@ class TaskCommands(object):
              output_file_default, default=output_file_default)
     @cliargs("--suite", help="process test suite file instead of a task file",
              action="store_true")
-    def do_start(self, args):
+    def do_start(self, args, **kwargs):
         '''Start a benchmark scenario.'''
 
         atexit.register(atexit_handler)
@@ -56,32 +59,34 @@ class TaskCommands(object):
         total_start_time = time.time()
         parser = TaskParser(args.inputfile[0])
 
-        suite_params = {}
         if args.suite:
-            suite_params = parser.parse_suite()
-            test_cases_dir = suite_params["test_cases_dir"]
-            if test_cases_dir[-1] != os.sep:
-                test_cases_dir += os.sep
-            task_files = [test_cases_dir + task
-                          for task in suite_params["task_fnames"]]
+            # 1. parse suite, return task files, args and arg file names
+            task_files, task_args, task_args_fnames = \
+                parser.parse_suite()
         else:
             task_files = [parser.path]
+            task_args = [args.task_args]
+            task_args_fnames = [args.task_args_file]
 
-        task_args = suite_params.get("task_args", [args.task_args])
-        task_args_fnames = suite_params.get("task_args_fnames",
-                                            [args.task_args_file])
+        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
+                 task_files, task_args, task_args_fnames)
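For illustration only, a minimal sketch of how the three lists logged above stay index-aligned, whether they come from parse_suite() or from the single-task branch; the file names and arguments are hypothetical:

    # hypothetical values after the branch above
    task_files = ["test_cases/some_task.yaml", "test_cases/other_task.yaml"]
    task_args = [None, '{"packetsize": 100}']   # per-task args override, or None
    task_args_fnames = [None, None]             # per-task args file name, or None
    # the indexed loop further down walks all three in lock-step, equivalent to
    # iterating over zip(task_files, task_args, task_args_fnames)
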
 
         if args.parse_only:
             sys.exit(0)
 
         if os.path.isfile(args.output_file):
             os.remove(args.output_file)
-
+        # parse task_files
         for i in range(0, len(task_files)):
             one_task_start_time = time.time()
             parser.path = task_files[i]
-            scenarios, run_in_parallel = parser.parse_task(task_args[i],
-                                                           task_args_fnames[i])
+            scenarios, run_in_parallel, meet_precondition = parser.parse_task(
+                 task_args[i], task_args_fnames[i])
+
+            if not meet_precondition:
+                LOG.info("meet_precondition is %s, please check envrionment",
+                         meet_precondition)
+                continue
 
             self._run(scenarios, run_in_parallel, args.output_file)
 
@@ -108,11 +113,20 @@ class TaskCommands(object):
         for context in Context.list:
             context.deploy()
 
+        background_runners = []
+
+        # Start all background scenarios
+        for scenario in ifilter(_is_background_scenario, scenarios):
+            scenario["runner"] = dict(type="Duration", duration=1000000000)
+            runner = run_one_scenario(scenario, output_file)
+            background_runners.append(runner)
+
         runners = []
         if run_in_parallel:
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
-                runners.append(runner)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runners.append(runner)
 
             # Wait for runners to finish
             for runner in runners:
@@ -121,9 +135,25 @@ class TaskCommands(object):
         else:
             # run serially
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runner_join(runner)
+                    print "Runner ended, output in", output_file
+
+        # Abort background runners
+        for runner in background_runners:
+            runner.abort()
+
+        # Wait for background runners to finish
+        for runner in background_runners:
+            if runner.join(timeout=60) is None:
+                # Nuke if it did not stop nicely
+                base_runner.Runner.terminate(runner)
                 runner_join(runner)
-                print "Runner ended, output in", output_file
+            else:
+                base_runner.Runner.release(runner)
+            print "Background task ended"
+
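A scenario is treated as a background scenario when it carries a truthy run_in_background flag; a minimal sketch of such an entry (scenario type and host name are hypothetical):

    # hypothetical entry from the parsed scenarios list
    background_scenario = {
        "type": "CPUload",           # assumed scenario type, illustration only
        "host": "host.demo",
        "run_in_background": True,   # makes _is_background_scenario() return True
    }
    # _run() then overrides its runner so it keeps producing samples until aborted
    background_scenario["runner"] = dict(type="Duration", duration=1000000000)
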
 
 # TODO: Move stuff below into TaskCommands class !?
 
@@ -133,10 +163,34 @@ class TaskParser(object):
     def __init__(self, path):
         self.path = path
 
+    def _meet_constraint(self, task, cur_pod, cur_installer):
+        if "constraint" in task:
+            constraint = task.get('constraint', None)
+            if constraint is not None:
+                tc_fit_pod = constraint.get('pod', None)
+                tc_fit_installer = constraint.get('installer', None)
+                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
+                         cur_pod, cur_installer, constraint)
+                if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
+                    return False
+                if cur_installer and tc_fit_installer and \
+                        cur_installer not in tc_fit_installer:
+                    return False
+        return True
+
+    def _get_task_para(self, task, cur_pod):
+        task_args = task.get('task_args', None)
+        if task_args is not None:
+            task_args = task_args.get(cur_pod, None)
+        task_args_fnames = task.get('task_args_fnames', None)
+        if task_args_fnames is not None:
+            task_args_fnames = task_args_fnames.get(cur_pod, None)
+        return task_args, task_args_fnames
+
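A sketch of the suite entry shape that _meet_constraint() and _get_task_para() consume, shown as the already-parsed dict; the pod, installer and argument values are hypothetical:

    # one element of cfg["test_cases"] after YAML loading (hypothetical values)
    task = {
        "file_name": "some_test_case.yaml",
        "constraint": {
            "installer": "compass",     # run only under this installer
            "pod": "huawei-pod1",       # run only on this pod
        },
        "task_args": {
            "huawei-pod1": '{"packetsize": 100}',   # per-pod task arguments
        },
    }
    # with NODE_NAME=huawei-pod1 and INSTALLER_TYPE=compass the entry is kept;
    # any other pod or installer makes _meet_constraint() return False
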
     def parse_suite(self):
         '''parse the suite file and return a list of task config file paths
            and lists of optional parameters if present'''
-        print "Parsing suite file:", self.path
+        LOG.info("\nParsing suite file:%s", self.path)
 
         try:
             with open(self.path) as stream:
@@ -145,33 +199,38 @@ class TaskParser(object):
             sys.exit(ioerror)
 
         self._check_schema(cfg["schema"], "suite")
-        print "Starting suite:", cfg["name"]
+        LOG.info("\nStarting scenario:%s", cfg["name"])
 
         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
-        task_fnames = []
-        task_args = []
-        task_args_fnames = []
+        if test_cases_dir[-1] != os.sep:
+            test_cases_dir += os.sep
+
+        cur_pod = os.environ.get('NODE_NAME', None)
+        cur_installer = os.environ.get('INSTALLER_TYPE', None)
+
+        valid_task_files = []
+        valid_task_args = []
+        valid_task_args_fnames = []
 
         for task in cfg["test_cases"]:
-            task_fnames.append(task["file_name"])
-            if "task_args" in task:
-                task_args.append(task["task_args"])
+            # 1. check file_name
+            if "file_name" in task:
+                task_fname = task.get('file_name', None)
+                if task_fname is None:
+                    continue
             else:
-                task_args.append(None)
-
-            if "task_args_file" in task:
-                task_args_fnames.append(task["task_args_file"])
+                continue
+            # 2. check constraint
+            if self._meet_constraint(task, cur_pod, cur_installer):
+                valid_task_files.append(test_cases_dir + task_fname)
             else:
-                task_args_fnames.append(None)
-
-        suite_params = {
-            "test_cases_dir": test_cases_dir,
-            "task_fnames": task_fnames,
-            "task_args": task_args,
-            "task_args_fnames": task_args_fnames
-        }
+                continue
+            # 3. fetch task parameters
+            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
+            valid_task_args.append(task_args)
+            valid_task_args_fnames.append(task_args_fnames)
 
-        return suite_params
+        return valid_task_files, valid_task_args, valid_task_args_fnames
 
     def parse_task(self, task_args=None, task_args_file=None):
         '''parses the task file and return an context and scenario instances'''
@@ -202,28 +261,44 @@ class TaskParser(object):
             sys.exit(ioerror)
 
         self._check_schema(cfg["schema"], "task")
+        meet_precondition = self._check_precondition(cfg)
 
         # TODO: support one or many contexts? Many would simpler and precise
         # TODO: support hybrid context type
         if "context" in cfg:
             context_cfgs = [cfg["context"]]
-        else:
+        elif "contexts" in cfg:
             context_cfgs = cfg["contexts"]
+        else:
+            context_cfgs = [{"type": "Dummy"}]
 
         for cfg_attrs in context_cfgs:
             context_type = cfg_attrs.get("type", "Heat")
             if "Heat" == context_type and "networks" in cfg_attrs:
+                # bugfix: if there is more than one network,
+                # only add "external_network" to the first one.
+                # networks are sorted by name, so network names
+                # should follow the pattern:
+                # test, test2, test3 ...
+                sorted_networks = sorted(cfg_attrs["networks"].keys())
                 # config external_network based on env var
-                for _, attrs in cfg_attrs["networks"].items():
-                    attrs["external_network"] = os.environ.get(
-                        'EXTERNAL_NETWORK', 'net04_ext')
+                cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
+                    = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
+
             context = Context.get(context_type)
             context.init(cfg_attrs)
 
         run_in_parallel = cfg.get("run_in_parallel", False)
 
+        # add tc and task id for influxdb extended tags
+        task_id = str(uuid.uuid4())
+        task_name = os.path.splitext(os.path.basename(self.path))[0]
+        for scenario in cfg["scenarios"]:
+            scenario["tc"] = task_name
+            scenario["task_id"] = task_id
+
         # TODO we need something better here, a class that represent the file
-        return cfg["scenarios"], run_in_parallel
+        return cfg["scenarios"], run_in_parallel, meet_precondition
 
     def _check_schema(self, cfg_schema, schema_type):
         '''Check if config file is using the correct schema type'''
@@ -232,6 +307,38 @@ class TaskParser(object):
             sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                                cfg_schema))
 
+    def _check_precondition(self, cfg):
+        '''Check if the environment meets the precondition'''
+
+        if "precondition" in cfg:
+            precondition = cfg["precondition"]
+            installer_type = precondition.get("installer_type", None)
+            deploy_scenarios = precondition.get("deploy_scenarios", None)
+            tc_fit_pods = precondition.get("pod_name", None)
+            installer_type_env = os.environ.get('INSTALL_TYPE', None)
+            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
+            pod_name_env = os.environ.get('NODE_NAME', None)
+
+            LOG.info("installer_type: %s, installer_type_env: %s",
+                     installer_type, installer_type_env)
+            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
+                     deploy_scenarios, deploy_scenario_env)
+            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
+                     tc_fit_pods, pod_name_env)
+            if installer_type and installer_type_env:
+                if installer_type_env not in installer_type:
+                    return False
+            if deploy_scenarios and deploy_scenario_env:
+                deploy_scenarios_list = deploy_scenarios.split(',')
+                for deploy_scenario in deploy_scenarios_list:
+                    if deploy_scenario_env.startswith(deploy_scenario):
+                        return True
+                return False
+            if tc_fit_pods and pod_name_env:
+                if pod_name_env not in tc_fit_pods:
+                    return False
+        return True
+
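A sketch of a task file "precondition" block as _check_precondition() sees it after YAML loading; the installer, scenario and pod values are hypothetical:

    # hypothetical cfg["precondition"] from a task file
    precondition = {
        "installer_type": "compass",      # checked against the INSTALL_TYPE env var
        "deploy_scenarios": "os-nosdn-nofeature,os-odl_l2-nofeature",
        "pod_name": "huawei-pod1",        # checked against the NODE_NAME env var
    }
    # with INSTALL_TYPE=compass, DEPLOY_SCENARIO=os-nosdn-nofeature-ha and
    # NODE_NAME=huawei-pod1 the check returns True, because DEPLOY_SCENARIO
    # starts with one of the comma-separated deploy_scenarios prefixes
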
 
 def atexit_handler():
     '''handler for process termination'''
@@ -280,6 +387,13 @@ def _is_same_heat_context(host_attr, target_attr):
     return False
 
 
+def _is_background_scenario(scenario):
+    if "run_in_background" in scenario:
+        return scenario["run_in_background"]
+    else:
+        return False
+
+
 def run_one_scenario(scenario_cfg, output_file):
     '''run one scenario using context'''
     runner_cfg = scenario_cfg["runner"]
@@ -304,6 +418,20 @@ def run_one_scenario(scenario_cfg, output_file):
                 context_cfg["target"]["ipaddr"] = \
                     context_cfg["target"]["ip"]
 
+    if "targets" in scenario_cfg:
+        ip_list = []
+        for target in scenario_cfg["targets"]:
+            if is_ip_addr(target):
+                ip_list.append(target)
+                context_cfg['target'] = {}
+            else:
+                context_cfg['target'] = Context.get_server(target)
+                if _is_same_heat_context(scenario_cfg["host"], target):
+                    ip_list.append(context_cfg["target"]["private_ip"])
+                else:
+                    ip_list.append(context_cfg["target"]["ip"])
+        context_cfg['target']['ipaddr'] = ','.join(ip_list)
+
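A sketch of a scenario_cfg that uses the new "targets" list handled above; the loop accepts a mix of literal IP addresses and context server names, and all names here are hypothetical:

    # hypothetical scenario with multiple targets
    scenario_cfg = {
        "type": "Ping",
        "host": "client.demo",
        "targets": [
            "192.168.1.10",     # literal IP address: appended to ip_list as-is
            "server1.demo",     # context server: resolved via Context.get_server()
            "server2.demo",
        ],
        "runner": {"type": "Iteration", "iterations": 10},
    }
    # afterwards context_cfg["target"]["ipaddr"] holds a comma-separated list,
    # e.g. "192.168.1.10,10.0.1.5,10.0.1.6"
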
     if "nodes" in scenario_cfg:
         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
     runner = base_runner.Runner.get(runner_cfg)