X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fcmd%2Fcommands%2Ftask.py;h=a10a2a8a3e6becff00d49580caf7ba23718bf8b1;hb=911c3a3729da77386848d5e9843c2fe91a965939;hp=5c25c576a2e0e98bd07bdf110fede1c89e387055;hpb=976b99c1c8ed80210d37e418f630f9c64879cd63;p=yardstick.git diff --git a/yardstick/cmd/commands/task.py b/yardstick/cmd/commands/task.py old mode 100755 new mode 100644 index 5c25c576a..a10a2a8a3 --- a/yardstick/cmd/commands/task.py +++ b/yardstick/cmd/commands/task.py @@ -13,16 +13,20 @@ import sys import os import yaml import atexit -import pkg_resources import ipaddress +import time +import logging +import uuid +from itertools import ifilter -from yardstick.benchmark.context.model import Context +from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.runners import base as base_runner from yardstick.common.task_template import TaskTemplate from yardstick.common.utils import cliargs output_file_default = "/tmp/yardstick.out" test_cases_dir_default = "tests/opnfv/test_cases/" +LOG = logging.getLogger(__name__) class TaskCommands(object): @@ -47,38 +51,42 @@ class TaskCommands(object): output_file_default, default=output_file_default) @cliargs("--suite", help="process test suite file instead of a task file", action="store_true") - def do_start(self, args): + def do_start(self, args, **kwargs): '''Start a benchmark scenario.''' atexit.register(atexit_handler) + total_start_time = time.time() parser = TaskParser(args.inputfile[0]) - suite_params = {} if args.suite: - suite_params = parser.parse_suite() - test_cases_dir = suite_params["test_cases_dir"] - if test_cases_dir[-1] != os.sep: - test_cases_dir += os.sep - task_files = [test_cases_dir + task - for task in suite_params["task_fnames"]] + # 1.parse suite, return suite_params info + task_files, task_args, task_args_fnames = \ + parser.parse_suite() else: task_files = [parser.path] + task_args = [args.task_args] + task_args_fnames = [args.task_args_file] - task_args = suite_params.get("task_args", [args.task_args]) - task_args_fnames = suite_params.get("task_args_fnames", - [args.task_args_file]) + LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s", + task_files, task_args, task_args_fnames) if args.parse_only: sys.exit(0) if os.path.isfile(args.output_file): os.remove(args.output_file) - + # parse task_files for i in range(0, len(task_files)): + one_task_start_time = time.time() parser.path = task_files[i] - scenarios, run_in_parallel = parser.parse_task(task_args[i], - task_args_fnames[i]) + scenarios, run_in_parallel, meet_precondition = parser.parse_task( + task_args[i], task_args_fnames[i]) + + if not meet_precondition: + LOG.info("meet_precondition is %s, please check envrionment", + meet_precondition) + continue self._run(scenarios, run_in_parallel, args.output_file) @@ -90,6 +98,13 @@ class TaskCommands(object): for context in Context.list: context.undeploy() Context.list = [] + one_task_end_time = time.time() + LOG.info("task %s finished in %d secs", task_files[i], + one_task_end_time - one_task_start_time) + + total_end_time = time.time() + LOG.info("total finished in %d secs", + total_end_time - total_start_time) print "Done, exiting" @@ -98,11 +113,20 @@ class TaskCommands(object): for context in Context.list: context.deploy() + background_runners = [] + + # Start all background scenarios + for scenario in ifilter(_is_background_scenario, scenarios): + scenario["runner"] = dict(type="Duration", duration=1000000000) + runner = run_one_scenario(scenario, output_file) + background_runners.append(runner) + runners = [] if run_in_parallel: for scenario in scenarios: - runner = run_one_scenario(scenario, output_file) - runners.append(runner) + if not _is_background_scenario(scenario): + runner = run_one_scenario(scenario, output_file) + runners.append(runner) # Wait for runners to finish for runner in runners: @@ -111,9 +135,25 @@ class TaskCommands(object): else: # run serially for scenario in scenarios: - runner = run_one_scenario(scenario, output_file) + if not _is_background_scenario(scenario): + runner = run_one_scenario(scenario, output_file) + runner_join(runner) + print "Runner ended, output in", output_file + + # Abort background runners + for runner in background_runners: + runner.abort() + + # Wait for background runners to finish + for runner in background_runners: + if runner.join(timeout=60) is None: + # Nuke if it did not stop nicely + base_runner.Runner.terminate(runner) runner_join(runner) - print "Runner ended, output in", output_file + else: + base_runner.Runner.release(runner) + print "Background task ended" + # TODO: Move stuff below into TaskCommands class !? @@ -123,10 +163,34 @@ class TaskParser(object): def __init__(self, path): self.path = path + def _meet_constraint(self, task, cur_pod, cur_installer): + if "constraint" in task: + constraint = task.get('constraint', None) + if constraint is not None: + tc_fit_pod = constraint.get('pod', None) + tc_fit_installer = constraint.get('installer', None) + LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s", + cur_pod, cur_installer, constraint) + if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod: + return False + if cur_installer and tc_fit_installer and \ + cur_installer not in tc_fit_installer: + return False + return True + + def _get_task_para(self, task, cur_pod): + task_args = task.get('task_args', None) + if task_args is not None: + task_args = task_args.get(cur_pod, None) + task_args_fnames = task.get('task_args_fnames', None) + if task_args_fnames is not None: + task_args_fnames = task_args_fnames.get(cur_pod, None) + return task_args, task_args_fnames + def parse_suite(self): '''parse the suite file and return a list of task config file paths and lists of optional parameters if present''' - print "Parsing suite file:", self.path + LOG.info("\nParsing suite file:%s", self.path) try: with open(self.path) as stream: @@ -135,33 +199,38 @@ class TaskParser(object): sys.exit(ioerror) self._check_schema(cfg["schema"], "suite") - print "Starting suite:", cfg["name"] + LOG.info("\nStarting scenario:%s", cfg["name"]) test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default) - task_fnames = [] - task_args = [] - task_args_fnames = [] + if test_cases_dir[-1] != os.sep: + test_cases_dir += os.sep + + cur_pod = os.environ.get('NODE_NAME', None) + cur_installer = os.environ.get('INSTALLER_TYPE', None) + + valid_task_files = [] + valid_task_args = [] + valid_task_args_fnames = [] for task in cfg["test_cases"]: - task_fnames.append(task["file_name"]) - if "task_args" in task: - task_args.append(task["task_args"]) + # 1.check file_name + if "file_name" in task: + task_fname = task.get('file_name', None) + if task_fname is None: + continue else: - task_args.append(None) - - if "task_args_file" in task: - task_args_fnames.append(task["task_args_file"]) + continue + # 2.check constraint + if self._meet_constraint(task, cur_pod, cur_installer): + valid_task_files.append(test_cases_dir + task_fname) else: - task_args_fnames.append(None) - - suite_params = { - "test_cases_dir": test_cases_dir, - "task_fnames": task_fnames, - "task_args": task_args, - "task_args_fnames": task_args_fnames - } + continue + # 3.fetch task parameters + task_args, task_args_fnames = self._get_task_para(task, cur_pod) + valid_task_args.append(task_args) + valid_task_args_fnames.append(task_args_fnames) - return suite_params + return valid_task_files, valid_task_args, valid_task_args_fnames def parse_task(self, task_args=None, task_args_file=None): '''parses the task file and return an context and scenario instances''' @@ -192,26 +261,44 @@ class TaskParser(object): sys.exit(ioerror) self._check_schema(cfg["schema"], "task") + meet_precondition = self._check_precondition(cfg) # TODO: support one or many contexts? Many would simpler and precise + # TODO: support hybrid context type if "context" in cfg: context_cfgs = [cfg["context"]] - else: + elif "contexts" in cfg: context_cfgs = cfg["contexts"] + else: + context_cfgs = [{"type": "Dummy"}] for cfg_attrs in context_cfgs: - # config external_network based on env var - if "networks" in cfg_attrs: - for _, attrs in cfg_attrs["networks"].items(): - attrs["external_network"] = os.environ.get( - 'EXTERNAL_NETWORK', 'net04_ext') - context = Context() + context_type = cfg_attrs.get("type", "Heat") + if "Heat" == context_type and "networks" in cfg_attrs: + # bugfix: if there are more than one network, + # only add "external_network" on first one. + # the name of netwrok should follow this rule: + # test, test2, test3 ... + # sort network with the length of network's name + sorted_networks = sorted(cfg_attrs["networks"].keys()) + # config external_network based on env var + cfg_attrs["networks"][sorted_networks[0]]["external_network"] \ + = os.environ.get("EXTERNAL_NETWORK", "net04_ext") + + context = Context.get(context_type) context.init(cfg_attrs) run_in_parallel = cfg.get("run_in_parallel", False) + # add tc and task id for influxdb extended tags + task_id = str(uuid.uuid4()) + for scenario in cfg["scenarios"]: + task_name = os.path.splitext(os.path.basename(self.path))[0] + scenario["tc"] = task_name + scenario["task_id"] = task_id + # TODO we need something better here, a class that represent the file - return cfg["scenarios"], run_in_parallel + return cfg["scenarios"], run_in_parallel, meet_precondition def _check_schema(self, cfg_schema, schema_type): '''Check if config file is using the correct schema type''' @@ -220,6 +307,38 @@ class TaskParser(object): sys.exit("error: file %s has unknown schema %s" % (self.path, cfg_schema)) + def _check_precondition(self, cfg): + '''Check if the envrionment meet the preconditon''' + + if "precondition" in cfg: + precondition = cfg["precondition"] + installer_type = precondition.get("installer_type", None) + deploy_scenarios = precondition.get("deploy_scenarios", None) + tc_fit_pods = precondition.get("pod_name", None) + installer_type_env = os.environ.get('INSTALL_TYPE', None) + deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None) + pod_name_env = os.environ.get('NODE_NAME', None) + + LOG.info("installer_type: %s, installer_type_env: %s", + installer_type, installer_type_env) + LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s", + deploy_scenarios, deploy_scenario_env) + LOG.info("tc_fit_pods: %s, pod_name_env: %s", + tc_fit_pods, pod_name_env) + if installer_type and installer_type_env: + if installer_type_env not in installer_type: + return False + if deploy_scenarios and deploy_scenario_env: + deploy_scenarios_list = deploy_scenarios.split(',') + for deploy_scenario in deploy_scenarios_list: + if deploy_scenario_env.startswith(deploy_scenario): + return True + return False + if tc_fit_pods and pod_name_env: + if pod_name_env not in tc_fit_pods: + return False + return True + def atexit_handler(): '''handler for process termination''' @@ -240,45 +359,100 @@ def is_ip_addr(addr): return False -def run_one_scenario(scenario_cfg, output_file): - '''run one scenario using context''' - key_filename = pkg_resources.resource_filename( - 'yardstick.resources', 'files/yardstick_key') +def _is_same_heat_context(host_attr, target_attr): + '''check if two servers are in the same heat context + host_attr: either a name for a server created by yardstick or a dict + with attribute name mapping when using external heat templates + target_attr: either a name for a server created by yardstick or a dict + with attribute name mapping when using external heat templates + ''' + host = None + target = None + for context in Context.list: + if context.__context_type__ != "Heat": + continue + + host = context._get_server(host_attr) + if host is None: + continue + + target = context._get_server(target_attr) + if target is None: + return False + + # Both host and target is not None, then they are in the + # same heat context. + return True + + return False - host = Context.get_server(scenario_cfg["host"]) +def _is_background_scenario(scenario): + if "run_in_background" in scenario: + return scenario["run_in_background"] + else: + return False + + +def run_one_scenario(scenario_cfg, output_file): + '''run one scenario using context''' runner_cfg = scenario_cfg["runner"] - runner_cfg['host'] = host.public_ip - runner_cfg['user'] = host.context.user - runner_cfg['key_filename'] = key_filename runner_cfg['output_filename'] = output_file + # TODO support get multi hosts/vms info + context_cfg = {} + if "host" in scenario_cfg: + context_cfg['host'] = Context.get_server(scenario_cfg["host"]) + if "target" in scenario_cfg: if is_ip_addr(scenario_cfg["target"]): - scenario_cfg["ipaddr"] = scenario_cfg["target"] + context_cfg['target'] = {} + context_cfg['target']["ipaddr"] = scenario_cfg["target"] else: - target = Context.get_server(scenario_cfg["target"]) - - # get public IP for target server, some scenarios require it - if target.public_ip: - runner_cfg['target'] = target.public_ip - - # TODO scenario_cfg["ipaddr"] is bad naming - if host.context != target.context: - # target is in another context, get its public IP - scenario_cfg["ipaddr"] = target.public_ip + context_cfg['target'] = Context.get_server(scenario_cfg["target"]) + if _is_same_heat_context(scenario_cfg["host"], + scenario_cfg["target"]): + context_cfg["target"]["ipaddr"] = \ + context_cfg["target"]["private_ip"] else: - # target is in the same context, get its private IP - scenario_cfg["ipaddr"] = target.private_ip - + context_cfg["target"]["ipaddr"] = \ + context_cfg["target"]["ip"] + + if "targets" in scenario_cfg: + ip_list = [] + for target in scenario_cfg["targets"]: + if is_ip_addr(target): + ip_list.append(target) + context_cfg['target'] = {} + else: + context_cfg['target'] = Context.get_server(target) + if _is_same_heat_context(scenario_cfg["host"], target): + ip_list.append(context_cfg["target"]["private_ip"]) + else: + ip_list.append(context_cfg["target"]["ip"]) + context_cfg['target']['ipaddr'] = ','.join(ip_list) + + if "nodes" in scenario_cfg: + context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg) runner = base_runner.Runner.get(runner_cfg) print "Starting runner of type '%s'" % runner_cfg["type"] - runner.run(scenario_cfg["type"], scenario_cfg) + runner.run(scenario_cfg, context_cfg) return runner +def parse_nodes_with_context(scenario_cfg): + '''paras the 'nodes' fields in scenario ''' + nodes = scenario_cfg["nodes"] + + nodes_cfg = {} + for nodename in nodes: + nodes_cfg[nodename] = Context.get_server(nodes[nodename]) + + return nodes_cfg + + def runner_join(runner): '''join (wait for) a runner, exit process at runner failure''' status = runner.join()