1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from collections import OrderedDict
23 from six.moves import filter
24 from jinja2 import Environment
26 from yardstick.benchmark.contexts.base import Context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common.task_template import TaskTemplate
32 from yardstick.common import utils
33 from yardstick.common import constants
34 from yardstick.common import exceptions
35 from yardstick.common.html_template import report_template
# Default path used by the 'file' dispatcher for the JSON result output.
37 output_file_default = "/tmp/yardstick.out"
# Default suite test-case directory, relative to YARDSTICK_ROOT_PATH (see parse_suite).
38 test_cases_dir_default = "tests/opnfv/test_cases/"
39 LOG = logging.getLogger(__name__)
# NOTE(review): this listing is elided (gaps in the embedded line numbers) and
# indentation is stripped; comments below are hedged accordingly.
42 class Task(object): # pragma: no cover
45 Set of commands to manage benchmark tasks.
# Normalize the DEFAULT/dispatcher setting from a comma-separated string into
# a list of dispatcher names (mutates output_config in place).
52 def _set_dispatchers(self, output_config):
# The default value of this .get() call is on an elided line (presumably 'file').
53 dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
55 out_types = [s.strip() for s in dispatchers.split(',')]
56 output_config['DEFAULT']['dispatcher'] = out_types
58 def start(self, args):
59 """Start a benchmark scenario."""
# Ensure contexts/runners are cleaned up on interpreter exit.
61 atexit.register(self.atexit_handler)
# Use the caller-supplied task id when present, otherwise generate a UUID.
63 task_id = getattr(args, 'task_id')
64 self.task_id = task_id if task_id else str(uuid.uuid4())
# Output configuration from the yardstick conf file; errors are deliberately
# swallowed and an empty config used instead (the 'try:' line is elided here).
69 output_config = utils.parse_ini_file(CONF_FILE)
70 except Exception: # pylint: disable=broad-except
71 # all error will be ignore, the default value is {}
74 self._init_output_config(output_config)
75 self._set_output_config(output_config, args.output_file)
76 LOG.debug('Output configuration is: %s', output_config)
78 self._set_dispatchers(output_config)
80 # update dispatcher list
# Seed the output file with a zero-status placeholder before the run starts.
81 if 'file' in output_config['DEFAULT']['dispatcher']:
82 result = {'status': 0, 'result': {}}
83 utils.write_json_to_file(args.output_file, result)
85 total_start_time = time.time()
86 parser = TaskParser(args.inputfile[0])
# Suite vs. single task file: the suite branch calls parse_suite() on an
# elided line; otherwise the single input file and its args are used.
89 # 1.parse suite, return suite_params info
90 task_files, task_args, task_args_fnames = \
93 task_files = [parser.path]
94 task_args = [args.task_args]
95 task_args_fnames = [args.task_args_file]
97 LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98 task_files, task_args, task_args_fnames)
# Run each task file in sequence, recording per-case PASS/FAIL in 'testcases'
# (its initialization is on an elided line).
105 for i in range(0, len(task_files)):
106 one_task_start_time = time.time()
107 parser.path = task_files[i]
108 scenarios, run_in_parallel, meet_precondition, contexts = \
109 parser.parse_task(self.task_id, task_args[i],
112 self.contexts.extend(contexts)
# NOTE(review): "envrionment" typo lives in a runtime log string; cannot be
# altered in a comments-only update.
114 if not meet_precondition:
115 LOG.info("meet_precondition is %s, please check envrionment",
119 case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
121 data = self._run(scenarios, run_in_parallel, output_config)
122 except KeyboardInterrupt:
124 except Exception: # pylint: disable=broad-except
125 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
126 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
128 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
129 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
132 # keep deployment, forget about stack
133 # (hide it for exit handler)
# Undeploy contexts in reverse creation order (the loop body is elided).
136 for context in self.contexts[::-1]:
139 one_task_end_time = time.time()
140 LOG.info("Task %s finished in %d secs", task_files[i],
141 one_task_end_time - one_task_start_time)
# Aggregate all case results, dispatch them, and render the HTML report.
143 result = self._get_format_result(testcases)
145 self._do_output(output_config, result)
146 self._generate_reporting(result)
148 total_end_time = time.time()
149 LOG.info("Total finished in %d secs",
150 total_end_time - total_start_time)
153 scenario = scenarios[0]
154 LOG.info("To generate report, execute command "
155 "'yardstick report generate %(task_id)s %(tc)s'", scenario)
156 LOG.info("Task ALL DONE, exiting")
# Render the aggregated result dict into the static HTML report file.
158 def _generate_reporting(self, result):
# 'env' is defined on an elided line — presumably a jinja2 Environment()
# (Environment is imported at module top); TODO confirm.
160 with open(constants.REPORTING_FILE, 'w') as f:
161 f.write(env.from_string(report_template).render(result))
163 LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
# The lines below belong to a method whose 'def' line is elided (presumably
# _set_log): they attach a per-task DEBUG file handler to the root logger.
166 log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
167 log_formatter = logging.Formatter(log_format)
169 utils.makedirs(constants.TASK_LOG_DIR)
# One log file per task id under TASK_LOG_DIR.
170 log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
171 log_handler = logging.FileHandler(log_path)
172 log_handler.setFormatter(log_formatter)
173 log_handler.setLevel(logging.DEBUG)
175 logging.root.addHandler(log_handler)
177 def _init_output_config(self, output_config):
178 output_config.setdefault('DEFAULT', {})
179 output_config.setdefault('dispatcher_http', {})
180 output_config.setdefault('dispatcher_file', {})
181 output_config.setdefault('dispatcher_influxdb', {})
182 output_config.setdefault('nsb', {})
# Apply DISPATCHER/TARGET environment overrides to the output configuration.
184 def _set_output_config(self, output_config, file_path):
# Reads DISPATCHER from the environment; the KeyError fallback path is on
# elided lines (presumably defaults the dispatcher to 'file').
186 out_type = os.environ['DISPATCHER']
188 output_config['DEFAULT'].setdefault('dispatcher', 'file')
190 output_config['DEFAULT']['dispatcher'] = out_type
# The 'file' dispatcher always writes to the caller-supplied output path.
192 output_config['dispatcher_file']['file_path'] = file_path
# TARGET env var overrides the active dispatcher's 'target' setting; its
# KeyError handling is likewise elided.
195 target = os.environ['TARGET']
199 k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
200 output_config[k]['target'] = target
# Build the final result payload: overall criteria plus CI metadata taken
# from environment variables (with 'unknown' fallbacks).
202 def _get_format_result(self, testcases):
203 criteria = self._get_task_criteria(testcases)
206 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
207 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
208 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
209 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
215 'criteria': criteria,
216 'task_id': self.task_id,
218 'testcases': testcases
# Overall criteria: True here means at least one case did not PASS; the
# mapping of that flag to a 'PASS'/'FAIL' return value is on elided lines.
224 def _get_task_criteria(self, testcases):
225 criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
def _do_output(self, output_config, result):
    """Flush the aggregated *result* to every configured dispatcher.

    The InfluxDB dispatcher is excluded here.
    # NOTE(review): presumably its data is pushed during the run itself;
    # confirm against the dispatcher implementations.
    """
    all_dispatchers = DispatcherBase.get(output_config)
    selected = [d for d in all_dispatchers
                if d.__dispatcher_type__ != 'Influxdb']
    for dispatcher in selected:
        dispatcher.flush_result_data(result)
238 def _run(self, scenarios, run_in_parallel, output_config):
239 """Deploys context and calls runners"""
# Deploy every context first (the loop body is on an elided line).
240 for context in self.contexts:
243 background_runners = []
246 # Start all background scenarios
# Background scenarios are forced onto an effectively-infinite Duration
# runner; they are aborted explicitly at the end of this method.
247 for scenario in filter(_is_background_scenario, scenarios):
248 scenario["runner"] = dict(type="Duration", duration=1000000000)
249 runner = self.run_one_scenario(scenario, output_config)
250 background_runners.append(runner)
# Parallel branch: start all foreground runners, then join them.
# ('runners'/'result' initialization and the if/else lines are elided.)
254 for scenario in scenarios:
255 if not _is_background_scenario(scenario):
256 runner = self.run_one_scenario(scenario, output_config)
257 runners.append(runner)
259 # Wait for runners to finish
260 for runner in runners:
261 status = runner_join(runner, background_runners, self.outputs, result)
264 "{0} runner status {1}".format(runner.__execution_type__, status))
265 LOG.info("Runner ended")
# Serial branch: run and join each foreground scenario one at a time.
268 for scenario in scenarios:
269 if not _is_background_scenario(scenario):
270 runner = self.run_one_scenario(scenario, output_config)
271 status = runner_join(runner, background_runners, self.outputs, result)
273 LOG.error('Scenario NO.%s: "%s" ERROR!',
274 scenarios.index(scenario) + 1,
275 scenario.get('type'))
277 "{0} runner status {1}".format(runner.__execution_type__, status))
278 LOG.info("Runner ended")
280 # Abort background runners
281 for runner in background_runners:
284 # Wait for background runners to finish
285 for runner in background_runners:
286 status = runner.join(self.outputs, result)
288 # Nuke if it did not stop nicely
# Force-terminate a background runner that did not exit after the abort,
# then join again and release its resources.
289 base_runner.Runner.terminate(runner)
290 runner.join(self.outputs, result)
291 base_runner.Runner.release(runner)
293 print("Background task ended")
296 def atexit_handler(self):
297 """handler for process termination"""
# Stop every runner first, then tear contexts down in reverse creation
# order (the undeploy call in the loop body is on an elided line).
298 base_runner.Runner.terminate_all()
301 LOG.info("Undeploying all contexts")
302 for context in self.contexts[::-1]:
305 def _parse_options(self, op):
306 if isinstance(op, dict):
307 return {k: self._parse_options(v) for k, v in op.items()}
308 elif isinstance(op, list):
309 return [self._parse_options(v) for v in op]
310 elif isinstance(op, str):
311 return self.outputs.get(op[1:]) if op.startswith('$') else op
315 def run_one_scenario(self, scenario_cfg, output_config):
316 """run one scenario using context"""
317 runner_cfg = scenario_cfg["runner"]
318 runner_cfg['output_config'] = output_config
# Substitute $references in options before handing them to the runner.
320 options = scenario_cfg.get('options', {})
321 scenario_cfg['options'] = self._parse_options(options)
323 # TODO support get multi hosts/vms info
# 'context_cfg' is initialized on an elided line.
325 server_name = scenario_cfg.get('options', {}).get('server_name', {})
# Resolve a 'target' entry into context_cfg: literal IPs are used as-is,
# otherwise the server is looked up in the context; private vs public IP
# is chosen depending on whether host and target share a context.
# (The else/elif lines of this helper are elided.)
327 def config_context_target(cfg):
328 target = cfg['target']
329 if is_ip_addr(target):
330 context_cfg['target'] = {"ipaddr": target}
332 context_cfg['target'] = Context.get_server(target)
333 if self._is_same_context(cfg["host"], target):
334 context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
336 context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
# options.server_name.host takes precedence over scenario 'host'.
338 host_name = server_name.get('host', scenario_cfg.get('host'))
340 context_cfg['host'] = Context.get_server(host_name)
342 for item in [server_name, scenario_cfg]:
344 config_context_target(item)
# Multi-target form: build a comma-separated ipaddr list.
# ('ip_list' initialization and some branch lines are elided.)
350 if "targets" in scenario_cfg:
352 for target in scenario_cfg["targets"]:
353 if is_ip_addr(target):
354 ip_list.append(target)
355 context_cfg['target'] = {}
357 context_cfg['target'] = Context.get_server(target)
358 if self._is_same_context(scenario_cfg["host"],
360 ip_list.append(context_cfg["target"]["private_ip"])
362 ip_list.append(context_cfg["target"]["ip"])
363 context_cfg['target']['ipaddr'] = ','.join(ip_list)
365 if "nodes" in scenario_cfg:
366 context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
367 context_cfg["networks"] = get_networks_from_nodes(
368 context_cfg["nodes"])
# Instantiate and start the runner; the return statement is elided.
370 runner = base_runner.Runner.get(runner_cfg)
372 LOG.info("Starting runner of type '%s'", runner_cfg["type"])
373 runner.run(scenario_cfg, context_cfg)
377 def _is_same_context(self, host_attr, target_attr):
378 """check if two servers are in the same heat context
379 host_attr: either a name for a server created by yardstick or a dict
380 with attribute name mapping when using external heat templates
381 target_attr: either a name for a server created by yardstick or a dict
382 with attribute name mapping when using external heat templates
# Only Heat and Kubernetes contexts can host both servers; others are
# skipped (the 'continue' and return lines are elided).
384 for context in self.contexts:
385 if context.__context_type__ not in {"Heat", "Kubernetes"}:
388 host = context._get_server(host_attr)
392 target = context._get_server(target_attr)
396 # Both host and target is not None, then they are in the
403 class TaskParser(object): # pragma: no cover
404 """Parser for task config files in yaml format"""
# __init__ body is elided; presumably stores *path* on self — confirm.
406 def __init__(self, path):
# Check the task's optional 'constraint' section (pod/installer allow-lists)
# against the current environment; the return statements are elided.
409 def _meet_constraint(self, task, cur_pod, cur_installer):
410 if "constraint" in task:
411 constraint = task.get('constraint', None)
412 if constraint is not None:
413 tc_fit_pod = constraint.get('pod', None)
414 tc_fit_installer = constraint.get('installer', None)
415 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
416 cur_pod, cur_installer, constraint)
# A task is rejected when the current pod/installer is unknown or not in
# the constraint's allow-list.
417 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
419 if (cur_installer is None) or (tc_fit_installer and cur_installer
420 not in tc_fit_installer):
424 def _get_task_para(self, task, cur_pod):
425 task_args = task.get('task_args', None)
426 if task_args is not None:
427 task_args = task_args.get(cur_pod, task_args.get('default'))
428 task_args_fnames = task.get('task_args_fnames', None)
429 if task_args_fnames is not None:
430 task_args_fnames = task_args_fnames.get(cur_pod, None)
431 return task_args, task_args_fnames
433 def parse_suite(self):
434 """parse the suite file and return a list of task config file paths
435 and lists of optional parameters if present"""
436 LOG.info("\nParsing suite file:%s", self.path)
# The 'try:' line and the IOError handler body are elided.
439 with open(self.path) as stream:
440 cfg = yaml_load(stream)
441 except IOError as ioerror:
444 self._check_schema(cfg["schema"], "suite")
445 LOG.info("\nStarting scenario:%s", cfg["name"])
# Resolve the test-case directory relative to the yardstick root and make
# sure it ends with a path separator (task_fname is appended directly).
447 test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
448 test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
450 if test_cases_dir[-1] != os.sep:
451 test_cases_dir += os.sep
453 cur_pod = os.environ.get('NODE_NAME', None)
454 cur_installer = os.environ.get('INSTALLER_TYPE', None)
456 valid_task_files = []
458 valid_task_args_fnames = []
# Keep only test cases whose constraints match the current pod/installer;
# some branch lines (e.g. the explicit file_name handling) are elided.
460 for task in cfg["test_cases"]:
462 if "file_name" in task:
463 task_fname = task.get('file_name', None)
464 if task_fname is None:
469 if self._meet_constraint(task, cur_pod, cur_installer):
470 valid_task_files.append(test_cases_dir + task_fname)
473 # 3.fetch task parameters
474 task_args, task_args_fnames = self._get_task_para(task, cur_pod)
475 valid_task_args.append(task_args)
476 valid_task_args_fnames.append(task_args_fnames)
478 return valid_task_files, valid_task_args, valid_task_args_fnames
480 def parse_task(self, task_id, task_args=None, task_args_file=None):
481 """parses the task file and return an context and scenario instances"""
482 LOG.info("Parsing task config: %s", self.path)
# 'kw' initialization and the task_args_file guard are elided; args from
# the file are applied first, then overridden by inline task_args.
487 with open(task_args_file) as f:
488 kw.update(parse_task_args("task_args_file", f.read()))
489 kw.update(parse_task_args("task_args", task_args))
# Render the task file as a Jinja2 TaskTemplate with the collected kwargs.
494 with open(self.path) as f:
496 input_task = f.read()
497 rendered_task = TaskTemplate.render(input_task, **kw)
498 except Exception as e:
499 LOG.exception('Failed to render template:\n%s\n', input_task)
501 LOG.debug("Input task is:\n%s\n", rendered_task)
503 cfg = yaml_load(rendered_task)
504 except IOError as ioerror:
507 self._check_schema(cfg["schema"], "task")
508 meet_precondition = self._check_precondition(cfg)
510 # TODO: support one or many contexts? Many would simpler and precise
511 # TODO: support hybrid context type
# Accept 'context' (single), 'contexts' (list), or fall back to a Dummy
# context (the 'if "context" in cfg' line is elided).
513 context_cfgs = [cfg["context"]]
514 elif "contexts" in cfg:
515 context_cfgs = cfg["contexts"]
517 context_cfgs = [{"type": "Dummy"}]
520 for cfg_attrs in context_cfgs:
522 cfg_attrs['task_id'] = task_id
523 # default to Heat context because we are testing OpenStack
524 context_type = cfg_attrs.get("type", "Heat")
525 context = Context.get(context_type)
526 context.init(cfg_attrs)
527 contexts.append(context)
529 run_in_parallel = cfg.get("run_in_parallel", False)
531 # add tc and task id for influxdb extended tags
532 for scenario in cfg["scenarios"]:
533 task_name = os.path.splitext(os.path.basename(self.path))[0]
534 scenario["tc"] = task_name
535 scenario["task_id"] = task_id
536 # embed task path into scenario so we can load other files
537 # relative to task path
538 scenario["task_path"] = os.path.dirname(self.path)
# Rewrite '<server>.<context>' node names to the deployed context names.
540 self._change_node_names(scenario, contexts)
542 # TODO we need something better here, a class that represent the file
543 return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
# NOTE(review): the decorator line is elided; this reads like a
# @staticmethod (no 'self' parameter) — confirm.
546 def _change_node_names(scenario, contexts):
547 """Change the node names in a scenario, depending on the context config
549 The nodes (VMs or physical servers) are referred in the context section
550 with the name of the server and the name of the context:
551 <server name>.<context name>
553 If the context is going to be undeployed at the end of the test, the
554 task ID is suffixed to the name to avoid interferences with previous
555 deployments. If the context needs to be deployed at the end of the
556 test, the name assigned is kept.
558 There are several places where a node name could appear in the scenario
569 server_name: # JIRA: YARDSTICK-810
575 tg__0: tg_0.yardstick
576 vnf__0: vnf_0.yardstick
# Map '<node>.<context assigned_name>' to '<node>.<context name>'; raises
# ScenarioConfigContextNameNotFound when no context matches (the 'try:'
# line guarding the next() call is elided).
578 def qualified_name(name):
579 node_name, context_name = name.split('.')
581 ctx = next((context for context in contexts
582 if context.assigned_name == context_name))
583 except StopIteration:
584 raise exceptions.ScenarioConfigContextNameNotFound(
585 context_name=context_name)
587 return '{}.{}'.format(node_name, ctx.name)
# Apply the rewrite to every place a node name can appear.
589 if 'host' in scenario:
590 scenario['host'] = qualified_name(scenario['host'])
591 if 'target' in scenario:
592 scenario['target'] = qualified_name(scenario['target'])
593 server_name = scenario.get('options', {}).get('server_name', {})
594 if 'host' in server_name:
595 server_name['host'] = qualified_name(server_name['host'])
596 if 'target' in server_name:
597 server_name['target'] = qualified_name(server_name['target'])
598 if 'targets' in scenario:
599 for idx, target in enumerate(scenario['targets']):
600 scenario['targets'][idx] = qualified_name(target)
601 if 'nodes' in scenario:
602 for scenario_node, target in scenario['nodes'].items():
603 scenario['nodes'][scenario_node] = qualified_name(target)
605 def _check_schema(self, cfg_schema, schema_type):
606 """Check if config file is using the correct schema type"""
# Expects 'yardstick:<type>:0.1'; exits the process on mismatch. The
# continuation of the sys.exit() argument is on an elided line.
608 if cfg_schema != "yardstick:" + schema_type + ":0.1":
609 sys.exit("error: file %s has unknown schema %s" % (self.path,
612 def _check_precondition(self, cfg):
613 """Check if the environment meet the precondition"""
# All three checks only apply when both the precondition value and the
# corresponding environment variable are set; the return statements are
# on elided lines (presumably True when no precondition blocks the run).
615 if "precondition" in cfg:
616 precondition = cfg["precondition"]
617 installer_type = precondition.get("installer_type", None)
618 deploy_scenarios = precondition.get("deploy_scenarios", None)
619 tc_fit_pods = precondition.get("pod_name", None)
# NOTE(review): reads 'INSTALL_TYPE' while the rest of the file reads
# 'INSTALLER_TYPE' — looks suspicious; confirm which is intended.
620 installer_type_env = os.environ.get('INSTALL_TYPE', None)
621 deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
622 pod_name_env = os.environ.get('NODE_NAME', None)
624 LOG.info("installer_type: %s, installer_type_env: %s",
625 installer_type, installer_type_env)
626 LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
627 deploy_scenarios, deploy_scenario_env)
628 LOG.info("tc_fit_pods: %s, pod_name_env: %s",
629 tc_fit_pods, pod_name_env)
630 if installer_type and installer_type_env:
631 if installer_type_env not in installer_type:
633 if deploy_scenarios and deploy_scenario_env:
634 deploy_scenarios_list = deploy_scenarios.split(',')
# A prefix match on any listed deploy scenario is enough to proceed.
635 for deploy_scenario in deploy_scenarios_list:
636 if deploy_scenario_env.startswith(deploy_scenario):
639 if tc_fit_pods and pod_name_env:
640 if pod_name_env not in tc_fit_pods:
645 def is_ip_addr(addr):
646 """check if string addr is an IP address"""
# Dict inputs carry the address under public_ip_attr/private_ip_attr;
# plain strings have no .get and fall through via AttributeError.
648 addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
649 except AttributeError:
# The surrounding try/except and the True/False returns around the
# ip_address() validation are on elided lines.
653 ipaddress.ip_address(addr.encode('utf-8'))
660 def _is_background_scenario(scenario):
661 if "run_in_background" in scenario:
662 return scenario["run_in_background"]
def parse_nodes_with_context(scenario_cfg):
    """Resolve each entry of the scenario's 'nodes' mapping to its server.

    Node names are processed in sorted order so that server instantiation
    order is deterministic across runs.
    """
    nodes = scenario_cfg["nodes"]
    pairs = ((name, Context.get_server(nodes[name])) for name in sorted(nodes))
    return OrderedDict(pairs)
# Collect the distinct networks referenced by the nodes' interfaces.
674 def get_networks_from_nodes(nodes):
675 """parse the 'nodes' fields in scenario """
# The 'networks = {}' initialization, the skip-empty-node guard bodies,
# and the return statement are on elided lines.
677 for node in nodes.values():
680 interfaces = node.get('interfaces', {})
681 for interface in interfaces.values():
682 # vld_id is network_name
683 network_name = interface.get('network_name')
# Look the network up in the context and key the result by its name.
686 network = Context.get_network(network_name)
688 networks[network['name']] = network
692 def runner_join(runner, background_runners, outputs, result):
693 """join (wait for) a runner, exit process at runner failure
694 :param background_runners:
695 :type background_runners:
# Poll until the runner finishes, draining its output/result queues and
# those of every background runner on each pass so queues never fill up.
# (The loop's sleep/interval lines, if any, are elided.)
701 while runner.poll() is None:
702 outputs.update(runner.get_output())
703 result.extend(runner.get_result())
704 # drain all the background runner queues
705 for background in background_runners:
706 outputs.update(background.get_output())
707 result.extend(background.get_result())
# Final join and resource release; the 'return status' line is elided.
708 status = runner.join(outputs, result)
709 base_runner.Runner.release(runner)
def print_invalid_header(source_name, args):
    """Print a diagnostic header reporting an invalid input *source_name*."""
    values = {"source": source_name, "args": args}
    print("Invalid %(source)s passed:\n\n %(args)s\n" % values)
# Parse a task-args value (mapping or YAML string) into a dict.
718 def parse_task_args(src_name, args):
# Already a mapping: returned as-is on an elided line, presumably.
719 if isinstance(args, collections.Mapping):
# YAML-parse the string; empty/None input yields {}. The 'try:' line and
# the raise after the error message are elided.
723 kw = args and yaml_load(args)
724 kw = {} if kw is None else kw
725 except yaml.parser.ParserError as e:
726 print_invalid_header(src_name, args)
727 print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
728 % {"source": src_name, "err": e})
# The parsed YAML must be a dict; the raise/return lines are elided.
731 if not isinstance(kw, dict):
732 print_invalid_header(src_name, args)
733 print("%(src)s had to be dict, actually %(src_type)s\n"
734 % {"src": src_name, "src_type": type(kw)})