1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from collections import OrderedDict
23 from six.moves import filter
24 from jinja2 import Environment
26 from yardstick.benchmark.contexts.base import Context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common.task_template import TaskTemplate
32 from yardstick.common import utils
33 from yardstick.common import constants
34 from yardstick.common import exceptions
35 from yardstick.common.html_template import report_template
37 output_file_default = "/tmp/yardstick.out"
38 test_cases_dir_default = "tests/opnfv/test_cases/"
39 LOG = logging.getLogger(__name__)
42 class Task(object): # pragma: no cover
45 Set of commands to manage benchmark tasks.
52 def _set_dispatchers(self, output_config):
53 dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
55 out_types = [s.strip() for s in dispatchers.split(',')]
56 output_config['DEFAULT']['dispatcher'] = out_types
58 def start(self, args):
59 """Start a benchmark scenario."""
61 atexit.register(self.atexit_handler)
63 task_id = getattr(args, 'task_id')
64 self.task_id = task_id if task_id else str(uuid.uuid4())
69 output_config = utils.parse_ini_file(CONF_FILE)
70 except Exception: # pylint: disable=broad-except
71 # all error will be ignore, the default value is {}
74 self._init_output_config(output_config)
75 self._set_output_config(output_config, args.output_file)
76 LOG.debug('Output configuration is: %s', output_config)
78 self._set_dispatchers(output_config)
80 # update dispatcher list
81 if 'file' in output_config['DEFAULT']['dispatcher']:
82 result = {'status': 0, 'result': {}}
83 utils.write_json_to_file(args.output_file, result)
85 total_start_time = time.time()
86 parser = TaskParser(args.inputfile[0])
89 # 1.parse suite, return suite_params info
90 task_files, task_args, task_args_fnames = \
93 task_files = [parser.path]
94 task_args = [args.task_args]
95 task_args_fnames = [args.task_args_file]
97 LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98 task_files, task_args, task_args_fnames)
105 for i in range(0, len(task_files)):
106 one_task_start_time = time.time()
107 parser.path = task_files[i]
108 scenarios, run_in_parallel, meet_precondition, contexts = \
109 parser.parse_task(self.task_id, task_args[i],
112 self.contexts.extend(contexts)
114 if not meet_precondition:
115 LOG.info("meet_precondition is %s, please check envrionment",
119 case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
121 data = self._run(scenarios, run_in_parallel, args.output_file)
122 except KeyboardInterrupt:
124 except Exception: # pylint: disable=broad-except
125 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
126 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
128 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
129 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
132 # keep deployment, forget about stack
133 # (hide it for exit handler)
136 for context in self.contexts[::-1]:
139 one_task_end_time = time.time()
140 LOG.info("Task %s finished in %d secs", task_files[i],
141 one_task_end_time - one_task_start_time)
143 result = self._get_format_result(testcases)
145 self._do_output(output_config, result)
146 self._generate_reporting(result)
148 total_end_time = time.time()
149 LOG.info("Total finished in %d secs",
150 total_end_time - total_start_time)
152 scenario = scenarios[0]
153 LOG.info("To generate report, execute command "
154 "'yardstick report generate %(task_id)s %(tc)s'", scenario)
155 LOG.info("Task ALL DONE, exiting")
158 def _generate_reporting(self, result):
160 with open(constants.REPORTING_FILE, 'w') as f:
161 f.write(env.from_string(report_template).render(result))
163 LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
166 log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
167 log_formatter = logging.Formatter(log_format)
169 utils.makedirs(constants.TASK_LOG_DIR)
170 log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
171 log_handler = logging.FileHandler(log_path)
172 log_handler.setFormatter(log_formatter)
173 log_handler.setLevel(logging.DEBUG)
175 logging.root.addHandler(log_handler)
177 def _init_output_config(self, output_config):
178 output_config.setdefault('DEFAULT', {})
179 output_config.setdefault('dispatcher_http', {})
180 output_config.setdefault('dispatcher_file', {})
181 output_config.setdefault('dispatcher_influxdb', {})
182 output_config.setdefault('nsb', {})
184 def _set_output_config(self, output_config, file_path):
186 out_type = os.environ['DISPATCHER']
188 output_config['DEFAULT'].setdefault('dispatcher', 'file')
190 output_config['DEFAULT']['dispatcher'] = out_type
192 output_config['dispatcher_file']['file_path'] = file_path
195 target = os.environ['TARGET']
199 k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
200 output_config[k]['target'] = target
202 def _get_format_result(self, testcases):
203 criteria = self._get_task_criteria(testcases)
206 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
207 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
208 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
209 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
215 'criteria': criteria,
216 'task_id': self.task_id,
218 'testcases': testcases
224 def _get_task_criteria(self, testcases):
225 criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
231 def _do_output(self, output_config, result):
232 dispatchers = DispatcherBase.get(output_config)
234 for dispatcher in dispatchers:
235 dispatcher.flush_result_data(result)
237 def _run(self, scenarios, run_in_parallel, output_file):
238 """Deploys context and calls runners"""
239 for context in self.contexts:
242 background_runners = []
245 # Start all background scenarios
246 for scenario in filter(_is_background_scenario, scenarios):
247 scenario["runner"] = dict(type="Duration", duration=1000000000)
248 runner = self.run_one_scenario(scenario, output_file)
249 background_runners.append(runner)
253 for scenario in scenarios:
254 if not _is_background_scenario(scenario):
255 runner = self.run_one_scenario(scenario, output_file)
256 runners.append(runner)
258 # Wait for runners to finish
259 for runner in runners:
260 status = runner_join(runner, background_runners, self.outputs, result)
263 "{0} runner status {1}".format(runner.__execution_type__, status))
264 LOG.info("Runner ended, output in %s", output_file)
267 for scenario in scenarios:
268 if not _is_background_scenario(scenario):
269 runner = self.run_one_scenario(scenario, output_file)
270 status = runner_join(runner, background_runners, self.outputs, result)
272 LOG.error('Scenario NO.%s: "%s" ERROR!',
273 scenarios.index(scenario) + 1,
274 scenario.get('type'))
276 "{0} runner status {1}".format(runner.__execution_type__, status))
277 LOG.info("Runner ended, output in %s", output_file)
279 # Abort background runners
280 for runner in background_runners:
283 # Wait for background runners to finish
284 for runner in background_runners:
285 status = runner.join(self.outputs, result)
287 # Nuke if it did not stop nicely
288 base_runner.Runner.terminate(runner)
289 runner.join(self.outputs, result)
290 base_runner.Runner.release(runner)
292 print("Background task ended")
295 def atexit_handler(self):
296 """handler for process termination"""
297 base_runner.Runner.terminate_all()
300 LOG.info("Undeploying all contexts")
301 for context in self.contexts[::-1]:
304 def _parse_options(self, op):
305 if isinstance(op, dict):
306 return {k: self._parse_options(v) for k, v in op.items()}
307 elif isinstance(op, list):
308 return [self._parse_options(v) for v in op]
309 elif isinstance(op, str):
310 return self.outputs.get(op[1:]) if op.startswith('$') else op
314 def run_one_scenario(self, scenario_cfg, output_file):
315 """run one scenario using context"""
316 runner_cfg = scenario_cfg["runner"]
317 runner_cfg['output_filename'] = output_file
319 options = scenario_cfg.get('options', {})
320 scenario_cfg['options'] = self._parse_options(options)
322 # TODO support get multi hosts/vms info
324 server_name = scenario_cfg.get('options', {}).get('server_name', {})
326 def config_context_target(cfg):
327 target = cfg['target']
328 if is_ip_addr(target):
329 context_cfg['target'] = {"ipaddr": target}
331 context_cfg['target'] = Context.get_server(target)
332 if self._is_same_context(cfg["host"], target):
333 context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
335 context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
337 host_name = server_name.get('host', scenario_cfg.get('host'))
339 context_cfg['host'] = Context.get_server(host_name)
341 for item in [server_name, scenario_cfg]:
343 config_context_target(item)
345 LOG.debug("Got a KeyError in config_context_target(%s)", item)
349 if "targets" in scenario_cfg:
351 for target in scenario_cfg["targets"]:
352 if is_ip_addr(target):
353 ip_list.append(target)
354 context_cfg['target'] = {}
356 context_cfg['target'] = Context.get_server(target)
357 if self._is_same_context(scenario_cfg["host"],
359 ip_list.append(context_cfg["target"]["private_ip"])
361 ip_list.append(context_cfg["target"]["ip"])
362 context_cfg['target']['ipaddr'] = ','.join(ip_list)
364 if "nodes" in scenario_cfg:
365 context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
366 context_cfg["networks"] = get_networks_from_nodes(
367 context_cfg["nodes"])
369 runner = base_runner.Runner.get(runner_cfg)
371 LOG.info("Starting runner of type '%s'", runner_cfg["type"])
372 runner.run(scenario_cfg, context_cfg)
376 def _is_same_context(self, host_attr, target_attr):
377 """check if two servers are in the same heat context
378 host_attr: either a name for a server created by yardstick or a dict
379 with attribute name mapping when using external heat templates
380 target_attr: either a name for a server created by yardstick or a dict
381 with attribute name mapping when using external heat templates
383 for context in self.contexts:
384 if context.__context_type__ not in {"Heat", "Kubernetes"}:
387 host = context._get_server(host_attr)
391 target = context._get_server(target_attr)
395 # Both host and target is not None, then they are in the
402 class TaskParser(object): # pragma: no cover
403 """Parser for task config files in yaml format"""
405 def __init__(self, path):
408 def _meet_constraint(self, task, cur_pod, cur_installer):
409 if "constraint" in task:
410 constraint = task.get('constraint', None)
411 if constraint is not None:
412 tc_fit_pod = constraint.get('pod', None)
413 tc_fit_installer = constraint.get('installer', None)
414 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
415 cur_pod, cur_installer, constraint)
416 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
418 if (cur_installer is None) or (tc_fit_installer and cur_installer
419 not in tc_fit_installer):
423 def _get_task_para(self, task, cur_pod):
424 task_args = task.get('task_args', None)
425 if task_args is not None:
426 task_args = task_args.get(cur_pod, task_args.get('default'))
427 task_args_fnames = task.get('task_args_fnames', None)
428 if task_args_fnames is not None:
429 task_args_fnames = task_args_fnames.get(cur_pod, None)
430 return task_args, task_args_fnames
432 def parse_suite(self):
433 """parse the suite file and return a list of task config file paths
434 and lists of optional parameters if present"""
435 LOG.info("\nParsing suite file:%s", self.path)
438 with open(self.path) as stream:
439 cfg = yaml_load(stream)
440 except IOError as ioerror:
443 self._check_schema(cfg["schema"], "suite")
444 LOG.info("\nStarting scenario:%s", cfg["name"])
446 test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
447 test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
449 if test_cases_dir[-1] != os.sep:
450 test_cases_dir += os.sep
452 cur_pod = os.environ.get('NODE_NAME', None)
453 cur_installer = os.environ.get('INSTALLER_TYPE', None)
455 valid_task_files = []
457 valid_task_args_fnames = []
459 for task in cfg["test_cases"]:
461 if "file_name" in task:
462 task_fname = task.get('file_name', None)
463 if task_fname is None:
468 if self._meet_constraint(task, cur_pod, cur_installer):
469 valid_task_files.append(test_cases_dir + task_fname)
472 # 3.fetch task parameters
473 task_args, task_args_fnames = self._get_task_para(task, cur_pod)
474 valid_task_args.append(task_args)
475 valid_task_args_fnames.append(task_args_fnames)
477 return valid_task_files, valid_task_args, valid_task_args_fnames
479 def parse_task(self, task_id, task_args=None, task_args_file=None):
480 """parses the task file and return an context and scenario instances"""
481 LOG.info("Parsing task config: %s", self.path)
486 with open(task_args_file) as f:
487 kw.update(parse_task_args("task_args_file", f.read()))
488 kw.update(parse_task_args("task_args", task_args))
493 with open(self.path) as f:
495 input_task = f.read()
496 rendered_task = TaskTemplate.render(input_task, **kw)
497 except Exception as e:
498 LOG.exception('Failed to render template:\n%s\n', input_task)
500 LOG.debug("Input task is:\n%s\n", rendered_task)
502 cfg = yaml_load(rendered_task)
503 except IOError as ioerror:
506 self._check_schema(cfg["schema"], "task")
507 meet_precondition = self._check_precondition(cfg)
509 # TODO: support one or many contexts? Many would simpler and precise
510 # TODO: support hybrid context type
512 context_cfgs = [cfg["context"]]
513 elif "contexts" in cfg:
514 context_cfgs = cfg["contexts"]
516 context_cfgs = [{"type": "Dummy"}]
519 for cfg_attrs in context_cfgs:
521 cfg_attrs['task_id'] = task_id
522 # default to Heat context because we are testing OpenStack
523 context_type = cfg_attrs.get("type", "Heat")
524 context = Context.get(context_type)
525 context.init(cfg_attrs)
526 # Update the name in case the context has used the name_suffix
527 cfg_attrs['name'] = context.name
528 contexts.append(context)
530 run_in_parallel = cfg.get("run_in_parallel", False)
532 # add tc and task id for influxdb extended tags
533 for scenario in cfg["scenarios"]:
534 task_name = os.path.splitext(os.path.basename(self.path))[0]
535 scenario["tc"] = task_name
536 scenario["task_id"] = task_id
537 # embed task path into scenario so we can load other files
538 # relative to task path
539 scenario["task_path"] = os.path.dirname(self.path)
541 self._change_node_names(scenario, contexts)
543 # TODO we need something better here, a class that represent the file
544 return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
547 def _change_node_names(scenario, contexts):
548 """Change the node names in a scenario, depending on the context config
550 The nodes (VMs or physical servers) are referred in the context section
551 with the name of the server and the name of the context:
552 <server name>.<context name>
554 If the context is going to be undeployed at the end of the test, the
555 task ID is suffixed to the name to avoid interferences with previous
556 deployments. If the context needs to be deployed at the end of the
557 test, the name assigned is kept.
559 There are several places where a node name could appear in the scenario
570 server_name: # JIRA: YARDSTICK-810
576 tg__0: tg_0.yardstick
577 vnf__0: vnf_0.yardstick
579 def qualified_name(name):
580 node_name, context_name = name.split('.')
582 ctx = next((context for context in contexts
583 if context.assigned_name == context_name))
584 except StopIteration:
585 raise exceptions.ScenarioConfigContextNameNotFound(
586 context_name=context_name)
588 return '{}.{}'.format(node_name, ctx.name)
590 if 'host' in scenario:
591 scenario['host'] = qualified_name(scenario['host'])
592 if 'target' in scenario:
593 scenario['target'] = qualified_name(scenario['target'])
594 server_name = scenario.get('options', {}).get('server_name', {})
595 if 'host' in server_name:
596 server_name['host'] = qualified_name(server_name['host'])
597 if 'target' in server_name:
598 server_name['target'] = qualified_name(server_name['target'])
599 if 'targets' in scenario:
600 for idx, target in enumerate(scenario['targets']):
601 scenario['targets'][idx] = qualified_name(target)
602 if 'nodes' in scenario:
603 for scenario_node, target in scenario['nodes'].items():
604 scenario['nodes'][scenario_node] = qualified_name(target)
606 def _check_schema(self, cfg_schema, schema_type):
607 """Check if config file is using the correct schema type"""
609 if cfg_schema != "yardstick:" + schema_type + ":0.1":
610 sys.exit("error: file %s has unknown schema %s" % (self.path,
613 def _check_precondition(self, cfg):
614 """Check if the environment meet the precondition"""
616 if "precondition" in cfg:
617 precondition = cfg["precondition"]
618 installer_type = precondition.get("installer_type", None)
619 deploy_scenarios = precondition.get("deploy_scenarios", None)
620 tc_fit_pods = precondition.get("pod_name", None)
621 installer_type_env = os.environ.get('INSTALL_TYPE', None)
622 deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
623 pod_name_env = os.environ.get('NODE_NAME', None)
625 LOG.info("installer_type: %s, installer_type_env: %s",
626 installer_type, installer_type_env)
627 LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
628 deploy_scenarios, deploy_scenario_env)
629 LOG.info("tc_fit_pods: %s, pod_name_env: %s",
630 tc_fit_pods, pod_name_env)
631 if installer_type and installer_type_env:
632 if installer_type_env not in installer_type:
634 if deploy_scenarios and deploy_scenario_env:
635 deploy_scenarios_list = deploy_scenarios.split(',')
636 for deploy_scenario in deploy_scenarios_list:
637 if deploy_scenario_env.startswith(deploy_scenario):
640 if tc_fit_pods and pod_name_env:
641 if pod_name_env not in tc_fit_pods:
646 def is_ip_addr(addr):
647 """check if string addr is an IP address"""
649 addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
650 except AttributeError:
654 ipaddress.ip_address(addr.encode('utf-8'))
661 def _is_background_scenario(scenario):
662 if "run_in_background" in scenario:
663 return scenario["run_in_background"]
668 def parse_nodes_with_context(scenario_cfg):
669 """parse the 'nodes' fields in scenario """
670 # ensure consistency in node instantiation order
671 return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
672 for nodename in sorted(scenario_cfg["nodes"]))
675 def get_networks_from_nodes(nodes):
676 """parse the 'nodes' fields in scenario """
678 for node in nodes.values():
681 interfaces = node.get('interfaces', {})
682 for interface in interfaces.values():
683 # vld_id is network_name
684 network_name = interface.get('network_name')
687 network = Context.get_network(network_name)
689 networks[network['name']] = network
693 def runner_join(runner, background_runners, outputs, result):
694 """join (wait for) a runner, exit process at runner failure
695 :param background_runners:
696 :type background_runners:
702 while runner.poll() is None:
703 outputs.update(runner.get_output())
704 result.extend(runner.get_result())
705 # drain all the background runner queues
706 for background in background_runners:
707 outputs.update(background.get_output())
708 result.extend(background.get_result())
709 status = runner.join(outputs, result)
710 base_runner.Runner.release(runner)
714 def print_invalid_header(source_name, args):
715 print("Invalid %(source)s passed:\n\n %(args)s\n"
716 % {"source": source_name, "args": args})
719 def parse_task_args(src_name, args):
720 if isinstance(args, collections.Mapping):
724 kw = args and yaml_load(args)
725 kw = {} if kw is None else kw
726 except yaml.parser.ParserError as e:
727 print_invalid_header(src_name, args)
728 print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
729 % {"source": src_name, "err": e})
732 if not isinstance(kw, dict):
733 print_invalid_header(src_name, args)
734 print("%(src)s had to be dict, actually %(src_type)s\n"
735 % {"src": src_name, "src_type": type(kw)})