X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Fcore%2Ftask.py;h=e7acde696b4c0bd80fb070a49a458a4af8856e14;hb=e4c7ab64f6886b74e3725967d048801e879b3ec4;hp=a8447e2cf468ebce3a2b62c42c4e60d49725ab25;hpb=38eb33a092e903b9854267d3e36496c919517103;p=yardstick.git diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index a8447e2cf..e7acde696 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -7,10 +7,6 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -""" Handler for yardstick command 'task' """ - -from __future__ import absolute_import -from __future__ import print_function import sys import os from collections import OrderedDict @@ -28,18 +24,18 @@ from jinja2 import Environment from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.runners import base as base_runner +from yardstick.common.constants import CONF_FILE from yardstick.common.yaml_loader import yaml_load from yardstick.dispatcher.base import Base as DispatcherBase -from yardstick.common.task_template import TaskTemplate -from yardstick.common import utils from yardstick.common import constants +from yardstick.common import exceptions as y_exc +from yardstick.common import task_template +from yardstick.common import utils from yardstick.common.html_template import report_template output_file_default = "/tmp/yardstick.out" -config_file = '/etc/yardstick/yardstick.conf' test_cases_dir_default = "tests/opnfv/test_cases/" LOG = logging.getLogger(__name__) -JOIN_TIMEOUT = 60 class Task(object): # pragma: no cover @@ -58,7 +54,7 @@ class Task(object): # pragma: no cover out_types = [s.strip() for s in dispatchers.split(',')] output_config['DEFAULT']['dispatcher'] = out_types - def start(self, args, **kwargs): + def start(self, args, **kwargs): # pylint: disable=unused-argument """Start a benchmark scenario.""" atexit.register(self.atexit_handler) @@ -69,8 +65,8 @@ class Task(object): # pragma: no cover self._set_log() try: - output_config = utils.parse_ini_file(config_file) - except Exception: + output_config = utils.parse_ini_file(CONF_FILE) + except Exception: # pylint: disable=broad-except # all error will be ignore, the default value is {} output_config = {} @@ -90,8 +86,7 @@ class Task(object): # pragma: no cover if args.suite: # 1.parse suite, return suite_params info - task_files, task_args, task_args_fnames = \ - parser.parse_suite() + task_files, task_args, task_args_fnames = parser.parse_suite() else: task_files = [parser.path] task_args = [args.task_args] @@ -104,31 +99,33 @@ class Task(object): # pragma: no cover sys.exit(0) testcases = {} - # parse task_files - for i in range(0, len(task_files)): - one_task_start_time = time.time() - parser.path = task_files[i] - scenarios, run_in_parallel, meet_precondition, contexts = \ - parser.parse_task(self.task_id, task_args[i], - task_args_fnames[i]) + tasks = self._parse_tasks(parser, task_files, args, task_args, + task_args_fnames) - self.contexts.extend(contexts) - - if not meet_precondition: - LOG.info("meet_precondition is %s, please check envrionment", - meet_precondition) + # Execute task files. + for i, _ in enumerate(task_files): + one_task_start_time = time.time() + self.contexts.extend(tasks[i]['contexts']) + if not tasks[i]['meet_precondition']: + LOG.info('"meet_precondition" is %s, please check environment', + tasks[i]['meet_precondition']) continue - case_name = os.path.splitext(os.path.basename(task_files[i]))[0] try: - data = self._run(scenarios, run_in_parallel, args.output_file) + data = self._run(tasks[i]['scenarios'], + tasks[i]['run_in_parallel'], + output_config) except KeyboardInterrupt: raise - except Exception: - LOG.exception("Running test case %s failed!", case_name) - testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []} + except Exception: # pylint: disable=broad-except + LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'], + exc_info=True) + testcases[tasks[i]['case_name']] = {'criteria': 'FAIL', + 'tc_data': []} else: - testcases[case_name] = {'criteria': 'PASS', 'tc_data': data} + LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name']) + testcases[tasks[i]['case_name']] = {'criteria': 'PASS', + 'tc_data': data} if args.keep_deploy: # keep deployment, forget about stack @@ -151,9 +148,8 @@ class Task(object): # pragma: no cover LOG.info("Total finished in %d secs", total_end_time - total_start_time) - scenario = scenarios[0] - LOG.info("To generate report, execute command " - "'yardstick report generate %(task_id)s %(tc)s'", scenario) + LOG.info('To generate report, execute command "yardstick report ' + 'generate %s "', self.task_id) LOG.info("Task ALL DONE, exiting") return result @@ -232,11 +228,12 @@ class Task(object): # pragma: no cover def _do_output(self, output_config, result): dispatchers = DispatcherBase.get(output_config) + dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb') for dispatcher in dispatchers: dispatcher.flush_result_data(result) - def _run(self, scenarios, run_in_parallel, output_file): + def _run(self, scenarios, run_in_parallel, output_config): """Deploys context and calls runners""" for context in self.contexts: context.deploy() @@ -247,36 +244,36 @@ class Task(object): # pragma: no cover # Start all background scenarios for scenario in filter(_is_background_scenario, scenarios): scenario["runner"] = dict(type="Duration", duration=1000000000) - runner = self.run_one_scenario(scenario, output_file) + runner = self.run_one_scenario(scenario, output_config) background_runners.append(runner) runners = [] if run_in_parallel: for scenario in scenarios: if not _is_background_scenario(scenario): - runner = self.run_one_scenario(scenario, output_file) + runner = self.run_one_scenario(scenario, output_config) runners.append(runner) # Wait for runners to finish for runner in runners: - status = runner_join(runner) + status = runner_join(runner, background_runners, self.outputs, result) if status != 0: - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) - LOG.info("Runner ended, output in %s", output_file) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) + LOG.info("Runner ended") else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): - runner = self.run_one_scenario(scenario, output_file) - status = runner_join(runner) + runner = self.run_one_scenario(scenario, output_config) + status = runner_join(runner, background_runners, self.outputs, result) if status != 0: - LOG.error('Scenario: %s ERROR', scenario.get('type')) - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) - LOG.info("Runner ended, output in %s", output_file) + LOG.error('Scenario NO.%s: "%s" ERROR!', + scenarios.index(scenario) + 1, + scenario.get('type')) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) + LOG.info("Runner ended") # Abort background runners for runner in background_runners: @@ -284,15 +281,13 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - status = runner.join(JOIN_TIMEOUT) + status = runner.join(self.outputs, result) if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner.join(JOIN_TIMEOUT) + runner.join(self.outputs, result) base_runner.Runner.release(runner) - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) print("Background task ended") return result @@ -315,33 +310,64 @@ class Task(object): # pragma: no cover else: return op - def run_one_scenario(self, scenario_cfg, output_file): + def _parse_tasks(self, parser, task_files, args, task_args, + task_args_fnames): + tasks = [] + + # Parse task_files. + for i, _ in enumerate(task_files): + parser.path = task_files[i] + tasks.append(parser.parse_task(self.task_id, task_args[i], + task_args_fnames[i])) + tasks[i]['case_name'] = os.path.splitext( + os.path.basename(task_files[i]))[0] + + if args.render_only: + utils.makedirs(args.render_only) + for idx, task in enumerate(tasks): + output_file_name = os.path.abspath(os.path.join( + args.render_only, + '{0:03d}-{1}.yml'.format(idx, task['case_name']))) + utils.write_file(output_file_name, task['rendered']) + + sys.exit(0) + + return tasks + + def run_one_scenario(self, scenario_cfg, output_config): """run one scenario using context""" runner_cfg = scenario_cfg["runner"] - runner_cfg['output_filename'] = output_file + runner_cfg['output_config'] = output_config options = scenario_cfg.get('options', {}) scenario_cfg['options'] = self._parse_options(options) # TODO support get multi hosts/vms info context_cfg = {} - if "host" in scenario_cfg: - context_cfg['host'] = Context.get_server(scenario_cfg["host"]) + server_name = scenario_cfg.get('options', {}).get('server_name', {}) - if "target" in scenario_cfg: - if is_ip_addr(scenario_cfg["target"]): - context_cfg['target'] = {} - context_cfg['target']["ipaddr"] = scenario_cfg["target"] + def config_context_target(cfg): + target = cfg['target'] + if is_ip_addr(target): + context_cfg['target'] = {"ipaddr": target} else: - context_cfg['target'] = Context.get_server( - scenario_cfg["target"]) - if self._is_same_heat_context(scenario_cfg["host"], - scenario_cfg["target"]): - context_cfg["target"]["ipaddr"] = \ - context_cfg["target"]["private_ip"] + context_cfg['target'] = Context.get_server(target) + if self._is_same_context(cfg["host"], target): + context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"] else: - context_cfg["target"]["ipaddr"] = \ - context_cfg["target"]["ip"] + context_cfg['target']["ipaddr"] = context_cfg['target']["ip"] + + host_name = server_name.get('host', scenario_cfg.get('host')) + if host_name: + context_cfg['host'] = Context.get_server(host_name) + + for item in [server_name, scenario_cfg]: + try: + config_context_target(item) + except KeyError: + LOG.debug("Got a KeyError in config_context_target(%s)", item) + else: + break if "targets" in scenario_cfg: ip_list = [] @@ -351,8 +377,8 @@ class Task(object): # pragma: no cover context_cfg['target'] = {} else: context_cfg['target'] = Context.get_server(target) - if self._is_same_heat_context(scenario_cfg["host"], - target): + if self._is_same_context(scenario_cfg["host"], + target): ip_list.append(context_cfg["target"]["private_ip"]) else: ip_list.append(context_cfg["target"]["ip"]) @@ -362,6 +388,7 @@ class Task(object): # pragma: no cover context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg) context_cfg["networks"] = get_networks_from_nodes( context_cfg["nodes"]) + runner = base_runner.Runner.get(runner_cfg) LOG.info("Starting runner of type '%s'", runner_cfg["type"]) @@ -369,7 +396,7 @@ class Task(object): # pragma: no cover return runner - def _is_same_heat_context(self, host_attr, target_attr): + def _is_same_context(self, host_attr, target_attr): """check if two servers are in the same heat context host_attr: either a name for a server created by yardstick or a dict with attribute name mapping when using external heat templates @@ -377,7 +404,7 @@ class Task(object): # pragma: no cover with attribute name mapping when using external heat templates """ for context in self.contexts: - if context.__context_type__ != "Heat": + if context.__context_type__ not in {"Heat", "Kubernetes"}: continue host = context._get_server(host_attr) @@ -472,33 +499,42 @@ class TaskParser(object): # pragma: no cover return valid_task_files, valid_task_args, valid_task_args_fnames - def parse_task(self, task_id, task_args=None, task_args_file=None): - """parses the task file and return an context and scenario instances""" - LOG.info("Parsing task config: %s", self.path) + def _render_task(self, task_args, task_args_file): + """Render the input task with the given arguments + :param task_args: (dict) arguments to render the task + :param task_args_file: (str) file containing the arguments to render + the task + :return: (str) task file rendered + """ try: kw = {} if task_args_file: with open(task_args_file) as f: - kw.update(parse_task_args("task_args_file", f.read())) - kw.update(parse_task_args("task_args", task_args)) + kw.update(parse_task_args('task_args_file', f.read())) + kw.update(parse_task_args('task_args', task_args)) except TypeError: - raise TypeError() + raise y_exc.TaskRenderArgumentError() + input_task = None try: with open(self.path) as f: - try: - input_task = f.read() - rendered_task = TaskTemplate.render(input_task, **kw) - except Exception as e: - LOG.exception('Failed to render template:\n%s\n', input_task) - raise e - LOG.debug("Input task is:\n%s\n", rendered_task) - - cfg = yaml_load(rendered_task) - except IOError as ioerror: - sys.exit(ioerror) + input_task = f.read() + rendered_task = task_template.TaskTemplate.render(input_task, **kw) + LOG.debug('Input task is:\n%s', rendered_task) + parsed_task = yaml_load(rendered_task) + except (IOError, OSError): + raise y_exc.TaskReadError(task_file=self.path) + except Exception: + raise y_exc.TaskRenderError(input_task=input_task) + + return parsed_task, rendered_task + def parse_task(self, task_id, task_args=None, task_args_file=None): + """parses the task file and return an context and scenario instances""" + LOG.info("Parsing task config: %s", self.path) + + cfg, rendered = self._render_task(task_args, task_args_file) self._check_schema(cfg["schema"], "task") meet_precondition = self._check_precondition(cfg) @@ -512,17 +548,15 @@ class TaskParser(object): # pragma: no cover context_cfgs = [{"type": "Dummy"}] contexts = [] - name_suffix = '-{}'.format(task_id[:8]) for cfg_attrs in context_cfgs: - try: - cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'], - name_suffix) - except KeyError: - pass + + cfg_attrs['task_id'] = task_id # default to Heat context because we are testing OpenStack context_type = cfg_attrs.get("type", "Heat") context = Context.get(context_type) context.init(cfg_attrs) + # Update the name in case the context has used the name_suffix + cfg_attrs['name'] = context.name contexts.append(context) run_in_parallel = cfg.get("run_in_parallel", False) @@ -536,16 +570,74 @@ class TaskParser(object): # pragma: no cover # relative to task path scenario["task_path"] = os.path.dirname(self.path) - change_server_name(scenario, name_suffix) - - try: - for node in scenario['nodes']: - scenario['nodes'][node] += name_suffix - except KeyError: - pass + self._change_node_names(scenario, contexts) # TODO we need something better here, a class that represent the file - return cfg["scenarios"], run_in_parallel, meet_precondition, contexts + return {'scenarios': cfg['scenarios'], + 'run_in_parallel': run_in_parallel, + 'meet_precondition': meet_precondition, + 'contexts': contexts, + 'rendered': rendered} + + @staticmethod + def _change_node_names(scenario, contexts): + """Change the node names in a scenario, depending on the context config + + The nodes (VMs or physical servers) are referred in the context section + with the name of the server and the name of the context: + . + + If the context is going to be undeployed at the end of the test, the + task ID is suffixed to the name to avoid interferences with previous + deployments. If the context needs to be deployed at the end of the + test, the name assigned is kept. + + There are several places where a node name could appear in the scenario + configuration: + scenario: + host: athena.demo + target: kratos.demo + targets: + - athena.demo + - kratos.demo + + scenario: + options: + server_name: # JIRA: YARDSTICK-810 + host: athena.demo + target: kratos.demo + + scenario: + nodes: + tg__0: tg_0.yardstick + vnf__0: vnf_0.yardstick + """ + def qualified_name(name): + node_name, context_name = name.split('.') + try: + ctx = next((context for context in contexts + if context.assigned_name == context_name)) + except StopIteration: + raise y_exc.ScenarioConfigContextNameNotFound( + context_name=context_name) + + return '{}.{}'.format(node_name, ctx.name) + + if 'host' in scenario: + scenario['host'] = qualified_name(scenario['host']) + if 'target' in scenario: + scenario['target'] = qualified_name(scenario['target']) + server_name = scenario.get('options', {}).get('server_name', {}) + if 'host' in server_name: + server_name['host'] = qualified_name(server_name['host']) + if 'target' in server_name: + server_name['target'] = qualified_name(server_name['target']) + if 'targets' in scenario: + for idx, target in enumerate(scenario['targets']): + scenario['targets'][idx] = qualified_name(target) + if 'nodes' in scenario: + for scenario_node, target in scenario['nodes'].items(): + scenario['nodes'][scenario_node] = qualified_name(target) def _check_schema(self, cfg_schema, schema_type): """Check if config file is using the correct schema type""" @@ -634,9 +726,23 @@ def get_networks_from_nodes(nodes): return networks -def runner_join(runner): - """join (wait for) a runner, exit process at runner failure""" - status = runner.join() +def runner_join(runner, background_runners, outputs, result): + """join (wait for) a runner, exit process at runner failure + :param background_runners: + :type background_runners: + :param outputs: + :type outputs: dict + :param result: + :type result: list + """ + while runner.poll() is None: + outputs.update(runner.get_output()) + result.extend(runner.get_result()) + # drain all the background runner queues + for background in background_runners: + outputs.update(background.get_output()) + result.extend(background.get_result()) + status = runner.join(outputs, result) base_runner.Runner.release(runner) return status @@ -665,31 +771,3 @@ def parse_task_args(src_name, args): % {"src": src_name, "src_type": type(kw)}) raise TypeError() return kw - - -def change_server_name(scenario, suffix): - try: - host = scenario['host'] - except KeyError: - pass - else: - try: - host['name'] += suffix - except TypeError: - scenario['host'] += suffix - - try: - target = scenario['target'] - except KeyError: - pass - else: - try: - target['name'] += suffix - except TypeError: - scenario['target'] += suffix - - try: - key = 'targets' - scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]] - except KeyError: - pass