Merge "convert networkcapacity to LF"
[yardstick.git] / yardstick / cmd / commands / task.py
old mode 100755 (executable)
new mode 100644 (file)
index 5c25c57..a10a2a8
@@ -13,16 +13,20 @@ import sys
 import os
 import yaml
 import atexit
-import pkg_resources
 import ipaddress
+import time
+import logging
+import uuid
+from itertools import ifilter
 
-from yardstick.benchmark.context.model import Context
+from yardstick.benchmark.contexts.base import Context
 from yardstick.benchmark.runners import base as base_runner
 from yardstick.common.task_template import TaskTemplate
 from yardstick.common.utils import cliargs
 
 output_file_default = "/tmp/yardstick.out"
 test_cases_dir_default = "tests/opnfv/test_cases/"
+LOG = logging.getLogger(__name__)
 
 
 class TaskCommands(object):
@@ -47,38 +51,42 @@ class TaskCommands(object):
              output_file_default, default=output_file_default)
     @cliargs("--suite", help="process test suite file instead of a task file",
              action="store_true")
-    def do_start(self, args):
+    def do_start(self, args, **kwargs):
         '''Start a benchmark scenario.'''
 
         atexit.register(atexit_handler)
 
+        total_start_time = time.time()
         parser = TaskParser(args.inputfile[0])
 
-        suite_params = {}
         if args.suite:
-            suite_params = parser.parse_suite()
-            test_cases_dir = suite_params["test_cases_dir"]
-            if test_cases_dir[-1] != os.sep:
-                test_cases_dir += os.sep
-            task_files = [test_cases_dir + task
-                          for task in suite_params["task_fnames"]]
+            # 1.parse suite, return suite_params info
+            task_files, task_args, task_args_fnames = \
+                parser.parse_suite()
         else:
             task_files = [parser.path]
+            task_args = [args.task_args]
+            task_args_fnames = [args.task_args_file]
 
-        task_args = suite_params.get("task_args", [args.task_args])
-        task_args_fnames = suite_params.get("task_args_fnames",
-                                            [args.task_args_file])
+        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
+                 task_files, task_args, task_args_fnames)
 
         if args.parse_only:
             sys.exit(0)
 
         if os.path.isfile(args.output_file):
             os.remove(args.output_file)
-
+        # parse task_files
         for i in range(0, len(task_files)):
+            one_task_start_time = time.time()
             parser.path = task_files[i]
-            scenarios, run_in_parallel = parser.parse_task(task_args[i],
-                                                           task_args_fnames[i])
+            scenarios, run_in_parallel, meet_precondition = parser.parse_task(
+                 task_args[i], task_args_fnames[i])
+
+            if not meet_precondition:
+                LOG.info("meet_precondition is %s, please check envrionment",
+                         meet_precondition)
+                continue
 
             self._run(scenarios, run_in_parallel, args.output_file)
 
@@ -90,6 +98,13 @@ class TaskCommands(object):
                 for context in Context.list:
                     context.undeploy()
                 Context.list = []
+            one_task_end_time = time.time()
+            LOG.info("task %s finished in %d secs", task_files[i],
+                     one_task_end_time - one_task_start_time)
+
+        total_end_time = time.time()
+        LOG.info("total finished in %d secs",
+                 total_end_time - total_start_time)
 
         print "Done, exiting"
 
@@ -98,11 +113,20 @@ class TaskCommands(object):
         for context in Context.list:
             context.deploy()
 
+        background_runners = []
+
+        # Start all background scenarios
+        for scenario in ifilter(_is_background_scenario, scenarios):
+            scenario["runner"] = dict(type="Duration", duration=1000000000)
+            runner = run_one_scenario(scenario, output_file)
+            background_runners.append(runner)
+
         runners = []
         if run_in_parallel:
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
-                runners.append(runner)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runners.append(runner)
 
             # Wait for runners to finish
             for runner in runners:
@@ -111,9 +135,25 @@ class TaskCommands(object):
         else:
             # run serially
             for scenario in scenarios:
-                runner = run_one_scenario(scenario, output_file)
+                if not _is_background_scenario(scenario):
+                    runner = run_one_scenario(scenario, output_file)
+                    runner_join(runner)
+                    print "Runner ended, output in", output_file
+
+        # Abort background runners
+        for runner in background_runners:
+            runner.abort()
+
+        # Wait for background runners to finish
+        for runner in background_runners:
+            if runner.join(timeout=60) is None:
+                # Nuke if it did not stop nicely
+                base_runner.Runner.terminate(runner)
                 runner_join(runner)
-                print "Runner ended, output in", output_file
+            else:
+                base_runner.Runner.release(runner)
+            print "Background task ended"
+
 
 # TODO: Move stuff below into TaskCommands class !?
 
@@ -123,10 +163,34 @@ class TaskParser(object):
     def __init__(self, path):
         self.path = path
 
+    def _meet_constraint(self, task, cur_pod, cur_installer):
+        if "constraint" in task:
+            constraint = task.get('constraint', None)
+            if constraint is not None:
+                tc_fit_pod = constraint.get('pod', None)
+                tc_fit_installer = constraint.get('installer', None)
+                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
+                         cur_pod, cur_installer, constraint)
+                if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
+                    return False
+                if cur_installer and tc_fit_installer and \
+                        cur_installer not in tc_fit_installer:
+                    return False
+        return True
+
+    def _get_task_para(self, task, cur_pod):
+        task_args = task.get('task_args', None)
+        if task_args is not None:
+            task_args = task_args.get(cur_pod, None)
+        task_args_fnames = task.get('task_args_fnames', None)
+        if task_args_fnames is not None:
+            task_args_fnames = task_args_fnames.get(cur_pod, None)
+        return task_args, task_args_fnames
+
     def parse_suite(self):
         '''parse the suite file and return a list of task config file paths
            and lists of optional parameters if present'''
-        print "Parsing suite file:", self.path
+        LOG.info("\nParsing suite file:%s", self.path)
 
         try:
             with open(self.path) as stream:
@@ -135,33 +199,38 @@ class TaskParser(object):
             sys.exit(ioerror)
 
         self._check_schema(cfg["schema"], "suite")
-        print "Starting suite:", cfg["name"]
+        LOG.info("\nStarting scenario:%s", cfg["name"])
 
         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
-        task_fnames = []
-        task_args = []
-        task_args_fnames = []
+        if test_cases_dir[-1] != os.sep:
+            test_cases_dir += os.sep
+
+        cur_pod = os.environ.get('NODE_NAME', None)
+        cur_installer = os.environ.get('INSTALLER_TYPE', None)
+
+        valid_task_files = []
+        valid_task_args = []
+        valid_task_args_fnames = []
 
         for task in cfg["test_cases"]:
-            task_fnames.append(task["file_name"])
-            if "task_args" in task:
-                task_args.append(task["task_args"])
+            # 1.check file_name
+            if "file_name" in task:
+                task_fname = task.get('file_name', None)
+                if task_fname is None:
+                    continue
             else:
-                task_args.append(None)
-
-            if "task_args_file" in task:
-                task_args_fnames.append(task["task_args_file"])
+                continue
+            # 2.check constraint
+            if self._meet_constraint(task, cur_pod, cur_installer):
+                valid_task_files.append(test_cases_dir + task_fname)
             else:
-                task_args_fnames.append(None)
-
-        suite_params = {
-            "test_cases_dir": test_cases_dir,
-            "task_fnames": task_fnames,
-            "task_args": task_args,
-            "task_args_fnames": task_args_fnames
-        }
+                continue
+            # 3.fetch task parameters
+            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
+            valid_task_args.append(task_args)
+            valid_task_args_fnames.append(task_args_fnames)
 
-        return suite_params
+        return valid_task_files, valid_task_args, valid_task_args_fnames
 
     def parse_task(self, task_args=None, task_args_file=None):
         '''parses the task file and return an context and scenario instances'''
@@ -192,26 +261,44 @@ class TaskParser(object):
             sys.exit(ioerror)
 
         self._check_schema(cfg["schema"], "task")
+        meet_precondition = self._check_precondition(cfg)
 
         # TODO: support one or many contexts? Many would simpler and precise
+        # TODO: support hybrid context type
         if "context" in cfg:
             context_cfgs = [cfg["context"]]
-        else:
+        elif "contexts" in cfg:
             context_cfgs = cfg["contexts"]
+        else:
+            context_cfgs = [{"type": "Dummy"}]
 
         for cfg_attrs in context_cfgs:
-            # config external_network based on env var
-            if "networks" in cfg_attrs:
-                for _, attrs in cfg_attrs["networks"].items():
-                    attrs["external_network"] = os.environ.get(
-                        'EXTERNAL_NETWORK', 'net04_ext')
-            context = Context()
+            context_type = cfg_attrs.get("type", "Heat")
+            if "Heat" == context_type and "networks" in cfg_attrs:
+                # bugfix: if there are more than one network,
+                # only add "external_network" on first one.
+                # the name of netwrok should follow this rule:
+                # test, test2, test3 ...
+                # sort network with the length of network's name
+                sorted_networks = sorted(cfg_attrs["networks"].keys())
+                # config external_network based on env var
+                cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
+                    = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
+
+            context = Context.get(context_type)
             context.init(cfg_attrs)
 
         run_in_parallel = cfg.get("run_in_parallel", False)
 
+        # add tc and task id for influxdb extended tags
+        task_id = str(uuid.uuid4())
+        for scenario in cfg["scenarios"]:
+            task_name = os.path.splitext(os.path.basename(self.path))[0]
+            scenario["tc"] = task_name
+            scenario["task_id"] = task_id
+
         # TODO we need something better here, a class that represent the file
-        return cfg["scenarios"], run_in_parallel
+        return cfg["scenarios"], run_in_parallel, meet_precondition
 
     def _check_schema(self, cfg_schema, schema_type):
         '''Check if config file is using the correct schema type'''
@@ -220,6 +307,38 @@ class TaskParser(object):
             sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                                cfg_schema))
 
+    def _check_precondition(self, cfg):
+        '''Check if the envrionment meet the preconditon'''
+
+        if "precondition" in cfg:
+            precondition = cfg["precondition"]
+            installer_type = precondition.get("installer_type", None)
+            deploy_scenarios = precondition.get("deploy_scenarios", None)
+            tc_fit_pods = precondition.get("pod_name", None)
+            installer_type_env = os.environ.get('INSTALL_TYPE', None)
+            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
+            pod_name_env = os.environ.get('NODE_NAME', None)
+
+            LOG.info("installer_type: %s, installer_type_env: %s",
+                     installer_type, installer_type_env)
+            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
+                     deploy_scenarios, deploy_scenario_env)
+            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
+                     tc_fit_pods, pod_name_env)
+            if installer_type and installer_type_env:
+                if installer_type_env not in installer_type:
+                    return False
+            if deploy_scenarios and deploy_scenario_env:
+                deploy_scenarios_list = deploy_scenarios.split(',')
+                for deploy_scenario in deploy_scenarios_list:
+                    if deploy_scenario_env.startswith(deploy_scenario):
+                        return True
+                return False
+            if tc_fit_pods and pod_name_env:
+                if pod_name_env not in tc_fit_pods:
+                    return False
+        return True
+
 
 def atexit_handler():
     '''handler for process termination'''
@@ -240,45 +359,100 @@ def is_ip_addr(addr):
         return False
 
 
-def run_one_scenario(scenario_cfg, output_file):
-    '''run one scenario using context'''
-    key_filename = pkg_resources.resource_filename(
-        'yardstick.resources', 'files/yardstick_key')
+def _is_same_heat_context(host_attr, target_attr):
+    '''check if two servers are in the same heat context
+    host_attr: either a name for a server created by yardstick or a dict
+    with attribute name mapping when using external heat templates
+    target_attr: either a name for a server created by yardstick or a dict
+    with attribute name mapping when using external heat templates
+    '''
+    host = None
+    target = None
+    for context in Context.list:
+        if context.__context_type__ != "Heat":
+            continue
+
+        host = context._get_server(host_attr)
+        if host is None:
+            continue
+
+        target = context._get_server(target_attr)
+        if target is None:
+            return False
+
+        # Both host and target is not None, then they are in the
+        # same heat context.
+        return True
+
+    return False
 
-    host = Context.get_server(scenario_cfg["host"])
 
+def _is_background_scenario(scenario):
+    if "run_in_background" in scenario:
+        return scenario["run_in_background"]
+    else:
+        return False
+
+
+def run_one_scenario(scenario_cfg, output_file):
+    '''run one scenario using context'''
     runner_cfg = scenario_cfg["runner"]
-    runner_cfg['host'] = host.public_ip
-    runner_cfg['user'] = host.context.user
-    runner_cfg['key_filename'] = key_filename
     runner_cfg['output_filename'] = output_file
 
+    # TODO support get multi hosts/vms info
+    context_cfg = {}
+    if "host" in scenario_cfg:
+        context_cfg['host'] = Context.get_server(scenario_cfg["host"])
+
     if "target" in scenario_cfg:
         if is_ip_addr(scenario_cfg["target"]):
-            scenario_cfg["ipaddr"] = scenario_cfg["target"]
+            context_cfg['target'] = {}
+            context_cfg['target']["ipaddr"] = scenario_cfg["target"]
         else:
-            target = Context.get_server(scenario_cfg["target"])
-
-            # get public IP for target server, some scenarios require it
-            if target.public_ip:
-                runner_cfg['target'] = target.public_ip
-
-            # TODO scenario_cfg["ipaddr"] is bad naming
-            if host.context != target.context:
-                # target is in another context, get its public IP
-                scenario_cfg["ipaddr"] = target.public_ip
+            context_cfg['target'] = Context.get_server(scenario_cfg["target"])
+            if _is_same_heat_context(scenario_cfg["host"],
+                                     scenario_cfg["target"]):
+                context_cfg["target"]["ipaddr"] = \
+                    context_cfg["target"]["private_ip"]
             else:
-                # target is in the same context, get its private IP
-                scenario_cfg["ipaddr"] = target.private_ip
-
+                context_cfg["target"]["ipaddr"] = \
+                    context_cfg["target"]["ip"]
+
+    if "targets" in scenario_cfg:
+        ip_list = []
+        for target in scenario_cfg["targets"]:
+            if is_ip_addr(target):
+                ip_list.append(target)
+                context_cfg['target'] = {}
+            else:
+                context_cfg['target'] = Context.get_server(target)
+                if _is_same_heat_context(scenario_cfg["host"], target):
+                    ip_list.append(context_cfg["target"]["private_ip"])
+                else:
+                    ip_list.append(context_cfg["target"]["ip"])
+        context_cfg['target']['ipaddr'] = ','.join(ip_list)
+
+    if "nodes" in scenario_cfg:
+        context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
     runner = base_runner.Runner.get(runner_cfg)
 
     print "Starting runner of type '%s'" % runner_cfg["type"]
-    runner.run(scenario_cfg["type"], scenario_cfg)
+    runner.run(scenario_cfg, context_cfg)
 
     return runner
 
 
+def parse_nodes_with_context(scenario_cfg):
+    '''paras the 'nodes' fields in scenario '''
+    nodes = scenario_cfg["nodes"]
+
+    nodes_cfg = {}
+    for nodename in nodes:
+        nodes_cfg[nodename] = Context.get_server(nodes[nodename])
+
+    return nodes_cfg
+
+
 def runner_join(runner):
     '''join (wait for) a runner, exit process at runner failure'''
     status = runner.join()