X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Fcore%2Ftask.py;h=dd35bd4f43ab6ad21a4193bc8e3179f5b8017be4;hb=43bf12d6ab7bcaea16dc75ed4ccbe3895cf51da3;hp=c44081b7328b622ce3b7153cff67b8390a4a1099;hpb=0e1b3483f1514177b61a645abda906b68b13bd36;p=yardstick.git diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index c44081b73..dd35bd4f4 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -13,6 +13,8 @@ from __future__ import absolute_import from __future__ import print_function import sys import os +from collections import OrderedDict + import yaml import atexit import ipaddress @@ -20,10 +22,13 @@ import time import logging import uuid import errno +import collections + from six.moves import filter from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.runners import base as base_runner +from yardstick.dispatcher.base import Base as DispatcherBase from yardstick.common.task_template import TaskTemplate from yardstick.common.utils import source_env from yardstick.common import utils @@ -33,6 +38,7 @@ output_file_default = "/tmp/yardstick.out" config_file = '/etc/yardstick/yardstick.conf' test_cases_dir_default = "tests/opnfv/test_cases/" LOG = logging.getLogger(__name__) +JOIN_TIMEOUT = 60 class Task(object): # pragma: no cover @@ -42,20 +48,43 @@ class Task(object): # pragma: no cover """ def __init__(self): - self.config = {} self.contexts = [] self.outputs = {} + def _set_dispatchers(self, output_config): + dispatchers = output_config.get('DEFAULT', {}).get('dispatcher', + 'file') + out_types = [s.strip() for s in dispatchers.split(',')] + output_config['DEFAULT']['dispatcher'] = out_types + def start(self, args, **kwargs): """Start a benchmark scenario.""" atexit.register(self.atexit_handler) - self.task_id = kwargs.get('task_id', str(uuid.uuid4())) + task_id = getattr(args, 'task_id') + self.task_id = task_id if task_id else str(uuid.uuid4()) + + self._set_log() check_environment() - self.config['yardstick'] = utils.parse_ini_file(config_file) + try: + output_config = utils.parse_ini_file(config_file) + except Exception: + # all error will be ignore, the default value is {} + output_config = {} + + self._init_output_config(output_config) + self._set_output_config(output_config, args.output_file) + LOG.debug('Output configuration is: %s', output_config) + + self._set_dispatchers(output_config) + + # update dispatcher list + if 'file' in output_config['DEFAULT']['dispatcher']: + result = {'status': 0, 'result': {}} + utils.write_json_to_file(args.output_file, result) total_start_time = time.time() parser = TaskParser(args.inputfile[0]) @@ -75,6 +104,7 @@ class Task(object): # pragma: no cover if args.parse_only: sys.exit(0) + testcases = {} # parse task_files for i in range(0, len(task_files)): one_task_start_time = time.time() @@ -90,7 +120,16 @@ class Task(object): # pragma: no cover meet_precondition) continue - self._run(scenarios, run_in_parallel, args.output_file) + case_name = os.path.splitext(os.path.basename(task_files[i]))[0] + try: + data = self._run(scenarios, run_in_parallel, args.output_file) + except KeyboardInterrupt: + raise + except Exception: + LOG.exception("Running test case %s failed!", case_name) + testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []} + else: + testcases[case_name] = {'criteria': 'PASS', 'tc_data': data} if args.keep_deploy: # keep deployment, forget about stack @@ -104,6 +143,10 @@ class Task(object): # pragma: no cover LOG.info("task %s finished in %d secs", task_files[i], one_task_end_time - one_task_start_time) + result = self._get_format_result(testcases) + + self._do_output(output_config, result) + total_end_time = time.time() LOG.info("total finished in %d secs", total_end_time - total_start_time) @@ -113,6 +156,78 @@ class Task(object): # pragma: no cover scenario['task_id'], scenario['tc']) print("Done, exiting") + return result + + def _set_log(self): + log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s' + log_formatter = logging.Formatter(log_format) + + log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id)) + log_handler = logging.FileHandler(log_path) + log_handler.setFormatter(log_formatter) + log_handler.setLevel(logging.DEBUG) + + logging.root.addHandler(log_handler) + + def _init_output_config(self, output_config): + output_config.setdefault('DEFAULT', {}) + output_config.setdefault('dispatcher_http', {}) + output_config.setdefault('dispatcher_file', {}) + output_config.setdefault('dispatcher_influxdb', {}) + output_config.setdefault('nsb', {}) + + def _set_output_config(self, output_config, file_path): + try: + out_type = os.environ['DISPATCHER'] + except KeyError: + output_config['DEFAULT'].setdefault('dispatcher', 'file') + else: + output_config['DEFAULT']['dispatcher'] = out_type + + output_config['dispatcher_file']['file_path'] = file_path + + try: + target = os.environ['TARGET'] + except KeyError: + pass + else: + k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher']) + output_config[k]['target'] = target + + def _get_format_result(self, testcases): + criteria = self._get_task_criteria(testcases) + + info = { + 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'), + 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'), + 'pod_name': os.environ.get('NODE_NAME', 'unknown'), + 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown') + } + + result = { + 'status': 1, + 'result': { + 'criteria': criteria, + 'task_id': self.task_id, + 'info': info, + 'testcases': testcases + } + } + + return result + + def _get_task_criteria(self, testcases): + criteria = any(t.get('criteria') != 'PASS' for t in testcases.values()) + if criteria: + return 'FAIL' + else: + return 'PASS' + + def _do_output(self, output_config, result): + dispatchers = DispatcherBase.get(output_config) + + for dispatcher in dispatchers: + dispatcher.flush_result_data(result) def _run(self, scenarios, run_in_parallel, output_file): """Deploys context and calls runners""" @@ -121,6 +236,7 @@ class Task(object): # pragma: no cover background_runners = [] + result = [] # Start all background scenarios for scenario in filter(_is_background_scenario, scenarios): scenario["runner"] = dict(type="Duration", duration=1000000000) @@ -136,16 +252,23 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: - runner_join(runner) + status = runner_join(runner) + if status != 0: + raise RuntimeError self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) print("Runner ended, output in", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) - runner_join(runner) + status = runner_join(runner) + if status != 0: + LOG.error('Scenario: %s ERROR', scenario.get('type')) + raise RuntimeError self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) print("Runner ended, output in", output_file) # Abort background runners @@ -154,14 +277,17 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - if runner.join(timeout=60) is None: + status = runner.join(JOIN_TIMEOUT) + if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner_join(runner) - self.outputs.update(runner.get_output()) - else: - base_runner.Runner.release(runner) + runner.join(JOIN_TIMEOUT) + base_runner.Runner.release(runner) + + self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) print("Background task ended") + return result def atexit_handler(self): """handler for process termination""" @@ -227,7 +353,9 @@ class Task(object): # pragma: no cover if "nodes" in scenario_cfg: context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg) - runner = base_runner.Runner.get(runner_cfg, self.config) + context_cfg["networks"] = get_networks_from_nodes( + context_cfg["nodes"]) + runner = base_runner.Runner.get(runner_cfg) print("Starting runner of type '%s'" % runner_cfg["type"]) runner.run(scenario_cfg, context_cfg) @@ -276,17 +404,17 @@ class TaskParser(object): # pragma: no cover tc_fit_installer = constraint.get('installer', None) LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s", cur_pod, cur_installer, constraint) - if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod: + if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod): return False - if cur_installer and tc_fit_installer and \ - cur_installer not in tc_fit_installer: + if (cur_installer is None) or (tc_fit_installer and cur_installer + not in tc_fit_installer): return False return True def _get_task_para(self, task, cur_pod): task_args = task.get('task_args', None) if task_args is not None: - task_args = task_args.get(cur_pod, None) + task_args = task_args.get(cur_pod, task_args.get('default')) task_args_fnames = task.get('task_args_fnames', None) if task_args_fnames is not None: task_args_fnames = task_args_fnames.get(cur_pod, None) @@ -400,6 +528,9 @@ class TaskParser(object): # pragma: no cover task_name = os.path.splitext(os.path.basename(self.path))[0] scenario["tc"] = task_name scenario["task_id"] = task_id + # embed task path into scenario so we can load other files + # relative to task path + scenario["task_path"] = os.path.dirname(self.path) change_server_name(scenario, name_suffix) @@ -420,7 +551,7 @@ class TaskParser(object): # pragma: no cover cfg_schema)) def _check_precondition(self, cfg): - """Check if the envrionment meet the preconditon""" + """Check if the environment meet the precondition""" if "precondition" in cfg: precondition = cfg["precondition"] @@ -475,22 +606,35 @@ def _is_background_scenario(scenario): def parse_nodes_with_context(scenario_cfg): - """paras the 'nodes' fields in scenario """ - nodes = scenario_cfg["nodes"] - - nodes_cfg = {} - for nodename in nodes: - nodes_cfg[nodename] = Context.get_server(nodes[nodename]) - - return nodes_cfg + """parse the 'nodes' fields in scenario """ + # ensure consistency in node instantiation order + return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename])) + for nodename in sorted(scenario_cfg["nodes"])) + + +def get_networks_from_nodes(nodes): + """parse the 'nodes' fields in scenario """ + networks = {} + for node in nodes.values(): + if not node: + continue + interfaces = node.get('interfaces', {}) + for interface in interfaces.values(): + vld_id = interface.get('vld_id') + # mgmt network doesn't have vld_id + if not vld_id: + continue + network = Context.get_network({"vld_id": vld_id}) + if network: + networks[network['name']] = network + return networks def runner_join(runner): """join (wait for) a runner, exit process at runner failure""" status = runner.join() base_runner.Runner.release(runner) - if status != 0: - sys.exit("Runner failed") + return status def print_invalid_header(source_name, args): @@ -499,6 +643,9 @@ def print_invalid_header(source_name, args): def parse_task_args(src_name, args): + if isinstance(args, collections.Mapping): + return args + try: kw = args and yaml.safe_load(args) kw = {} if kw is None else kw