X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fcmd%2Fcommands%2Ftask.py;h=e2e8bf67d86353cb8d475338bcc86fc75575beaf;hb=refs%2Fchanges%2F35%2F41835%2F5;hp=a10a2a8a3e6becff00d49580caf7ba23718bf8b1;hpb=1873da9a908a50fe70a074731e3643297b6eab6d;p=yardstick.git diff --git a/yardstick/cmd/commands/task.py b/yardstick/cmd/commands/task.py index a10a2a8a3..e2e8bf67d 100644 --- a/yardstick/cmd/commands/task.py +++ b/yardstick/cmd/commands/task.py @@ -1,4 +1,4 @@ -############################################################################## +############################################################################# # Copyright (c) 2015 Ericsson AB and others. # # All rights reserved. This program and the accompanying materials @@ -8,32 +8,27 @@ ############################################################################## """ Handler for yardstick command 'task' """ +from __future__ import print_function +from __future__ import absolute_import -import sys -import os -import yaml -import atexit -import ipaddress -import time import logging -import uuid -from itertools import ifilter -from yardstick.benchmark.contexts.base import Context -from yardstick.benchmark.runners import base as base_runner -from yardstick.common.task_template import TaskTemplate +from yardstick.benchmark.core.task import Task from yardstick.common.utils import cliargs +from yardstick.common.utils import write_json_to_file +from yardstick.cmd.commands import change_osloobj_to_paras output_file_default = "/tmp/yardstick.out" -test_cases_dir_default = "tests/opnfv/test_cases/" + + LOG = logging.getLogger(__name__) -class TaskCommands(object): - '''Task commands. +class TaskCommands(object): # pragma: no cover + """Task commands. Set of commands to manage benchmark tasks. - ''' + """ @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1) @cliargs("--task-args", dest="task_args", @@ -52,433 +47,23 @@ class TaskCommands(object): @cliargs("--suite", help="process test suite file instead of a task file", action="store_true") def do_start(self, args, **kwargs): - '''Start a benchmark scenario.''' - - atexit.register(atexit_handler) - - total_start_time = time.time() - parser = TaskParser(args.inputfile[0]) - - if args.suite: - # 1.parse suite, return suite_params info - task_files, task_args, task_args_fnames = \ - parser.parse_suite() - else: - task_files = [parser.path] - task_args = [args.task_args] - task_args_fnames = [args.task_args_file] - - LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s", - task_files, task_args, task_args_fnames) - - if args.parse_only: - sys.exit(0) - - if os.path.isfile(args.output_file): - os.remove(args.output_file) - # parse task_files - for i in range(0, len(task_files)): - one_task_start_time = time.time() - parser.path = task_files[i] - scenarios, run_in_parallel, meet_precondition = parser.parse_task( - task_args[i], task_args_fnames[i]) - - if not meet_precondition: - LOG.info("meet_precondition is %s, please check envrionment", - meet_precondition) - continue - - self._run(scenarios, run_in_parallel, args.output_file) - - if args.keep_deploy: - # keep deployment, forget about stack - # (hide it for exit handler) - Context.list = [] - else: - for context in Context.list: - context.undeploy() - Context.list = [] - one_task_end_time = time.time() - LOG.info("task %s finished in %d secs", task_files[i], - one_task_end_time - one_task_start_time) - - total_end_time = time.time() - LOG.info("total finished in %d secs", - total_end_time - total_start_time) - - print "Done, exiting" - - def _run(self, scenarios, run_in_parallel, output_file): - '''Deploys context and calls runners''' - for context in Context.list: - context.deploy() - - background_runners = [] - - # Start all background scenarios - for scenario in ifilter(_is_background_scenario, scenarios): - scenario["runner"] = dict(type="Duration", duration=1000000000) - runner = run_one_scenario(scenario, output_file) - background_runners.append(runner) - - runners = [] - if run_in_parallel: - for scenario in scenarios: - if not _is_background_scenario(scenario): - runner = run_one_scenario(scenario, output_file) - runners.append(runner) - - # Wait for runners to finish - for runner in runners: - runner_join(runner) - print "Runner ended, output in", output_file - else: - # run serially - for scenario in scenarios: - if not _is_background_scenario(scenario): - runner = run_one_scenario(scenario, output_file) - runner_join(runner) - print "Runner ended, output in", output_file - - # Abort background runners - for runner in background_runners: - runner.abort() - - # Wait for background runners to finish - for runner in background_runners: - if runner.join(timeout=60) is None: - # Nuke if it did not stop nicely - base_runner.Runner.terminate(runner) - runner_join(runner) - else: - base_runner.Runner.release(runner) - print "Background task ended" - - -# TODO: Move stuff below into TaskCommands class !? - - -class TaskParser(object): - '''Parser for task config files in yaml format''' - def __init__(self, path): - self.path = path - - def _meet_constraint(self, task, cur_pod, cur_installer): - if "constraint" in task: - constraint = task.get('constraint', None) - if constraint is not None: - tc_fit_pod = constraint.get('pod', None) - tc_fit_installer = constraint.get('installer', None) - LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s", - cur_pod, cur_installer, constraint) - if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod: - return False - if cur_installer and tc_fit_installer and \ - cur_installer not in tc_fit_installer: - return False - return True - - def _get_task_para(self, task, cur_pod): - task_args = task.get('task_args', None) - if task_args is not None: - task_args = task_args.get(cur_pod, None) - task_args_fnames = task.get('task_args_fnames', None) - if task_args_fnames is not None: - task_args_fnames = task_args_fnames.get(cur_pod, None) - return task_args, task_args_fnames - - def parse_suite(self): - '''parse the suite file and return a list of task config file paths - and lists of optional parameters if present''' - LOG.info("\nParsing suite file:%s", self.path) + param = change_osloobj_to_paras(args) + self.output_file = param.output_file + result = {} + LOG.info('Task START') try: - with open(self.path) as stream: - cfg = yaml.load(stream) - except IOError as ioerror: - sys.exit(ioerror) - - self._check_schema(cfg["schema"], "suite") - LOG.info("\nStarting scenario:%s", cfg["name"]) - - test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default) - if test_cases_dir[-1] != os.sep: - test_cases_dir += os.sep - - cur_pod = os.environ.get('NODE_NAME', None) - cur_installer = os.environ.get('INSTALLER_TYPE', None) - - valid_task_files = [] - valid_task_args = [] - valid_task_args_fnames = [] - - for task in cfg["test_cases"]: - # 1.check file_name - if "file_name" in task: - task_fname = task.get('file_name', None) - if task_fname is None: - continue - else: - continue - # 2.check constraint - if self._meet_constraint(task, cur_pod, cur_installer): - valid_task_files.append(test_cases_dir + task_fname) - else: - continue - # 3.fetch task parameters - task_args, task_args_fnames = self._get_task_para(task, cur_pod) - valid_task_args.append(task_args) - valid_task_args_fnames.append(task_args_fnames) - - return valid_task_files, valid_task_args, valid_task_args_fnames - - def parse_task(self, task_args=None, task_args_file=None): - '''parses the task file and return an context and scenario instances''' - print "Parsing task config:", self.path - - try: - kw = {} - if task_args_file: - with open(task_args_file) as f: - kw.update(parse_task_args("task_args_file", f.read())) - kw.update(parse_task_args("task_args", task_args)) - except TypeError: - raise TypeError() - - try: - with open(self.path) as f: - try: - input_task = f.read() - rendered_task = TaskTemplate.render(input_task, **kw) - except Exception as e: - print(("Failed to render template:\n%(task)s\n%(err)s\n") - % {"task": input_task, "err": e}) - raise e - print(("Input task is:\n%s\n") % rendered_task) - - cfg = yaml.load(rendered_task) - except IOError as ioerror: - sys.exit(ioerror) - - self._check_schema(cfg["schema"], "task") - meet_precondition = self._check_precondition(cfg) + result = Task().start(param, **kwargs) + except Exception as e: + self._write_error_data(e) + LOG.exception("") - # TODO: support one or many contexts? Many would simpler and precise - # TODO: support hybrid context type - if "context" in cfg: - context_cfgs = [cfg["context"]] - elif "contexts" in cfg: - context_cfgs = cfg["contexts"] + if result.get('result', {}).get('criteria') == 'PASS': + LOG.info('Task SUCCESS') else: - context_cfgs = [{"type": "Dummy"}] - - for cfg_attrs in context_cfgs: - context_type = cfg_attrs.get("type", "Heat") - if "Heat" == context_type and "networks" in cfg_attrs: - # bugfix: if there are more than one network, - # only add "external_network" on first one. - # the name of netwrok should follow this rule: - # test, test2, test3 ... - # sort network with the length of network's name - sorted_networks = sorted(cfg_attrs["networks"].keys()) - # config external_network based on env var - cfg_attrs["networks"][sorted_networks[0]]["external_network"] \ - = os.environ.get("EXTERNAL_NETWORK", "net04_ext") - - context = Context.get(context_type) - context.init(cfg_attrs) - - run_in_parallel = cfg.get("run_in_parallel", False) - - # add tc and task id for influxdb extended tags - task_id = str(uuid.uuid4()) - for scenario in cfg["scenarios"]: - task_name = os.path.splitext(os.path.basename(self.path))[0] - scenario["tc"] = task_name - scenario["task_id"] = task_id - - # TODO we need something better here, a class that represent the file - return cfg["scenarios"], run_in_parallel, meet_precondition - - def _check_schema(self, cfg_schema, schema_type): - '''Check if config file is using the correct schema type''' - - if cfg_schema != "yardstick:" + schema_type + ":0.1": - sys.exit("error: file %s has unknown schema %s" % (self.path, - cfg_schema)) - - def _check_precondition(self, cfg): - '''Check if the envrionment meet the preconditon''' - - if "precondition" in cfg: - precondition = cfg["precondition"] - installer_type = precondition.get("installer_type", None) - deploy_scenarios = precondition.get("deploy_scenarios", None) - tc_fit_pods = precondition.get("pod_name", None) - installer_type_env = os.environ.get('INSTALL_TYPE', None) - deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None) - pod_name_env = os.environ.get('NODE_NAME', None) - - LOG.info("installer_type: %s, installer_type_env: %s", - installer_type, installer_type_env) - LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s", - deploy_scenarios, deploy_scenario_env) - LOG.info("tc_fit_pods: %s, pod_name_env: %s", - tc_fit_pods, pod_name_env) - if installer_type and installer_type_env: - if installer_type_env not in installer_type: - return False - if deploy_scenarios and deploy_scenario_env: - deploy_scenarios_list = deploy_scenarios.split(',') - for deploy_scenario in deploy_scenarios_list: - if deploy_scenario_env.startswith(deploy_scenario): - return True - return False - if tc_fit_pods and pod_name_env: - if pod_name_env not in tc_fit_pods: - return False - return True - - -def atexit_handler(): - '''handler for process termination''' - base_runner.Runner.terminate_all() - - if len(Context.list) > 0: - print "Undeploying all contexts" - for context in Context.list: - context.undeploy() - - -def is_ip_addr(addr): - '''check if string addr is an IP address''' - try: - ipaddress.ip_address(unicode(addr)) - return True - except ValueError: - return False - - -def _is_same_heat_context(host_attr, target_attr): - '''check if two servers are in the same heat context - host_attr: either a name for a server created by yardstick or a dict - with attribute name mapping when using external heat templates - target_attr: either a name for a server created by yardstick or a dict - with attribute name mapping when using external heat templates - ''' - host = None - target = None - for context in Context.list: - if context.__context_type__ != "Heat": - continue - - host = context._get_server(host_attr) - if host is None: - continue - - target = context._get_server(target_attr) - if target is None: - return False - - # Both host and target is not None, then they are in the - # same heat context. - return True - - return False - - -def _is_background_scenario(scenario): - if "run_in_background" in scenario: - return scenario["run_in_background"] - else: - return False - - -def run_one_scenario(scenario_cfg, output_file): - '''run one scenario using context''' - runner_cfg = scenario_cfg["runner"] - runner_cfg['output_filename'] = output_file - - # TODO support get multi hosts/vms info - context_cfg = {} - if "host" in scenario_cfg: - context_cfg['host'] = Context.get_server(scenario_cfg["host"]) - - if "target" in scenario_cfg: - if is_ip_addr(scenario_cfg["target"]): - context_cfg['target'] = {} - context_cfg['target']["ipaddr"] = scenario_cfg["target"] - else: - context_cfg['target'] = Context.get_server(scenario_cfg["target"]) - if _is_same_heat_context(scenario_cfg["host"], - scenario_cfg["target"]): - context_cfg["target"]["ipaddr"] = \ - context_cfg["target"]["private_ip"] - else: - context_cfg["target"]["ipaddr"] = \ - context_cfg["target"]["ip"] - - if "targets" in scenario_cfg: - ip_list = [] - for target in scenario_cfg["targets"]: - if is_ip_addr(target): - ip_list.append(target) - context_cfg['target'] = {} - else: - context_cfg['target'] = Context.get_server(target) - if _is_same_heat_context(scenario_cfg["host"], target): - ip_list.append(context_cfg["target"]["private_ip"]) - else: - ip_list.append(context_cfg["target"]["ip"]) - context_cfg['target']['ipaddr'] = ','.join(ip_list) - - if "nodes" in scenario_cfg: - context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg) - runner = base_runner.Runner.get(runner_cfg) - - print "Starting runner of type '%s'" % runner_cfg["type"] - runner.run(scenario_cfg, context_cfg) - - return runner - - -def parse_nodes_with_context(scenario_cfg): - '''paras the 'nodes' fields in scenario ''' - nodes = scenario_cfg["nodes"] - - nodes_cfg = {} - for nodename in nodes: - nodes_cfg[nodename] = Context.get_server(nodes[nodename]) - - return nodes_cfg - - -def runner_join(runner): - '''join (wait for) a runner, exit process at runner failure''' - status = runner.join() - base_runner.Runner.release(runner) - if status != 0: - sys.exit("Runner failed") - - -def print_invalid_header(source_name, args): - print(("Invalid %(source)s passed:\n\n %(args)s\n") - % {"source": source_name, "args": args}) - - -def parse_task_args(src_name, args): - try: - kw = args and yaml.safe_load(args) - kw = {} if kw is None else kw - except yaml.parser.ParserError as e: - print_invalid_header(src_name, args) - print(("%(source)s has to be YAML. Details:\n\n%(err)s\n") - % {"source": src_name, "err": e}) - raise TypeError() + LOG.info('Task FAILED') + raise RuntimeError('Task Failed') - if not isinstance(kw, dict): - print_invalid_header(src_name, args) - print(("%(src)s had to be dict, actually %(src_type)s\n") - % {"src": src_name, "src_type": type(kw)}) - raise TypeError() - return kw + def _write_error_data(self, error): + data = {'status': 2, 'result': str(error)} + write_json_to_file(self.output_file, data)