1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from collections import OrderedDict
23 from six.moves import filter
24 from jinja2 import Environment
26 from yardstick.benchmark.contexts.base import Context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common.task_template import TaskTemplate
32 from yardstick.common import utils
33 from yardstick.common import constants
34 from yardstick.common import exceptions
35 from yardstick.common.html_template import report_template
# Default path for raw benchmark output when no -o/--output-file is given.
37 output_file_default = "/tmp/yardstick.out"
# Default location (relative to the yardstick root) of suite test cases.
38 test_cases_dir_default = "tests/opnfv/test_cases/"
# Module-level logger, named after this module.
39 LOG = logging.getLogger(__name__)
# Task drives a full benchmark run: it parses task/suite files, deploys
# contexts, executes scenarios and pushes results to the dispatchers.
42 class Task(object): # pragma: no cover
45 Set of commands to manage benchmark tasks.
# Normalize the comma-separated 'dispatcher' setting in the DEFAULT
# section into a list of stripped names,
# e.g. "file, influxdb" -> ['file', 'influxdb'].
# NOTE(review): the default argument of the inner .get() is elided in
# this listing — presumably 'file'; confirm against the full source.
52 def _set_dispatchers(self, output_config):
53 dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
55 out_types = [s.strip() for s in dispatchers.split(',')]
56 output_config['DEFAULT']['dispatcher'] = out_types
# Entry point for `yardstick task start`. NOTE(review): this listing is
# elided (embedded line numbers jump); the missing try/except/else lines
# should be verified against the full source before editing logic.
58 def start(self, args):
59 """Start a benchmark scenario."""
# Make sure runners/contexts are cleaned up even on abnormal exit.
61 atexit.register(self.atexit_handler)
# Reuse a caller-supplied task id, otherwise mint a fresh UUID.
63 task_id = getattr(args, 'task_id')
64 self.task_id = task_id if task_id else str(uuid.uuid4())
# Load output/dispatcher configuration from the yardstick conf file;
# errors fall back to an empty config (see comment below).
69 output_config = utils.parse_ini_file(CONF_FILE)
70 except Exception: # pylint: disable=broad-except
71 # all error will be ignore, the default value is {}
74 self._init_output_config(output_config)
75 self._set_output_config(output_config, args.output_file)
76 LOG.debug('Output configuration is: %s', output_config)
78 self._set_dispatchers(output_config)
80 # update dispatcher list
# Pre-create the output file with an empty result when the 'file'
# dispatcher is active, so consumers always find a valid JSON file.
81 if 'file' in output_config['DEFAULT']['dispatcher']:
82 result = {'status': 0, 'result': {}}
83 utils.write_json_to_file(args.output_file, result)
85 total_start_time = time.time()
86 parser = TaskParser(args.inputfile[0])
89 # 1.parse suite, return suite_params info
90 task_files, task_args, task_args_fnames = \
# Single-task mode: the input file itself is the only task.
93 task_files = [parser.path]
94 task_args = [args.task_args]
95 task_args_fnames = [args.task_args_file]
97 LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98 task_files, task_args, task_args_fnames)
# Run every task file in sequence, timing each one.
105 for i in range(0, len(task_files)):
106 one_task_start_time = time.time()
107 parser.path = task_files[i]
108 scenarios, run_in_parallel, meet_precondition, contexts = \
109 parser.parse_task(self.task_id, task_args[i],
112 self.contexts.extend(contexts)
# Skip tasks whose environment precondition is not met.
# NOTE(review): "envrionment" typo is in the runtime log string;
# fixing it is a behavior change, left untouched here.
114 if not meet_precondition:
115 LOG.info("meet_precondition is %s, please check envrionment",
119 case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
121 data = self._run(scenarios, run_in_parallel, args.output_file)
122 except KeyboardInterrupt:
124 except Exception: # pylint: disable=broad-except
125 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
126 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
128 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
129 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
132 # keep deployment, forget about stack
133 # (hide it for exit handler)
# Undeploy contexts in reverse deployment order.
136 for context in self.contexts[::-1]:
139 one_task_end_time = time.time()
140 LOG.info("Task %s finished in %d secs", task_files[i],
141 one_task_end_time - one_task_start_time)
# Aggregate per-testcase results, dispatch them and render the report.
143 result = self._get_format_result(testcases)
145 self._do_output(output_config, result)
146 self._generate_reporting(result)
148 total_end_time = time.time()
149 LOG.info("Total finished in %d secs",
150 total_end_time - total_start_time)
# Hint the user how to generate a report for the last scenario.
152 scenario = scenarios[0]
153 LOG.info("To generate report, execute command "
154 "'yardstick report generate %(task_id)s %(tc)s'", scenario)
155 LOG.info("Task ALL DONE, exiting")
# Render the HTML report for this run into constants.REPORTING_FILE.
# NOTE(review): the creation of `env` is elided in this listing —
# presumably `env = Environment()` (jinja2); confirm in the full source.
158 def _generate_reporting(self, result):
160 with open(constants.REPORTING_FILE, 'w') as f:
161 f.write(env.from_string(report_template).render(result))
163 LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
# Per-task file logging setup. NOTE(review): the enclosing `def` line is
# elided in this listing (presumably `def _set_log(self):`).
# Format includes source location to ease debugging of task runs.
166 log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
167 log_formatter = logging.Formatter(log_format)
# One log file per task id under TASK_LOG_DIR, captured at DEBUG level.
169 utils.makedirs(constants.TASK_LOG_DIR)
170 log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
171 log_handler = logging.FileHandler(log_path)
172 log_handler.setFormatter(log_formatter)
173 log_handler.setLevel(logging.DEBUG)
# Attach to the root logger so every module's output lands in the file.
175 logging.root.addHandler(log_handler)
177 def _init_output_config(self, output_config):
178 output_config.setdefault('DEFAULT', {})
179 output_config.setdefault('dispatcher_http', {})
180 output_config.setdefault('dispatcher_file', {})
181 output_config.setdefault('dispatcher_influxdb', {})
182 output_config.setdefault('nsb', {})
# Apply environment-variable overrides to the output configuration.
# NOTE(review): the try/except lines guarding the os.environ lookups are
# elided in this listing — presumably KeyError is caught and the config
# default is kept; confirm against the full source.
184 def _set_output_config(self, output_config, file_path):
# DISPATCHER env var overrides the configured dispatcher; 'file' is the
# fallback default when nothing is configured.
186 out_type = os.environ['DISPATCHER']
188 output_config['DEFAULT'].setdefault('dispatcher', 'file')
190 output_config['DEFAULT']['dispatcher'] = out_type
# The file dispatcher always writes to the caller-supplied path.
192 output_config['dispatcher_file']['file_path'] = file_path
# TARGET env var overrides the target of the active dispatcher section.
195 target = os.environ['TARGET']
199 k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
200 output_config[k]['target'] = target
# Build the result payload sent to dispatchers: overall criteria plus
# environment metadata and the per-testcase results.
# NOTE(review): the surrounding dict literals are elided in this listing;
# the keys shown below belong to an 'info' sub-dict and the top-level
# result dict — confirm the exact structure in the full source.
202 def _get_format_result(self, testcases):
203 criteria = self._get_task_criteria(testcases)
# Environment metadata, all optional with 'unknown' fallbacks.
206 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
207 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
208 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
209 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
215 'criteria': criteria,
216 'task_id': self.task_id,
218 'testcases': testcases
# Derive the overall pass/fail verdict: `criteria` is True when at least
# one testcase did not report 'PASS'.
# NOTE(review): the return statements are elided in this listing —
# presumably mapping True -> 'FAIL' and False -> 'PASS'; confirm.
224 def _get_task_criteria(self, testcases):
225 criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
# Push the formatted result to every configured dispatcher.
# DispatcherBase.get() resolves dispatcher instances from output_config.
231 def _do_output(self, output_config, result):
232 dispatchers = DispatcherBase.get(output_config)
234 for dispatcher in dispatchers:
235 dispatcher.flush_result_data(result)
# NOTE(review): this listing is elided (embedded line numbers jump);
# context deployment, the parallel/serial branch heads and some error
# branches are missing — verify against the full source before editing.
237 def _run(self, scenarios, run_in_parallel, output_file):
238 """Deploys context and calls runners"""
239 for context in self.contexts:
242 background_runners = []
245 # Start all background scenarios
# Background scenarios are forced onto a quasi-infinite Duration
# runner; they run until explicitly aborted below.
246 for scenario in filter(_is_background_scenario, scenarios):
247 scenario["runner"] = dict(type="Duration", duration=1000000000)
248 runner = self.run_one_scenario(scenario, output_file)
249 background_runners.append(runner)
# Parallel mode: start every foreground scenario first, then join.
253 for scenario in scenarios:
254 if not _is_background_scenario(scenario):
255 runner = self.run_one_scenario(scenario, output_file)
256 runners.append(runner)
258 # Wait for runners to finish
259 for runner in runners:
260 status = runner_join(runner, background_runners, self.outputs, result)
263 "{0} runner status {1}".format(runner.__execution_type__, status))
264 LOG.info("Runner ended, output in %s", output_file)
# Serial mode: run and join each foreground scenario in turn.
267 for scenario in scenarios:
268 if not _is_background_scenario(scenario):
269 runner = self.run_one_scenario(scenario, output_file)
270 status = runner_join(runner, background_runners, self.outputs, result)
272 LOG.error('Scenario NO.%s: "%s" ERROR!',
273 scenarios.index(scenario) + 1,
274 scenario.get('type'))
276 "{0} runner status {1}".format(runner.__execution_type__, status))
277 LOG.info("Runner ended, output in %s", output_file)
279 # Abort background runners
280 for runner in background_runners:
283 # Wait for background runners to finish
284 for runner in background_runners:
285 status = runner.join(self.outputs, result)
287 # Nuke if it did not stop nicely
288 base_runner.Runner.terminate(runner)
289 runner.join(self.outputs, result)
290 base_runner.Runner.release(runner)
292 print("Background task ended")
# Registered via atexit in start(): kill all runners and undeploy the
# contexts (in reverse deployment order) when the process terminates.
# NOTE(review): the loop body at the end is elided in this listing.
295 def atexit_handler(self):
296 """handler for process termination"""
297 base_runner.Runner.terminate_all()
300 LOG.info("Undeploying all contexts")
301 for context in self.contexts[::-1]:
# Recursively substitute '$name' string values with previously recorded
# scenario outputs (self.outputs); dicts and lists are walked, other
# values pass through.
# NOTE(review): the trailing `else: return op` branch for non-str/list/
# dict values is elided in this listing — confirm in the full source.
304 def _parse_options(self, op):
305 if isinstance(op, dict):
306 return {k: self._parse_options(v) for k, v in op.items()}
307 elif isinstance(op, list):
308 return [self._parse_options(v) for v in op]
309 elif isinstance(op, str):
# '$foo' -> self.outputs['foo'] (None when the output is missing).
310 return self.outputs.get(op[1:]) if op.startswith('$') else op
# Build the context configuration for one scenario (host/target/targets/
# nodes/networks), then create and start its runner.
# NOTE(review): this listing is elided (if/else lines and the return are
# missing); verify control flow against the full source before editing.
314 def run_one_scenario(self, scenario_cfg, output_file):
315 """run one scenario using context"""
316 runner_cfg = scenario_cfg["runner"]
317 runner_cfg['output_filename'] = output_file
# Resolve $output references inside the scenario options.
319 options = scenario_cfg.get('options', {})
320 scenario_cfg['options'] = self._parse_options(options)
322 # TODO support get multi hosts/vms info
324 server_name = scenario_cfg.get('options', {}).get('server_name', {})
# Resolve a 'target' entry: plain IPs are used verbatim, otherwise the
# server is looked up in the context; when host and target share a
# context the private IP is preferred, else the public one.
326 def config_context_target(cfg):
327 target = cfg['target']
328 if is_ip_addr(target):
329 context_cfg['target'] = {"ipaddr": target}
331 context_cfg['target'] = Context.get_server(target)
332 if self._is_same_context(cfg["host"], target):
333 context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
335 context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
# 'server_name' options take precedence over the scenario-level host.
337 host_name = server_name.get('host', scenario_cfg.get('host'))
339 context_cfg['host'] = Context.get_server(host_name)
341 for item in [server_name, scenario_cfg]:
343 config_context_target(item)
# Multi-target scenarios: collect one comma-separated ipaddr list.
349 if "targets" in scenario_cfg:
351 for target in scenario_cfg["targets"]:
352 if is_ip_addr(target):
353 ip_list.append(target)
354 context_cfg['target'] = {}
356 context_cfg['target'] = Context.get_server(target)
357 if self._is_same_context(scenario_cfg["host"],
359 ip_list.append(context_cfg["target"]["private_ip"])
361 ip_list.append(context_cfg["target"]["ip"])
362 context_cfg['target']['ipaddr'] = ','.join(ip_list)
# Node-based scenarios additionally get node and network maps.
364 if "nodes" in scenario_cfg:
365 context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
366 context_cfg["networks"] = get_networks_from_nodes(
367 context_cfg["nodes"])
369 runner = base_runner.Runner.get(runner_cfg)
371 LOG.info("Starting runner of type '%s'", runner_cfg["type"])
372 runner.run(scenario_cfg, context_cfg)
# NOTE(review): this listing is elided — the continue/return lines and
# the end of the trailing comment are missing; presumably it returns
# True when one Heat/Kubernetes context resolves both servers, else
# False. Confirm in the full source.
376 def _is_same_context(self, host_attr, target_attr):
377 """check if two servers are in the same heat context
378 host_attr: either a name for a server created by yardstick or a dict
379 with attribute name mapping when using external heat templates
380 target_attr: either a name for a server created by yardstick or a dict
381 with attribute name mapping when using external heat templates
# Only Heat and Kubernetes contexts can own both endpoints.
383 for context in self.contexts:
384 if context.__context_type__ not in {"Heat", "Kubernetes"}:
387 host = context._get_server(host_attr)
391 target = context._get_server(target_attr)
395 # Both host and target is not None, then they are in the
# Parses YAML task and suite definitions into scenarios and contexts.
# NOTE(review): the __init__ body is elided in this listing — presumably
# `self.path = path`; confirm in the full source.
402 class TaskParser(object): # pragma: no cover
403 """Parser for task config files in yaml format"""
405 def __init__(self, path):
# Check a suite entry's optional 'constraint' (pod/installer allow-lists)
# against the current environment.
# NOTE(review): the return statements are elided in this listing —
# presumably False on a mismatch and True otherwise; confirm.
408 def _meet_constraint(self, task, cur_pod, cur_installer):
409 if "constraint" in task:
410 constraint = task.get('constraint', None)
411 if constraint is not None:
412 tc_fit_pod = constraint.get('pod', None)
413 tc_fit_installer = constraint.get('installer', None)
414 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
415 cur_pod, cur_installer, constraint)
# A constrained task needs a known pod/installer that is listed.
416 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
418 if (cur_installer is None) or (tc_fit_installer and cur_installer
419 not in tc_fit_installer):
423 def _get_task_para(self, task, cur_pod):
424 task_args = task.get('task_args', None)
425 if task_args is not None:
426 task_args = task_args.get(cur_pod, task_args.get('default'))
427 task_args_fnames = task.get('task_args_fnames', None)
428 if task_args_fnames is not None:
429 task_args_fnames = task_args_fnames.get(cur_pod, None)
430 return task_args, task_args_fnames
# NOTE(review): this listing is elided (try head, error handling, some
# branches and one list initialisation are missing); verify against the
# full source before editing logic.
432 def parse_suite(self):
433 """parse the suite file and return a list of task config file paths
434 and lists of optional parameters if present"""
435 LOG.info("\nParsing suite file:%s", self.path)
438 with open(self.path) as stream:
439 cfg = yaml_load(stream)
440 except IOError as ioerror:
# Validate schema, then resolve the test-case directory (absolute,
# with a guaranteed trailing separator for plain concatenation below).
443 self._check_schema(cfg["schema"], "suite")
444 LOG.info("\nStarting scenario:%s", cfg["name"])
446 test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
447 test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
449 if test_cases_dir[-1] != os.sep:
450 test_cases_dir += os.sep
# Current environment used for constraint filtering.
452 cur_pod = os.environ.get('NODE_NAME', None)
453 cur_installer = os.environ.get('INSTALLER_TYPE', None)
455 valid_task_files = []
457 valid_task_args_fnames = []
# Keep only test cases whose constraints match this pod/installer.
459 for task in cfg["test_cases"]:
461 if "file_name" in task:
462 task_fname = task.get('file_name', None)
463 if task_fname is None:
468 if self._meet_constraint(task, cur_pod, cur_installer):
469 valid_task_files.append(test_cases_dir + task_fname)
472 # 3.fetch task parameters
473 task_args, task_args_fnames = self._get_task_para(task, cur_pod)
474 valid_task_args.append(task_args)
475 valid_task_args_fnames.append(task_args_fnames)
477 return valid_task_files, valid_task_args, valid_task_args_fnames
# NOTE(review): this listing is elided (try heads, error re-raises and
# some branch heads are missing); verify against the full source before
# editing logic.
479 def parse_task(self, task_id, task_args=None, task_args_file=None):
480 """parses the task file and return an context and scenario instances"""
481 LOG.info("Parsing task config: %s", self.path)
# Merge template keyword args: file-provided values first, then
# command-line task_args override them.
486 with open(task_args_file) as f:
487 kw.update(parse_task_args("task_args_file", f.read()))
488 kw.update(parse_task_args("task_args", task_args))
# Render the task file as a Jinja-style template with the merged args.
493 with open(self.path) as f:
495 input_task = f.read()
496 rendered_task = TaskTemplate.render(input_task, **kw)
497 except Exception as e:
498 LOG.exception('Failed to render template:\n%s\n', input_task)
500 LOG.debug("Input task is:\n%s\n", rendered_task)
502 cfg = yaml_load(rendered_task)
503 except IOError as ioerror:
506 self._check_schema(cfg["schema"], "task")
507 meet_precondition = self._check_precondition(cfg)
509 # TODO: support one or many contexts? Many would simpler and precise
510 # TODO: support hybrid context type
# Accept 'context' (single), 'contexts' (list), or fall back to Dummy.
512 context_cfgs = [cfg["context"]]
513 elif "contexts" in cfg:
514 context_cfgs = cfg["contexts"]
516 context_cfgs = [{"type": "Dummy"}]
# Instantiate and initialise every context described by the task.
519 for cfg_attrs in context_cfgs:
521 cfg_attrs['task_id'] = task_id
522 # default to Heat context because we are testing OpenStack
523 context_type = cfg_attrs.get("type", "Heat")
524 context = Context.get(context_type)
525 context.init(cfg_attrs)
526 contexts.append(context)
528 run_in_parallel = cfg.get("run_in_parallel", False)
530 # add tc and task id for influxdb extended tags
531 for scenario in cfg["scenarios"]:
532 task_name = os.path.splitext(os.path.basename(self.path))[0]
533 scenario["tc"] = task_name
534 scenario["task_id"] = task_id
535 # embed task path into scenario so we can load other files
536 # relative to task path
537 scenario["task_path"] = os.path.dirname(self.path)
# Rewrite node references to the contexts' assigned names.
539 self._change_node_names(scenario, contexts)
541 # TODO we need something better here, a class that represent the file
542 return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
# NOTE(review): this listing is elided — the @staticmethod decorator,
# parts of the docstring and the try head inside qualified_name are
# missing; verify against the full source before editing logic.
545 def _change_node_names(scenario, contexts):
546 """Change the node names in a scenario, depending on the context config
548 The nodes (VMs or physical servers) are referred in the context section
549 with the name of the server and the name of the context:
550 <server name>.<context name>
552 If the context is going to be undeployed at the end of the test, the
553 task ID is suffixed to the name to avoid interferences with previous
554 deployments. If the context needs to be deployed at the end of the
555 test, the name assigned is kept.
557 There are several places where a node name could appear in the scenario
568 server_name: # JIRA: YARDSTICK-810
574 tg__0: tg_0.yardstick
575 vnf__0: vnf_0.yardstick
# Map '<node>.<context>' to '<node>.<context assigned name>', raising a
# ScenarioConfigContextNameNotFound when no context matches.
577 def qualified_name(name):
578 node_name, context_name = name.split('.')
580 ctx = next((context for context in contexts
581 if context.assigned_name == context_name))
582 except StopIteration:
583 raise exceptions.ScenarioConfigContextNameNotFound(
584 context_name=context_name)
586 return '{}.{}'.format(node_name, ctx.name)
# Rewrite every place a node name can appear: host/target,
# options.server_name, targets list and the nodes mapping.
588 if 'host' in scenario:
589 scenario['host'] = qualified_name(scenario['host'])
590 if 'target' in scenario:
591 scenario['target'] = qualified_name(scenario['target'])
592 server_name = scenario.get('options', {}).get('server_name', {})
593 if 'host' in server_name:
594 server_name['host'] = qualified_name(server_name['host'])
595 if 'target' in server_name:
596 server_name['target'] = qualified_name(server_name['target'])
597 if 'targets' in scenario:
598 for idx, target in enumerate(scenario['targets']):
599 scenario['targets'][idx] = qualified_name(target)
600 if 'nodes' in scenario:
601 for scenario_node, target in scenario['nodes'].items():
602 scenario['nodes'][scenario_node] = qualified_name(target)
# Abort the whole process when the file declares a schema other than
# "yardstick:<schema_type>:0.1".
# NOTE(review): the sys.exit() call's continuation line is elided in
# this listing; confirm the closing arguments in the full source.
604 def _check_schema(self, cfg_schema, schema_type):
605 """Check if config file is using the correct schema type"""
607 if cfg_schema != "yardstick:" + schema_type + ":0.1":
608 sys.exit("error: file %s has unknown schema %s" % (self.path,
# Compare the task's optional 'precondition' section against environment
# variables; tasks without preconditions always run.
# NOTE(review): the return statements are elided in this listing —
# presumably False on mismatch and True otherwise; confirm. Note the
# env var checked is 'INSTALL_TYPE' (not 'INSTALLER_TYPE' as elsewhere)
# — possibly intentional, verify in the full source.
611 def _check_precondition(self, cfg):
612 """Check if the environment meet the precondition"""
614 if "precondition" in cfg:
615 precondition = cfg["precondition"]
616 installer_type = precondition.get("installer_type", None)
617 deploy_scenarios = precondition.get("deploy_scenarios", None)
618 tc_fit_pods = precondition.get("pod_name", None)
619 installer_type_env = os.environ.get('INSTALL_TYPE', None)
620 deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
621 pod_name_env = os.environ.get('NODE_NAME', None)
623 LOG.info("installer_type: %s, installer_type_env: %s",
624 installer_type, installer_type_env)
625 LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
626 deploy_scenarios, deploy_scenario_env)
627 LOG.info("tc_fit_pods: %s, pod_name_env: %s",
628 tc_fit_pods, pod_name_env)
629 if installer_type and installer_type_env:
630 if installer_type_env not in installer_type:
# Deploy scenario matches when the env value starts with any
# of the comma-separated configured prefixes.
632 if deploy_scenarios and deploy_scenario_env:
633 deploy_scenarios_list = deploy_scenarios.split(',')
634 for deploy_scenario in deploy_scenarios_list:
635 if deploy_scenario_env.startswith(deploy_scenario):
638 if tc_fit_pods and pod_name_env:
639 if pod_name_env not in tc_fit_pods:
# True when *addr* (a string, or a dict carrying public/private ip attr
# keys) parses as an IP address.
# NOTE(review): the try heads and the except/return tail around
# ipaddress.ip_address are elided in this listing — presumably ValueError
# maps to False; confirm in the full source.
644 def is_ip_addr(addr):
645 """check if string addr is an IP address"""
# Dicts are unwrapped to their public (or private) ip attribute first;
# plain strings raise AttributeError here and are used as-is.
647 addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
648 except AttributeError:
652 ipaddress.ip_address(addr.encode('utf-8'))
659 def _is_background_scenario(scenario):
660 if "run_in_background" in scenario:
661 return scenario["run_in_background"]
def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' fields in scenario """
    # Sort node names first so nodes are always instantiated in a
    # deterministic order.
    node_map = scenario_cfg["nodes"]
    return OrderedDict(
        (name, Context.get_server(node_map[name]))
        for name in sorted(node_map))
# Collect the Context networks referenced by any node interface, keyed
# by network name.
# NOTE(review): this listing is elided — the `networks = {}` init, the
# None-guards and the final `return networks` are missing; confirm in
# the full source.
673 def get_networks_from_nodes(nodes):
674 """parse the 'nodes' fields in scenario """
676 for node in nodes.values():
679 interfaces = node.get('interfaces', {})
680 for interface in interfaces.values():
681 # vld_id is network_name
682 network_name = interface.get('network_name')
# Resolve the network through the active contexts; keyed by its
# canonical 'name' attribute.
685 network = Context.get_network(network_name)
687 networks[network['name']] = network
# Poll a foreground runner until it finishes, draining its (and every
# background runner's) output/result queues so they never fill up, then
# join and release it.
# NOTE(review): this listing is elided — docstring params and the final
# return (presumably `return status`) are missing; confirm in the full
# source.
691 def runner_join(runner, background_runners, outputs, result):
692 """join (wait for) a runner, exit process at runner failure
693 :param background_runners:
694 :type background_runners:
700 while runner.poll() is None:
701 outputs.update(runner.get_output())
702 result.extend(runner.get_result())
703 # drain all the background runner queues
704 for background in background_runners:
705 outputs.update(background.get_output())
706 result.extend(background.get_result())
707 status = runner.join(outputs, result)
708 base_runner.Runner.release(runner)
def print_invalid_header(source_name, args):
    """Print a banner flagging an invalid task/suite input value."""
    banner = "Invalid %(source)s passed:\n\n %(args)s\n"
    print(banner % {"source": source_name, "args": args})
# Parse task arguments (mapping passthrough, or YAML text) into a dict.
# NOTE(review): this listing is elided — the mapping's early return, the
# try head, the raise after the ParserError message and the final
# `return kw` are missing; confirm in the full source. Also note
# `collections.Mapping` is removed in Python 3.10+ (use
# collections.abc.Mapping) — flagging only, as the visible code is kept
# byte-identical here.
717 def parse_task_args(src_name, args):
# Already-parsed mappings are accepted as-is.
718 if isinstance(args, collections.Mapping):
# Empty/None input yields {}; otherwise parse the YAML text.
722 kw = args and yaml_load(args)
723 kw = {} if kw is None else kw
724 except yaml.parser.ParserError as e:
725 print_invalid_header(src_name, args)
726 print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
727 % {"source": src_name, "err": e})
# The parsed document must be a mapping, not a list/scalar.
730 if not isinstance(kw, dict):
731 print_invalid_header(src_name, args)
732 print("%(src)s had to be dict, actually %(src_type)s\n"
733 % {"src": src_name, "src_type": type(kw)})