1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from collections import OrderedDict
22 from six.moves import filter
23 from jinja2 import Environment
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.constants import CONF_FILE
28 from yardstick.common.yaml_loader import yaml_load
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common import constants
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import task_template
33 from yardstick.common import utils
34 from yardstick.common.html_template import report_template
# Default path used by the 'file' dispatcher for task results.
output_file_default = "/tmp/yardstick.out"
# Default directory (joined under constants.YARDSTICK_ROOT_PATH in
# parse_suite) searched for suite test-case files.
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
class Task(object):  # pragma: no cover
    """Task commands.

    Set of commands to manage benchmark tasks.
    """
51 def _set_dispatchers(self, output_config):
52 dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
54 out_types = [s.strip() for s in dispatchers.split(',')]
55 output_config['DEFAULT']['dispatcher'] = out_types
    def start(self, args, **kwargs):  # pylint: disable=unused-argument
        """Start a benchmark scenario.

        Parses the input file (a suite or a single task), runs every task,
        then dispatches and reports the aggregated results.

        NOTE(review): this excerpt is truncated in several places (dangling
        ``except`` clauses, unterminated calls); inline notes mark them.
        """
        # Ensure contexts are undeployed even on abnormal termination.
        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        # Generate a fresh UUID when the caller did not supply an id.
        self.task_id = task_id if task_id else str(uuid.uuid4())

        # NOTE(review): the enclosing 'try:' for this statement is missing
        # from the excerpt.
        output_config = utils.parse_ini_file(CONF_FILE)
        except Exception:  # pylint: disable=broad-except
            # all error will be ignore, the default value is {}

        # Fill in missing sections and log the effective configuration.
        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # update dispatcher list
        if 'file' in output_config['DEFAULT']['dispatcher']:
            # Seed the output file so consumers see a well-formed document
            # even before any result is produced.
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        # 1.parse suite, return suite_params info
        task_files, task_args, task_args_fnames = parser.parse_suite()
        # NOTE(review): the single-task (non-suite) branch header is missing
        # from this excerpt; these three lists mirror the suite results.
        task_files = [parser.path]
        task_args = [args.task_args]
        task_args_fnames = [args.task_args_file]

        LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
                  task_files, task_args, task_args_fnames)

        # NOTE(review): call truncated — presumably task_args_fnames follows.
        tasks = self._parse_tasks(parser, task_files, args, task_args,

        # Execute task files.
        for i, _ in enumerate(task_files):
            one_task_start_time = time.time()
            self.contexts.extend(tasks[i]['contexts'])
            if not tasks[i]['meet_precondition']:
                LOG.info('"meet_precondition" is %s, please check environment',
                         tasks[i]['meet_precondition'])
            # NOTE(review): 'try:' missing from this excerpt.
            data = self._run(tasks[i]['scenarios'],
                             tasks[i]['run_in_parallel'],
            except KeyboardInterrupt:
            except Exception:  # pylint: disable=broad-except
                LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
                # Success path — presumably inside a truncated 'else:' clause.
                LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
                testcases[tasks[i]['case_name']] = {'criteria': 'PASS',

            # keep deployment, forget about stack
            # (hide it for exit handler)
            # Undeploy contexts in reverse creation order.
            # NOTE(review): loop body missing from this excerpt.
            for context in self.contexts[::-1]:

            one_task_end_time = time.time()
            LOG.info("Task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)
        self._generate_reporting(result)

        total_end_time = time.time()
        LOG.info("Total finished in %d secs",
                 total_end_time - total_start_time)

        LOG.info('To generate report, execute command "yardstick report '
                 'generate %s <YAML_NAME>"', self.task_id)
        LOG.info("Task ALL DONE, exiting")
    def _generate_reporting(self, result):
        # Render the HTML report template with the task result and write it
        # to the well-known reporting location.
        # NOTE(review): 'env' (presumably a jinja2 Environment, imported at
        # the top of the file) is not created in this excerpt — its
        # initialization appears truncated.
        with open(constants.REPORTING_FILE, 'w') as f:
            f.write(env.from_string(report_template).render(result))

        LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
        # NOTE(review): the enclosing 'def' line is missing from this
        # excerpt; this fragment configures per-task file logging.
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        utils.makedirs(constants.TASK_LOG_DIR)
        # One log file per task, named after the task id.
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        # Capture everything at the handler; filtering happens elsewhere.
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)
175 def _init_output_config(self, output_config):
176 output_config.setdefault('DEFAULT', {})
177 output_config.setdefault('dispatcher_http', {})
178 output_config.setdefault('dispatcher_file', {})
179 output_config.setdefault('dispatcher_influxdb', {})
180 output_config.setdefault('nsb', {})
    def _set_output_config(self, output_config, file_path):
        # Let the DISPATCHER environment variable override the configured
        # dispatcher type.
        # NOTE(review): the 'try:'/'except KeyError:' scaffolding around the
        # os.environ lookups appears to be truncated in this excerpt.
        out_type = os.environ['DISPATCHER']
        output_config['DEFAULT'].setdefault('dispatcher', 'file')
        output_config['DEFAULT']['dispatcher'] = out_type

        # The file dispatcher always writes to the caller-supplied path.
        output_config['dispatcher_file']['file_path'] = file_path

        # TARGET, when set, overrides the target of the active dispatcher
        # section (dispatcher_<type>).
        target = os.environ['TARGET']
        k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
        output_config[k]['target'] = target
    def _get_format_result(self, testcases):
        """Assemble the result document sent to the dispatchers."""
        criteria = self._get_task_criteria(testcases)

        # NOTE(review): the dict literals wrapping the entries below are
        # truncated in this excerpt; the fields capture deployment metadata
        # taken from the CI environment, plus the overall verdict.
        'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
        'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
        'pod_name': os.environ.get('NODE_NAME', 'unknown'),
        'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')

        'criteria': criteria,
        'task_id': self.task_id,
        'testcases': testcases
222 def _get_task_criteria(self, testcases):
223 criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
229 def _do_output(self, output_config, result):
230 dispatchers = DispatcherBase.get(output_config)
231 dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
233 for dispatcher in dispatchers:
234 dispatcher.flush_result_data(result)
    def _run(self, scenarios, run_in_parallel, output_config):
        """Deploys context and calls runners

        NOTE(review): this excerpt is truncated in several places (empty
        loop bodies, dangling expressions); inline notes mark them.
        """
        # Deploy every context before any scenario runs.
        # NOTE(review): loop body missing from this excerpt.
        for context in self.contexts:

        background_runners = []

        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # Background scenarios run "forever"; a Duration runner with a
            # huge duration keeps them alive until explicitly aborted.
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_config)
            background_runners.append(runner)

        # Parallel branch: start every foreground scenario at once.
        # NOTE(review): the branch header and 'runners' initialization
        # appear to be truncated.
        for scenario in scenarios:
            if not _is_background_scenario(scenario):
                runner = self.run_one_scenario(scenario, output_config)
                runners.append(runner)

        # Wait for runners to finish
        for runner in runners:
            status = runner_join(runner, background_runners, self.outputs, result)
            # NOTE(review): the statement wrapping this format expression
            # (presumably a raised error on non-zero status) is truncated.
            "{0} runner status {1}".format(runner.__execution_type__, status))
            LOG.info("Runner ended")

        # Serial branch: run each foreground scenario to completion in turn.
        for scenario in scenarios:
            if not _is_background_scenario(scenario):
                runner = self.run_one_scenario(scenario, output_config)
                status = runner_join(runner, background_runners, self.outputs, result)
                # NOTE(review): presumably guarded by a truncated
                # 'if status != 0:' check.
                LOG.error('Scenario NO.%s: "%s" ERROR!',
                          scenarios.index(scenario) + 1,
                          scenario.get('type'))
                "{0} runner status {1}".format(runner.__execution_type__, status))
                LOG.info("Runner ended")

        # Abort background runners
        # NOTE(review): loop body missing from this excerpt.
        for runner in background_runners:

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(self.outputs, result)
            # Nuke if it did not stop nicely
            base_runner.Runner.terminate(runner)
            runner.join(self.outputs, result)
            base_runner.Runner.release(runner)

            print("Background task ended")
    def atexit_handler(self):
        """handler for process termination"""
        # Stop every runner first so nothing keeps using the contexts.
        base_runner.Runner.terminate_all()

        # Undeploy in reverse creation order.
        # NOTE(review): the guard around this section and the loop body are
        # missing from this excerpt.
        LOG.info("Undeploying all contexts")
        for context in self.contexts[::-1]:
303 def _parse_options(self, op):
304 if isinstance(op, dict):
305 return {k: self._parse_options(v) for k, v in op.items()}
306 elif isinstance(op, list):
307 return [self._parse_options(v) for v in op]
308 elif isinstance(op, str):
309 return self.outputs.get(op[1:]) if op.startswith('$') else op
    def _parse_tasks(self, parser, task_files, args, task_args,
                     # NOTE(review): signature truncated — a
                     # task_args_fnames parameter presumably follows, and a
                     # 'tasks = []' initialization is missing below.
        for i, _ in enumerate(task_files):
            # Parse each task file in turn, reusing the same parser.
            parser.path = task_files[i]
            tasks.append(parser.parse_task(self.task_id, task_args[i],
                                           task_args_fnames[i]))
            # Case name = file name without directory or extension.
            tasks[i]['case_name'] = os.path.splitext(
                os.path.basename(task_files[i]))[0]

        # Render-only mode: write the rendered YAML and skip execution.
        # NOTE(review): the guard checking args.render_only is truncated.
        utils.makedirs(args.render_only)
        for idx, task in enumerate(tasks):
            # NOTE(review): the directory argument of os.path.join is
            # truncated in this excerpt.
            output_file_name = os.path.abspath(os.path.join(
                '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
            utils.write_file(output_file_name, task['rendered'])
    def run_one_scenario(self, scenario_cfg, output_config):
        """run one scenario using context

        NOTE(review): several initializations ('context_cfg', 'ip_list'),
        else-branches, try/except scaffolding and the trailing
        'return runner' appear truncated in this excerpt.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_config'] = output_config

        # Resolve '$'-references in the options against earlier outputs.
        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        options = scenario_cfg.get('options') or {}
        server_name = options.get('server_name') or {}

        def config_context_target(cfg):
            # Fill context_cfg['target'] from either a literal IP address
            # or a server known to one of the contexts.
            target = cfg['target']
            if is_ip_addr(target):
                context_cfg['target'] = {"ipaddr": target}
                # NOTE(review): the lines below presumably live in a
                # truncated 'else:' branch.
                context_cfg['target'] = Context.get_server(target)
                if self._is_same_context(cfg["host"], target):
                    context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
                # Different context: fall back to the public address.
                context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]

        host_name = server_name.get('host', scenario_cfg.get('host'))
        context_cfg['host'] = Context.get_server(host_name)

        # server_name config takes precedence over the scenario section.
        for item in [server_name, scenario_cfg]:
            config_context_target(item)
            # NOTE(review): presumably inside a truncated
            # 'except KeyError:' clause.
            LOG.debug("Got a KeyError in config_context_target(%s)", item)

        if "targets" in scenario_cfg:
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                    # NOTE(review): presumably a truncated 'else:' branch.
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_context(scenario_cfg["host"],
                        ip_list.append(context_cfg["target"]["private_ip"])
                        ip_list.append(context_cfg["target"]["ip"])
            # Expose the collected addresses as one comma-joined string.
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])

        runner = base_runner.Runner.get(runner_cfg)

        LOG.info("Starting runner of type '%s'", runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)
    def _is_same_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates

        NOTE(review): the loop body below is truncated (missing 'continue',
        None-checks and return statements).
        """
        for context in self.contexts:
            # Only Heat and Kubernetes contexts can own both endpoints.
            if context.__context_type__ not in {"Heat", "Kubernetes"}:

            host = context._get_server(host_attr)

            target = context._get_server(target_attr)

            # Both host and target is not None, then they are in the
class TaskParser(object):  # pragma: no cover
    """Parser for task config files in yaml format.

    Handles both suite files (lists of test-case files) and individual
    task files, including template rendering of task arguments.
    """
429 def __init__(self, path):
432 def _meet_constraint(self, task, cur_pod, cur_installer):
433 if "constraint" in task:
434 constraint = task.get('constraint', None)
435 if constraint is not None:
436 tc_fit_pod = constraint.get('pod', None)
437 tc_fit_installer = constraint.get('installer', None)
438 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
439 cur_pod, cur_installer, constraint)
440 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
442 if (cur_installer is None) or (tc_fit_installer and cur_installer
443 not in tc_fit_installer):
447 def _get_task_para(self, task, cur_pod):
448 task_args = task.get('task_args', None)
449 if task_args is not None:
450 task_args = task_args.get(cur_pod, task_args.get('default'))
451 task_args_fnames = task.get('task_args_fnames', None)
452 if task_args_fnames is not None:
453 task_args_fnames = task_args_fnames.get(cur_pod, None)
454 return task_args, task_args_fnames
    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
        and lists of optional parameters if present

        NOTE(review): try/except scaffolding and some initializations are
        truncated in this excerpt; inline notes mark them.
        """
        LOG.info("\nParsing suite file:%s", self.path)

        # NOTE(review): the 'try:' for this read is missing.
        with open(self.path) as stream:
            cfg = yaml_load(stream)
        except IOError as ioerror:

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        # Anchor the test-case directory under the yardstick source tree.
        # NOTE(review): the second join argument is truncated.
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
        # Normalize to a trailing separator so simple concatenation works.
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        # NOTE(review): a 'valid_task_args = []' initialization appears to
        # be missing here.
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # Fetch task file name; skip entries without one.
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
            # Only keep tasks whose constraints match this environment.
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
                # 3.fetch task parameters
                task_args, task_args_fnames = self._get_task_para(task, cur_pod)
                valid_task_args.append(task_args)
                valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames
    def _render_task(self, task_args, task_args_file):
        """Render the input task with the given arguments

        :param task_args: (dict) arguments to render the task
        :param task_args_file: (str) file containing the arguments to render
        :return: (str) task file rendered

        NOTE(review): the 'kw' initialization and try/except scaffolding
        are truncated in this excerpt; inline notes mark them.
        """
        # NOTE(review): 'try:' and 'kw = {}' appear to be missing here.
        with open(task_args_file) as f:
            kw.update(parse_task_args('task_args_file', f.read()))
        kw.update(parse_task_args('task_args', task_args))
        # NOTE(review): the exception clause guarding this raise is
        # truncated.
        raise y_exc.TaskRenderArgumentError()

        with open(self.path) as f:
            input_task = f.read()
        rendered_task = task_template.TaskTemplate.render(input_task, **kw)
        LOG.debug('Input task is:\n%s', rendered_task)
        parsed_task = yaml_load(rendered_task)
        except (IOError, OSError):
            raise y_exc.TaskReadError(task_file=self.path)
        raise y_exc.TaskRenderError(input_task=input_task)

        return parsed_task, rendered_task
    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances

        NOTE(review): branch headers and the 'contexts' initialization are
        truncated in this excerpt; inline notes mark them.
        """
        LOG.info("Parsing task config: %s", self.path)

        cfg, rendered = self._render_task(task_args, task_args_file)
        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        # NOTE(review): the 'if "context" in cfg:' header appears truncated.
        context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
            # NOTE(review): presumably an 'else:' fallback when no context
            # section is present.
            context_cfgs = [{"type": "Dummy"}]

        # NOTE(review): a 'contexts = []' initialization appears missing.
        for cfg_attrs in context_cfgs:
            cfg_attrs['task_id'] = task_id
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            # Update the name in case the context has used the name_suffix
            cfg_attrs['name'] = context.name
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            self._change_node_names(scenario, contexts)

        # TODO we need something better here, a class that represent the file
        return {'scenarios': cfg['scenarios'],
                'run_in_parallel': run_in_parallel,
                'meet_precondition': meet_precondition,
                'contexts': contexts,
                'rendered': rendered}
    def _change_node_names(scenario, contexts):
        """Change the node names in a scenario, depending on the context config

        The nodes (VMs or physical servers) are referred in the context section
        with the name of the server and the name of the context:
        <server name>.<context name>

        If the context is going to be undeployed at the end of the test, the
        task ID is suffixed to the name to avoid interferences with previous
        deployments. If the context needs to be deployed at the end of the
        test, the name assigned is kept.

        There are several places where a node name could appear in the scenario
        server_name: # JIRA: YARDSTICK-810
        tg__0: tg_0.yardstick
        vnf__0: vnf_0.yardstick

        NOTE(review): try/except scaffolding and the definition of 'sep'
        are truncated in this excerpt; inline notes mark them.
        """
        def qualified_name(name):
            # Split '<server>.<context>'.
            # NOTE(review): the 'try:' guarding this split is missing.
            node_name, context_name = name.split('.')
            # for kubernetes, some kubernetes resources don't support
            # name format like 'xxx.xxx', so we use '-' instead
            node_name, context_name = name.split('-')
            # Find the context whose assigned name matches the reference.
            ctx = next((context for context in contexts
                        if context.assigned_name == context_name))
            except StopIteration:
                raise y_exc.ScenarioConfigContextNameNotFound(
                    context_name=context_name)

            # NOTE(review): 'sep' is not defined in this excerpt —
            # presumably '.' or '-' depending on the split that succeeded.
            return '{}{}{}'.format(node_name, sep, ctx.name)

        if 'host' in scenario:
            scenario['host'] = qualified_name(scenario['host'])
        if 'target' in scenario:
            scenario['target'] = qualified_name(scenario['target'])
        options = scenario.get('options') or {}
        server_name = options.get('server_name') or {}
        if 'host' in server_name:
            server_name['host'] = qualified_name(server_name['host'])
        if 'target' in server_name:
            server_name['target'] = qualified_name(server_name['target'])
        if 'targets' in scenario:
            for idx, target in enumerate(scenario['targets']):
                scenario['targets'][idx] = qualified_name(target)
        if 'nodes' in scenario:
            for scenario_node, target in scenario['nodes'].items():
                scenario['nodes'][scenario_node] = qualified_name(target)
    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""
        # Expected form: "yardstick:<task|suite>:0.1".
        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            # NOTE(review): the second format argument (presumably
            # cfg_schema) and the closing parenthesis are truncated.
            sys.exit("error: file %s has unknown schema %s" % (self.path,
    def _check_precondition(self, cfg):
        """Check if the environment meet the precondition

        NOTE(review): all return statements (including the default return)
        are truncated in this excerpt; inline notes mark them.
        """
        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            # Current environment, as exported by the CI job.
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            # NOTE(review): the bodies of the checks below are missing.
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
def is_ip_addr(addr):
    """Check whether *addr* is a valid IP address.

    *addr* may be a plain string, or a server dict carrying a
    'public_ip_attr' / 'private_ip_attr' key whose value is validated
    instead.

    :param addr: (str|dict) candidate address or server mapping
    :returns: (bool) True for a valid IPv4/IPv6 address, False otherwise
    """
    try:
        # Server dicts reference their address indirectly.
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # Validate the text form directly. The excerpt's visible pattern
        # encoded to bytes first, but ipaddress.ip_address() interprets
        # bytes as a *packed* binary address on Python 3, which made every
        # dotted-quad string fail.
        ipaddress.ip_address(str(addr))
    except ValueError:
        return False
    return True
709 def _is_background_scenario(scenario):
710 if "run_in_background" in scenario:
711 return scenario["run_in_background"]
def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' fields in scenario """
    nodes = scenario_cfg["nodes"]
    # sorted() keeps node instantiation order deterministic.
    return OrderedDict(
        (name, Context.get_server(nodes[name])) for name in sorted(nodes))
def get_networks_from_nodes(nodes):
    """parse the 'nodes' fields in scenario """
    # NOTE(review): the 'networks' initialization, per-node guards and the
    # trailing return are truncated in this excerpt.
    for node in nodes.values():
        interfaces = node.get('interfaces', {})
        for interface in interfaces.values():
            # vld_id is network_name
            network_name = interface.get('network_name')
            # Resolve the full network details from the active contexts.
            network = Context.get_network(network_name)
            networks[network['name']] = network
def runner_join(runner, background_runners, outputs, result):
    """join (wait for) a runner, exit process at runner failure
    :param background_runners:
    :type background_runners:

    NOTE(review): the trailing 'return status' appears truncated in this
    excerpt.
    """
    # Poll until the foreground runner finishes, draining all queues
    # meanwhile so producers never block on a full queue.
    while runner.poll() is None:
        outputs.update(runner.get_output())
        result.extend(runner.get_result())
        # drain all the background runner queues
        for background in background_runners:
            outputs.update(background.get_output())
            result.extend(background.get_result())
    status = runner.join(outputs, result)
    base_runner.Runner.release(runner)
def print_invalid_header(source_name, args):
    """Print a standard 'invalid input' banner for *source_name*."""
    message = "Invalid %(source)s passed:\n\n %(args)s\n" % {
        "source": source_name,
        "args": args,
    }
    print(message)
def parse_task_args(src_name, args):
    """Parse task arguments given as YAML text (mappings pass through).

    NOTE(review): the Mapping branch body, the 'try:' header and the
    final return statements are truncated in this excerpt.
    """
    # NOTE(review): collections.Mapping moved to collections.abc in
    # Python 3.3 and was removed in 3.10 — consider collections.abc.Mapping.
    if isinstance(args, collections.Mapping):

    # Empty/None input yields an empty dict rather than failing.
    kw = args and yaml_load(args)
    kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})

    # The parsed document must be a mapping to be usable as kwargs.
    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})