X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Fcore%2Ftask.py;h=478a51f9d164c596990aa7fe58065adf90eed33e;hb=1ff9df7e724eb0c981aebd5f5b8aa90db0da292b;hp=397ba00b087fc42178070093644c38e48f714609;hpb=45db0fdabb4585b96756a390650181a3c46facf7;p=yardstick.git diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index 397ba00b0..478a51f9d 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -9,6 +9,8 @@ """ Handler for yardstick command 'task' """ +from __future__ import absolute_import +from __future__ import print_function import sys import os import yaml @@ -18,34 +20,50 @@ import time import logging import uuid import errno -from itertools import ifilter +from six.moves import filter from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.runners import base as base_runner +from yardstick.dispatcher.base import Base as DispatcherBase from yardstick.common.task_template import TaskTemplate from yardstick.common.utils import source_env +from yardstick.common import utils from yardstick.common import constants output_file_default = "/tmp/yardstick.out" +config_file = '/etc/yardstick/yardstick.conf' test_cases_dir_default = "tests/opnfv/test_cases/" LOG = logging.getLogger(__name__) class Task(object): # pragma: no cover - '''Task commands. + """Task commands. Set of commands to manage benchmark tasks. - ''' + """ + + def __init__(self): + self.contexts = [] + self.outputs = {} def start(self, args, **kwargs): - '''Start a benchmark scenario.''' + """Start a benchmark scenario.""" - atexit.register(atexit_handler) + atexit.register(self.atexit_handler) self.task_id = kwargs.get('task_id', str(uuid.uuid4())) check_environment() + output_config = utils.parse_ini_file(config_file) + self._init_output_config(output_config) + self._set_output_config(output_config, args.output_file) + LOG.debug('Output configuration is: %s', output_config) + + if output_config['DEFAULT'].get('dispatcher') == 'file': + result = {'status': 0, 'result': {}} + utils.write_json_to_file(args.output_file, result) + total_start_time = time.time() parser = TaskParser(args.inputfile[0]) @@ -64,71 +82,158 @@ class Task(object): # pragma: no cover if args.parse_only: sys.exit(0) - if os.path.isfile(args.output_file): - os.remove(args.output_file) + testcases = {} # parse task_files for i in range(0, len(task_files)): one_task_start_time = time.time() parser.path = task_files[i] - scenarios, run_in_parallel, meet_precondition = parser.parse_task( - self.task_id, task_args[i], task_args_fnames[i]) + scenarios, run_in_parallel, meet_precondition, contexts = \ + parser.parse_task(self.task_id, task_args[i], + task_args_fnames[i]) + + self.contexts.extend(contexts) if not meet_precondition: LOG.info("meet_precondition is %s, please check envrionment", meet_precondition) continue - self._run(scenarios, run_in_parallel, args.output_file) + case_name = os.path.splitext(os.path.basename(task_files[i]))[0] + try: + data = self._run(scenarios, run_in_parallel, args.output_file) + except KeyboardInterrupt: + raise + except Exception: + testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []} + else: + testcases[case_name] = {'criteria': 'PASS', 'tc_data': data} if args.keep_deploy: # keep deployment, forget about stack # (hide it for exit handler) - Context.list = [] + self.contexts = [] else: - for context in Context.list: + for context in self.contexts[::-1]: context.undeploy() - Context.list = [] + 
self.contexts = [] one_task_end_time = time.time() LOG.info("task %s finished in %d secs", task_files[i], one_task_end_time - one_task_start_time) + result = self._get_format_result(testcases) + + self._do_output(output_config, result) + total_end_time = time.time() LOG.info("total finished in %d secs", total_end_time - total_start_time) - print "Done, exiting" + scenario = scenarios[0] + print("To generate report execute => yardstick report generate ", + scenario['task_id'], scenario['tc']) + + print("Done, exiting") + + def _init_output_config(self, output_config): + output_config.setdefault('DEFAULT', {}) + output_config.setdefault('dispatcher_http', {}) + output_config.setdefault('dispatcher_file', {}) + output_config.setdefault('dispatcher_influxdb', {}) + output_config.setdefault('nsb', {}) + + def _set_output_config(self, output_config, file_path): + try: + out_type = os.environ['DISPATCHER'] + except KeyError: + output_config['DEFAULT'].setdefault('dispatcher', 'file') + else: + output_config['DEFAULT']['dispatcher'] = out_type + + output_config['dispatcher_file']['file_path'] = file_path + + try: + target = os.environ['TARGET'] + except KeyError: + pass + else: + k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher']) + output_config[k]['target'] = target + + def _get_format_result(self, testcases): + criteria = self._get_task_criteria(testcases) + + info = { + 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'), + 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'), + 'pod_name': os.environ.get('NODE_NAME', 'unknown'), + 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown') + } + + result = { + 'status': 1, + 'result': { + 'criteria': criteria, + 'task_id': self.task_id, + 'info': info, + 'testcases': testcases + } + } + + return result + + def _get_task_criteria(self, testcases): + criteria = any(t.get('criteria') != 'PASS' for t in testcases.values()) + if criteria: + return 'FAIL' + else: + return 'PASS' + + def _do_output(self, output_config, result): + + dispatcher = DispatcherBase.get(output_config) + dispatcher.flush_result_data(result) def _run(self, scenarios, run_in_parallel, output_file): - '''Deploys context and calls runners''' - for context in Context.list: + """Deploys context and calls runners""" + for context in self.contexts: context.deploy() background_runners = [] + result = [] # Start all background scenarios - for scenario in ifilter(_is_background_scenario, scenarios): + for scenario in filter(_is_background_scenario, scenarios): scenario["runner"] = dict(type="Duration", duration=1000000000) - runner = run_one_scenario(scenario, output_file) + runner = self.run_one_scenario(scenario, output_file) background_runners.append(runner) runners = [] if run_in_parallel: for scenario in scenarios: if not _is_background_scenario(scenario): - runner = run_one_scenario(scenario, output_file) + runner = self.run_one_scenario(scenario, output_file) runners.append(runner) # Wait for runners to finish for runner in runners: - runner_join(runner) - print "Runner ended, output in", output_file + status = runner_join(runner) + if status != 0: + raise RuntimeError + self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) + print("Runner ended, output in", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): - runner = run_one_scenario(scenario, output_file) - runner_join(runner) - print "Runner ended, output in", output_file + runner = self.run_one_scenario(scenario, 
output_file) + status = runner_join(runner) + if status != 0: + LOG.error('Scenario: %s ERROR', scenario.get('type')) + raise RuntimeError + self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) + print("Runner ended, output in", output_file) # Abort background runners for runner in background_runners: @@ -136,20 +241,123 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - if runner.join(timeout=60) is None: + status = runner.join(timeout=60) + if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner_join(runner) + status = runner_join(runner) + self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) else: base_runner.Runner.release(runner) - print "Background task ended" + if status != 0: + raise RuntimeError + print("Background task ended") + + return result + + def atexit_handler(self): + """handler for process termination""" + base_runner.Runner.terminate_all() + + if self.contexts: + print("Undeploying all contexts") + for context in self.contexts[::-1]: + context.undeploy() + + def _parse_options(self, op): + if isinstance(op, dict): + return {k: self._parse_options(v) for k, v in op.items()} + elif isinstance(op, list): + return [self._parse_options(v) for v in op] + elif isinstance(op, str): + return self.outputs.get(op[1:]) if op.startswith('$') else op + else: + return op + + def run_one_scenario(self, scenario_cfg, output_file): + """run one scenario using context""" + runner_cfg = scenario_cfg["runner"] + runner_cfg['output_filename'] = output_file + + options = scenario_cfg.get('options', {}) + scenario_cfg['options'] = self._parse_options(options) + + # TODO support get multi hosts/vms info + context_cfg = {} + if "host" in scenario_cfg: + context_cfg['host'] = Context.get_server(scenario_cfg["host"]) + + if "target" in scenario_cfg: + if is_ip_addr(scenario_cfg["target"]): + context_cfg['target'] = {} + context_cfg['target']["ipaddr"] = scenario_cfg["target"] + else: + context_cfg['target'] = Context.get_server( + scenario_cfg["target"]) + if self._is_same_heat_context(scenario_cfg["host"], + scenario_cfg["target"]): + context_cfg["target"]["ipaddr"] = \ + context_cfg["target"]["private_ip"] + else: + context_cfg["target"]["ipaddr"] = \ + context_cfg["target"]["ip"] + + if "targets" in scenario_cfg: + ip_list = [] + for target in scenario_cfg["targets"]: + if is_ip_addr(target): + ip_list.append(target) + context_cfg['target'] = {} + else: + context_cfg['target'] = Context.get_server(target) + if self._is_same_heat_context(scenario_cfg["host"], + target): + ip_list.append(context_cfg["target"]["private_ip"]) + else: + ip_list.append(context_cfg["target"]["ip"]) + context_cfg['target']['ipaddr'] = ','.join(ip_list) + + if "nodes" in scenario_cfg: + context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg) + runner = base_runner.Runner.get(runner_cfg) + + print("Starting runner of type '%s'" % runner_cfg["type"]) + runner.run(scenario_cfg, context_cfg) + + return runner + + def _is_same_heat_context(self, host_attr, target_attr): + """check if two servers are in the same heat context + host_attr: either a name for a server created by yardstick or a dict + with attribute name mapping when using external heat templates + target_attr: either a name for a server created by yardstick or a dict + with attribute name mapping when using external heat templates + """ + host = None + target = None + for context in self.contexts: + 
if context.__context_type__ != "Heat": + continue + host = context._get_server(host_attr) + if host is None: + continue -# TODO: Move stuff below into TaskCommands class !? + target = context._get_server(target_attr) + if target is None: + return False + + # Both host and target is not None, then they are in the + # same heat context. + return True + + return False class TaskParser(object): # pragma: no cover - '''Parser for task config files in yaml format''' + """Parser for task config files in yaml format""" + def __init__(self, path): self.path = path @@ -178,8 +386,8 @@ class TaskParser(object): # pragma: no cover return task_args, task_args_fnames def parse_suite(self): - '''parse the suite file and return a list of task config file paths - and lists of optional parameters if present''' + """parse the suite file and return a list of task config file paths + and lists of optional parameters if present""" LOG.info("\nParsing suite file:%s", self.path) try: @@ -192,6 +400,8 @@ class TaskParser(object): # pragma: no cover LOG.info("\nStarting scenario:%s", cfg["name"]) test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default) + test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH, + test_cases_dir) if test_cases_dir[-1] != os.sep: test_cases_dir += os.sep @@ -223,8 +433,8 @@ class TaskParser(object): # pragma: no cover return valid_task_files, valid_task_args, valid_task_args_fnames def parse_task(self, task_id, task_args=None, task_args_file=None): - '''parses the task file and return an context and scenario instances''' - print "Parsing task config:", self.path + """parses the task file and return an context and scenario instances""" + print("Parsing task config:", self.path) try: kw = {} @@ -241,10 +451,10 @@ class TaskParser(object): # pragma: no cover input_task = f.read() rendered_task = TaskTemplate.render(input_task, **kw) except Exception as e: - print(("Failed to render template:\n%(task)s\n%(err)s\n") + print("Failed to render template:\n%(task)s\n%(err)s\n" % {"task": input_task, "err": e}) raise e - print(("Input task is:\n%s\n") % rendered_task) + print("Input task is:\n%s\n" % rendered_task) cfg = yaml.load(rendered_task) except IOError as ioerror: @@ -262,21 +472,19 @@ class TaskParser(object): # pragma: no cover else: context_cfgs = [{"type": "Dummy"}] + contexts = [] + name_suffix = '-{}'.format(task_id[:8]) for cfg_attrs in context_cfgs: + try: + cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'], + name_suffix) + except KeyError: + pass + # default to Heat context because we are testing OpenStack context_type = cfg_attrs.get("type", "Heat") - if "Heat" == context_type and "networks" in cfg_attrs: - # bugfix: if there are more than one network, - # only add "external_network" on first one. - # the name of netwrok should follow this rule: - # test, test2, test3 ... 
-            # sort network with the length of network's name
-            sorted_networks = sorted(cfg_attrs["networks"].keys())
-            # config external_network based on env var
-            cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
-                = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
-
         context = Context.get(context_type)
         context.init(cfg_attrs)
+        contexts.append(context)
 
     run_in_parallel = cfg.get("run_in_parallel", False)
 
@@ -285,19 +493,30 @@ class TaskParser(object):    # pragma: no cover
         task_name = os.path.splitext(os.path.basename(self.path))[0]
         scenario["tc"] = task_name
         scenario["task_id"] = task_id
+        # embed task path into scenario so we can load other files
+        # relative to task path
+        scenario["task_path"] = os.path.dirname(self.path)
+
+        change_server_name(scenario, name_suffix)
+
+        try:
+            for node in scenario['nodes']:
+                scenario['nodes'][node] += name_suffix
+        except KeyError:
+            pass
 
         # TODO we need something better here, a class that represent the file
-        return cfg["scenarios"], run_in_parallel, meet_precondition
+        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
 
     def _check_schema(self, cfg_schema, schema_type):
-        '''Check if config file is using the correct schema type'''
+        """Check if config file is using the correct schema type"""
 
         if cfg_schema != "yardstick:" + schema_type + ":0.1":
             sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                                cfg_schema))
 
     def _check_precondition(self, cfg):
-        '''Check if the envrionment meet the preconditon'''
+        """Check if the environment meets the precondition"""
 
         if "precondition" in cfg:
             precondition = cfg["precondition"]
@@ -329,52 +548,20 @@ class TaskParser(object):    # pragma: no cover
 
         return True
 
 
-def atexit_handler():
-    '''handler for process termination'''
-    base_runner.Runner.terminate_all()
-
-    if len(Context.list) > 0:
-        print "Undeploying all contexts"
-        for context in Context.list:
-            context.undeploy()
-
-
 def is_ip_addr(addr):
-    '''check if string addr is an IP address'''
+    """check if string addr is an IP address"""
     try:
-        ipaddress.ip_address(unicode(addr))
-        return True
+        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
+    except AttributeError:
+        pass
+
+    try:
+        ipaddress.ip_address(addr.encode('utf-8'))
     except ValueError:
         return False
-
-
-def _is_same_heat_context(host_attr, target_attr):
-    '''check if two servers are in the same heat context
-    host_attr: either a name for a server created by yardstick or a dict
-    with attribute name mapping when using external heat templates
-    target_attr: either a name for a server created by yardstick or a dict
-    with attribute name mapping when using external heat templates
-    '''
-    host = None
-    target = None
-    for context in Context.list:
-        if context.__context_type__ != "Heat":
-            continue
-
-        host = context._get_server(host_attr)
-        if host is None:
-            continue
-
-        target = context._get_server(target_attr)
-        if target is None:
-            return False
-
-        # Both host and target is not None, then they are in the
-        # same heat context.
+    else:
         return True
 
-    return False
-
 
 def _is_background_scenario(scenario):
     if "run_in_background" in scenario:
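Note: the is_ip_addr() rewrite in the hunk above also accepts the dict-shaped
targets that external Heat templates produce ('public_ip_attr'/'private_ip_attr'
mappings), not just plain strings. A standalone sketch of that duck-typing,
simplified to pass the text straight to ipaddress.ip_address() (the patch
itself calls addr.encode('utf-8'), presumably for the Python 2 ipaddress
backport); the sample addresses are invented:

    import ipaddress

    def is_ip_addr(addr):
        """check if addr (str or attribute-mapping dict) is an IP address"""
        try:
            # dict-style target: resolve the attribute name first
            addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
        except AttributeError:
            pass  # already a plain string
        try:
            ipaddress.ip_address(addr)
        except ValueError:
            return False
        else:
            return True

    assert is_ip_addr('10.0.0.1')
    assert is_ip_addr({'public_ip_attr': '172.16.0.5'})
    assert not is_ip_addr('vnf-host-name')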
@@ -383,56 +570,8 @@
     return False
 
 
-def run_one_scenario(scenario_cfg, output_file):
-    '''run one scenario using context'''
-    runner_cfg = scenario_cfg["runner"]
-    runner_cfg['output_filename'] = output_file
-
-    # TODO support get multi hosts/vms info
-    context_cfg = {}
-    if "host" in scenario_cfg:
-        context_cfg['host'] = Context.get_server(scenario_cfg["host"])
-
-    if "target" in scenario_cfg:
-        if is_ip_addr(scenario_cfg["target"]):
-            context_cfg['target'] = {}
-            context_cfg['target']["ipaddr"] = scenario_cfg["target"]
-        else:
-            context_cfg['target'] = Context.get_server(scenario_cfg["target"])
-            if _is_same_heat_context(scenario_cfg["host"],
-                                     scenario_cfg["target"]):
-                context_cfg["target"]["ipaddr"] = \
-                    context_cfg["target"]["private_ip"]
-            else:
-                context_cfg["target"]["ipaddr"] = \
-                    context_cfg["target"]["ip"]
-
-    if "targets" in scenario_cfg:
-        ip_list = []
-        for target in scenario_cfg["targets"]:
-            if is_ip_addr(target):
-                ip_list.append(target)
-                context_cfg['target'] = {}
-            else:
-                context_cfg['target'] = Context.get_server(target)
-                if _is_same_heat_context(scenario_cfg["host"], target):
-                    ip_list.append(context_cfg["target"]["private_ip"])
-                else:
-                    ip_list.append(context_cfg["target"]["ip"])
-        context_cfg['target']['ipaddr'] = ','.join(ip_list)
-
-    if "nodes" in scenario_cfg:
-        context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
-    runner = base_runner.Runner.get(runner_cfg)
-
-    print "Starting runner of type '%s'" % runner_cfg["type"]
-    runner.run(scenario_cfg, context_cfg)
-
-    return runner
-
-
 def parse_nodes_with_context(scenario_cfg):
-    '''paras the 'nodes' fields in scenario '''
+    """parse the 'nodes' fields in the scenario"""
     nodes = scenario_cfg["nodes"]
     nodes_cfg = {}
@@ -443,15 +582,14 @@
 
 
 def runner_join(runner):
-    '''join (wait for) a runner, exit process at runner failure'''
+    """join (wait for) a runner and return its exit status"""
     status = runner.join()
     base_runner.Runner.release(runner)
-    if status != 0:
-        sys.exit("Runner failed")
+    return status
 
 
 def print_invalid_header(source_name, args):
-    print(("Invalid %(source)s passed:\n\n %(args)s\n")
+    print("Invalid %(source)s passed:\n\n %(args)s\n"
           % {"source": source_name, "args": args})
 
 
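Note: with runner_join() above now returning the runner's status instead of
calling sys.exit(), a scenario failure surfaces as an exception that
Task.start() catches per task file, records as a FAIL, and then moves on to
the next file. A toy sketch of that control flow (ToyRunner, run_all and the
'tc001' key are invented for illustration):

    class ToyRunner:
        def __init__(self, status):
            self._status = status

        def join(self):
            return self._status

    def runner_join(runner):
        # was: sys.exit("Runner failed") when status != 0
        return runner.join()

    def run_all(runners):
        for runner in runners:
            if runner_join(runner) != 0:
                raise RuntimeError  # caught per task file in Task.start()

    testcases = {}
    try:
        run_all([ToyRunner(0), ToyRunner(1)])
    except RuntimeError:
        testcases['tc001'] = {'criteria': 'FAIL', 'tc_data': []}
    else:
        testcases['tc001'] = {'criteria': 'PASS', 'tc_data': []}
    print(testcases)  # {'tc001': {'criteria': 'FAIL', 'tc_data': []}}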
@@ -461,13 +599,13 @@
         kw = {} if kw is None else kw
     except yaml.parser.ParserError as e:
         print_invalid_header(src_name, args)
-        print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
+        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
               % {"source": src_name, "err": e})
         raise TypeError()
 
     if not isinstance(kw, dict):
         print_invalid_header(src_name, args)
-        print(("%(src)s had to be dict, actually %(src_type)s\n")
+        print("%(src)s had to be dict, actually %(src_type)s\n"
               % {"src": src_name, "src_type": type(kw)})
         raise TypeError()
     return kw
 
 
@@ -477,8 +615,36 @@
 def check_environment():
     auth_url = os.environ.get('OS_AUTH_URL', None)
     if not auth_url:
         try:
-            source_env(constants.OPENSTACK_RC_FILE)
+            source_env(constants.OPENRC)
         except IOError as e:
             if e.errno != errno.EEXIST:
                 raise
             LOG.debug('OPENRC file not found')
+
+
+def change_server_name(scenario, suffix):
+    try:
+        host = scenario['host']
+    except KeyError:
+        pass
+    else:
+        try:
+            host['name'] += suffix
+        except TypeError:
+            scenario['host'] += suffix
+
+    try:
+        target = scenario['target']
+    except KeyError:
+        pass
+    else:
+        try:
+            target['name'] += suffix
+        except TypeError:
+            scenario['target'] += suffix
+
+    try:
+        key = 'targets'
+        scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
+    except KeyError:
+        pass
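Note: the most consequential new plumbing in this change is Task.outputs plus
_parse_options(): each runner's outputs are collected via runner.get_output(),
and any scenario option string starting with '$' is resolved against them, so
one scenario can consume a value produced by an earlier one. A minimal
standalone sketch of that resolution (the outputs content and option names
are invented):

    outputs = {'rtt': 0.25, 'host_ip': '10.0.0.7'}   # as filled by runners

    def parse_options(op, outputs):
        """Recursively replace '$key' placeholders with collected outputs."""
        if isinstance(op, dict):
            return {k: parse_options(v, outputs) for k, v in op.items()}
        if isinstance(op, list):
            return [parse_options(v, outputs) for v in op]
        if isinstance(op, str):
            return outputs.get(op[1:]) if op.startswith('$') else op
        return op

    options = {'server': '$host_ip', 'sla': {'max_rtt': '$rtt'}, 'count': 5}
    print(parse_options(options, outputs))
    # {'server': '10.0.0.7', 'sla': {'max_rtt': 0.25}, 'count': 5}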