##############################################################################
# Copyright (c) 2015 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
"""Handler for yardstick command 'task'"""

import atexit
import collections
import ipaddress
import logging
import os
import sys
import time
import uuid
from collections import OrderedDict

import six
import yaml
from six.moves import filter
from jinja2 import Environment

from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.runners import base as base_runner
from yardstick.common.constants import CONF_FILE
from yardstick.common.yaml_loader import yaml_load
from yardstick.dispatcher.base import Base as DispatcherBase
from yardstick.common import constants
from yardstick.common import exceptions as y_exc
from yardstick.common import task_template
from yardstick.common import utils
from yardstick.common.html_template import report_template

output_file_default = "/tmp/yardstick.out"
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)


class Task(object):  # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        self.contexts = []
        self.outputs = {}

    def _set_dispatchers(self, output_config):
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
                                                           'file')
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types
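
    # A minimal sketch of what _set_dispatchers does, with hypothetical
    # config values (not taken from a real yardstick.conf):
    #   output_config = {'DEFAULT': {'dispatcher': 'file, influxdb'}}
    #   self._set_dispatchers(output_config)
    #   output_config['DEFAULT']['dispatcher']  # -> ['file', 'influxdb']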

    def start(self, args, **kwargs):  # pylint: disable=unused-argument
        """Start a benchmark scenario."""

        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        self.task_id = task_id if task_id else str(uuid.uuid4())

        self._set_log()

        try:
            output_config = utils.parse_ini_file(CONF_FILE)
        except Exception:  # pylint: disable=broad-except
            # all errors are ignored; the default value is {}
            output_config = {}

        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # update dispatcher list
        if 'file' in output_config['DEFAULT']['dispatcher']:
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
                  task_files, task_args, task_args_fnames)

        testcases = {}
        tasks = self._parse_tasks(parser, task_files, args, task_args,
                                  task_args_fnames)

        # Execute task files.
        for i, _ in enumerate(task_files):
            one_task_start_time = time.time()
            self.contexts.extend(tasks[i]['contexts'])
            if not tasks[i]['meet_precondition']:
                LOG.info('"meet_precondition" is %s, please check environment',
                         tasks[i]['meet_precondition'])
                continue

            try:
                data = self._run(tasks[i]['scenarios'],
                                 tasks[i]['run_in_parallel'],
                                 output_config)
            except KeyboardInterrupt:
                raise
            except Exception:  # pylint: disable=broad-except
                LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                          exc_info=True)
                testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
                                                    'tc_data': []}
            else:
                LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
                testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
                                                    'tc_data': data}

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("Task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)
        self._generate_reporting(result)

        total_end_time = time.time()
        LOG.info("Total finished in %d secs",
                 total_end_time - total_start_time)

        LOG.info('To generate report, execute command "yardstick report '
                 'generate %s <yaml_name>"', self.task_id)
        LOG.info("Task ALL DONE, exiting")
        return result
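
    # Typical CLI entry point for this method (illustrative, with a
    # hypothetical task file path):
    #   yardstick task start samples/ping.yaml --output-file /tmp/yardstick.out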

    def _generate_reporting(self, result):
        env = Environment()
        with open(constants.REPORTING_FILE, 'w') as f:
            f.write(env.from_string(report_template).render(result))

        LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)

    def _set_log(self):
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        utils.makedirs(constants.TASK_LOG_DIR)
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)

    def _init_output_config(self, output_config):
        output_config.setdefault('DEFAULT', {})
        output_config.setdefault('dispatcher_http', {})
        output_config.setdefault('dispatcher_file', {})
        output_config.setdefault('dispatcher_influxdb', {})
        output_config.setdefault('nsb', {})

    def _set_output_config(self, output_config, file_path):
        try:
            out_type = os.environ['DISPATCHER']
        except KeyError:
            output_config['DEFAULT'].setdefault('dispatcher', 'file')
        else:
            output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

        try:
            target = os.environ['TARGET']
        except KeyError:
            pass
        else:
            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
            output_config[k]['target'] = target
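
    # A sketch of the environment override handled above, with hypothetical
    # values:
    #   DISPATCHER=influxdb TARGET=http://10.1.1.1:8086 yardstick task start ...
    # results in
    #   output_config['DEFAULT']['dispatcher'] == 'influxdb'
    #   output_config['dispatcher_influxdb']['target'] == 'http://10.1.1.1:8086'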

    def _get_format_result(self, testcases):
        criteria = self._get_task_criteria(testcases)

        info = {
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
        }

        result = {
            'status': 1,
            'result': {
                'criteria': criteria,
                'task_id': self.task_id,
                'info': info,
                'testcases': testcases
            }
        }

        return result

    def _get_task_criteria(self, testcases):
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
        if criteria:
            return 'FAIL'
        return 'PASS'
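
    # Example (hypothetical testcases dict):
    #   self._get_task_criteria({'tc002': {'criteria': 'PASS'},
    #                            'tc011': {'criteria': 'FAIL'}})  # -> 'FAIL'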

    def _do_output(self, output_config, result):
        dispatchers = DispatcherBase.get(output_config)
        dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')

        for dispatcher in dispatchers:
            dispatcher.flush_result_data(result)

    def _run(self, scenarios, run_in_parallel, output_config):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []
        result = []

        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_config)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_config)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                status = runner_join(runner, background_runners, self.outputs, result)
                if status != 0:
                    raise RuntimeError(
                        "{0} runner status {1}".format(runner.__execution_type__, status))
                LOG.info("Runner ended")
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_config)
                    status = runner_join(runner, background_runners, self.outputs, result)
                    if status != 0:
                        LOG.error('Scenario NO.%s: "%s" ERROR!',
                                  scenarios.index(scenario) + 1,
                                  scenario.get('type'))
                        raise RuntimeError(
                            "{0} runner status {1}".format(runner.__execution_type__, status))
                    LOG.info("Runner ended")

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(self.outputs, result)
            if status is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner.join(self.outputs, result)
            base_runner.Runner.release(runner)

            print("Background task ended")

        return result

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            LOG.info("Undeploying all contexts")
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op
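
    # Example of the '$' substitution (hypothetical outputs):
    #   self.outputs = {'rtt': 82}
    #   self._parse_options({'delay': '$rtt', 'count': 10})
    #   # -> {'delay': 82, 'count': 10}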

    def _parse_tasks(self, parser, task_files, args, task_args,
                     task_args_fnames):
        tasks = []

        for i, _ in enumerate(task_files):
            parser.path = task_files[i]
            tasks.append(parser.parse_task(self.task_id, task_args[i],
                                           task_args_fnames[i]))
            tasks[i]['case_name'] = os.path.splitext(
                os.path.basename(task_files[i]))[0]

        if args.render_only:
            utils.makedirs(args.render_only)
            for idx, task in enumerate(tasks):
                output_file_name = os.path.abspath(os.path.join(
                    args.render_only,
                    '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
                utils.write_file(output_file_name, task['rendered'])
            sys.exit(0)

        return tasks

    def run_one_scenario(self, scenario_cfg, output_config):
        """run one scenario using context"""
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_config'] = output_config

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        context_cfg = {}
        server_name = scenario_cfg.get('options', {}).get('server_name', {})

        def config_context_target(cfg):
            target = cfg['target']
            if is_ip_addr(target):
                context_cfg['target'] = {"ipaddr": target}
            else:
                context_cfg['target'] = Context.get_server(target)
                if self._is_same_context(cfg["host"], target):
                    context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
                else:
                    context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]

        host_name = server_name.get('host', scenario_cfg.get('host'))
        if host_name:
            context_cfg['host'] = Context.get_server(host_name)

        for item in [server_name, scenario_cfg]:
            try:
                config_context_target(item)
            except KeyError:
                LOG.debug("Got a KeyError in config_context_target(%s)", item)
            else:
                break

        if "targets" in scenario_cfg:
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_context(scenario_cfg["host"],
                                             target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])

        runner = base_runner.Runner.get(runner_cfg)

        LOG.info("Starting runner of type '%s'", runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        for context in self.contexts:
            if context.__context_type__ not in {"Heat", "Kubernetes"}:
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Neither host nor target is None, so they are in the
            # same context.
            return True

        return False


class TaskParser(object):  # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s, tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                    return False
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, task_args.get('default'))
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames
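
    # Example suite entry consumed by _get_task_para (hypothetical pod name):
    #   task_args:
    #     default: '{"packetsize": 100}'
    #     huawei-pod1: '{"packetsize": 1000}'
    # With cur_pod == 'huawei-pod1' the pod-specific arguments win; any other
    # pod falls back to 'default'.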

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
        and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                cfg = yaml_load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
                    continue
            else:
                continue
            # 2.check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames
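
    # A minimal suite file accepted by parse_suite might look like this
    # (hypothetical case names):
    #   schema: "yardstick:suite:0.1"
    #   name: "opnfv_smoke"
    #   test_cases_dir: "tests/opnfv/test_cases/"
    #   test_cases:
    #     - file_name: opnfv_yardstick_tc002.yaml
    #     - file_name: opnfv_yardstick_tc005.yaml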

    def _render_task(self, task_args, task_args_file):
        """Render the input task with the given arguments

        :param task_args: (dict) arguments to render the task
        :param task_args_file: (str) file containing the arguments to render
                               the task
        :return: (str) task file rendered
        """
        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args('task_args_file', f.read()))
            kw.update(parse_task_args('task_args', task_args))
        except TypeError:
            raise y_exc.TaskRenderArgumentError()

        input_task = None
        try:
            with open(self.path) as f:
                input_task = f.read()
            rendered_task = task_template.TaskTemplate.render(input_task, **kw)
            LOG.debug('Rendered task is:\n%s', rendered_task)
            parsed_task = yaml_load(rendered_task)
        except (IOError, OSError):
            raise y_exc.TaskReadError(task_file=self.path)
        except Exception:
            raise y_exc.TaskRenderError(input_task=input_task)

        return parsed_task, rendered_task
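
    # Rendering sketch (hypothetical template variable): a task file containing
    #   host: {{ host_name }}
    # rendered with task_args '{"host_name": "athena.demo"}' yields
    #   host: athena.demo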

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parse the task file and return context and scenario instances"""
        LOG.info("Parsing task config: %s", self.path)

        cfg, rendered = self._render_task(task_args, task_args_file)
        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would be simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        for cfg_attrs in context_cfgs:
            cfg_attrs['task_id'] = task_id
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            # Update the name in case the context has used the name_suffix
            cfg_attrs['name'] = context.name
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            self._change_node_names(scenario, contexts)

        # TODO we need something better here, a class that represents the file
        return {'scenarios': cfg['scenarios'],
                'run_in_parallel': run_in_parallel,
                'meet_precondition': meet_precondition,
                'contexts': contexts,
                'rendered': rendered}

    @staticmethod
    def _change_node_names(scenario, contexts):
        """Change the node names in a scenario, depending on the context config

        The nodes (VMs or physical servers) are referred in the context section
        with the name of the server and the name of the context:
            <server name>.<context name>

        If the context is going to be undeployed at the end of the test, the
        task ID is suffixed to the name to avoid interferences with previous
        deployments. If the context is kept deployed after the test, the name
        assigned is kept.

        There are several places where a node name could appear in the scenario
        configuration:
          scenario:
            host: server_1.ctx_1
            target: server_2.ctx_1
          scenario:
            options:
              server_name:  # JIRA: YARDSTICK-810
                host: server_1.ctx_1
                target: server_2.ctx_1
          scenario:
            nodes:
              tg__0: tg_0.yardstick
              vnf__0: vnf_0.yardstick
        """
        def qualified_name(name):
            node_name, context_name = name.split('.')
            try:
                ctx = next((context for context in contexts
                            if context.assigned_name == context_name))
            except StopIteration:
                raise y_exc.ScenarioConfigContextNameNotFound(
                    context_name=context_name)

            return '{}.{}'.format(node_name, ctx.name)
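
        # Example (hypothetical context): if the scenario refers to
        # 'athena.demo' and the 'demo' context was deployed with a task-id
        # suffix, qualified_name('athena.demo') may return something like
        # 'athena.demo-a1b2c3d4'.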

        if 'host' in scenario:
            scenario['host'] = qualified_name(scenario['host'])
        if 'target' in scenario:
            scenario['target'] = qualified_name(scenario['target'])
        server_name = scenario.get('options', {}).get('server_name', {})
        if 'host' in server_name:
            server_name['host'] = qualified_name(server_name['host'])
        if 'target' in server_name:
            server_name['target'] = qualified_name(server_name['target'])
        if 'targets' in scenario:
            for idx, target in enumerate(scenario['targets']):
                scenario['targets'][idx] = qualified_name(target)
        if 'nodes' in scenario:
            for scenario_node, target in scenario['nodes'].items():
                scenario['nodes'][scenario_node] = qualified_name(target)

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALLER_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True


def is_ip_addr(addr):
    """check if string addr is an IP address"""
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        ipaddress.ip_address(six.text_type(addr))
    except ValueError:
        return False
    else:
        return True
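
# Expected behaviour (illustrative):
#   is_ip_addr('10.0.0.1')                      # -> True
#   is_ip_addr('athena.demo')                   # -> False
#   is_ip_addr({'public_ip_attr': '10.0.0.1'})  # -> True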


def _is_background_scenario(scenario):
    if "run_in_background" in scenario:
        return scenario["run_in_background"]
    else:
        return False


def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' fields in scenario"""
    # ensure consistency in node instantiation order
    return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
                       for nodename in sorted(scenario_cfg["nodes"]))
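
# Sketch of the mapping produced (hypothetical node names): a scenario with
#   nodes: {'vnf__0': 'vnf_0.yardstick', 'tg__0': 'tg_0.yardstick'}
# yields an OrderedDict keyed 'tg__0', 'vnf__0' (sorted), each value being the
# server attributes resolved by Context.get_server().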


def get_networks_from_nodes(nodes):
    """parse the 'interfaces' fields in the nodes and return their networks"""
    networks = {}
    for node in nodes.values():
        if not node:
            continue
        interfaces = node.get('interfaces', {})
        for interface in interfaces.values():
            # vld_id is network_name
            network_name = interface.get('network_name')
            if not network_name:
                continue
            network = Context.get_network(network_name)
            if network:
                networks[network['name']] = network
    return networks


def runner_join(runner, background_runners, outputs, result):
    """join (wait for) a runner, exit process at runner failure

    :param background_runners: background runners to drain while waiting
    :type background_runners: list
    :param outputs: shared scenario outputs
    :type outputs: dict
    :param result: list to extend with runner results
    :type result: list
    """
    while runner.poll() is None:
        outputs.update(runner.get_output())
        result.extend(runner.get_result())
        # drain all the background runner queues
        for background in background_runners:
            outputs.update(background.get_output())
            result.extend(background.get_result())
    status = runner.join(outputs, result)
    base_runner.Runner.release(runner)
    return status


def print_invalid_header(source_name, args):
    print("Invalid %(source)s passed:\n\n %(args)s\n"
          % {"source": source_name, "args": args})


def parse_task_args(src_name, args):
    if isinstance(args, collections.Mapping):
        return args

    try:
        kw = args and yaml_load(args)
        kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()

    return kw