1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from collections import OrderedDict
22 from six.moves import filter
23 from jinja2 import Environment
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.constants import CONF_FILE
28 from yardstick.common.yaml_loader import yaml_load
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common import constants
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import task_template
33 from yardstick.common import utils
34 from yardstick.common.html_template import report_template
36 output_file_default = "/tmp/yardstick.out"
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
41 class Task(object): # pragma: no cover
44 Set of commands to manage benchmark tasks.
51 def _set_dispatchers(self, output_config):
52 dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
54 out_types = [s.strip() for s in dispatchers.split(',')]
55 output_config['DEFAULT']['dispatcher'] = out_types
57 def start(self, args, **kwargs): # pylint: disable=unused-argument
58 """Start a benchmark scenario."""
60 atexit.register(self.atexit_handler)
62 task_id = getattr(args, 'task_id')
63 self.task_id = task_id if task_id else str(uuid.uuid4())
68 output_config = utils.parse_ini_file(CONF_FILE)
69 except Exception: # pylint: disable=broad-except
70 # all error will be ignore, the default value is {}
73 self._init_output_config(output_config)
74 self._set_output_config(output_config, args.output_file)
75 LOG.debug('Output configuration is: %s', output_config)
77 self._set_dispatchers(output_config)
79 # update dispatcher list
80 if 'file' in output_config['DEFAULT']['dispatcher']:
81 result = {'status': 0, 'result': {}}
82 utils.write_json_to_file(args.output_file, result)
84 total_start_time = time.time()
85 parser = TaskParser(args.inputfile[0])
88 # 1.parse suite, return suite_params info
89 task_files, task_args, task_args_fnames = parser.parse_suite()
91 task_files = [parser.path]
92 task_args = [args.task_args]
93 task_args_fnames = [args.task_args_file]
95 LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
96 task_files, task_args, task_args_fnames)
102 tasks = self._parse_tasks(parser, task_files, args, task_args,
105 # Execute task files.
106 for i, _ in enumerate(task_files):
107 one_task_start_time = time.time()
108 self.contexts.extend(tasks[i]['contexts'])
109 if not tasks[i]['meet_precondition']:
110 LOG.info('"meet_precondition" is %s, please check environment',
111 tasks[i]['meet_precondition'])
115 data = self._run(tasks[i]['scenarios'],
116 tasks[i]['run_in_parallel'],
118 except KeyboardInterrupt:
120 except Exception: # pylint: disable=broad-except
121 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
123 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
126 LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
127 testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
131 # keep deployment, forget about stack
132 # (hide it for exit handler)
135 for context in self.contexts[::-1]:
138 one_task_end_time = time.time()
139 LOG.info("Task %s finished in %d secs", task_files[i],
140 one_task_end_time - one_task_start_time)
142 result = self._get_format_result(testcases)
144 self._do_output(output_config, result)
145 self._generate_reporting(result)
147 total_end_time = time.time()
148 LOG.info("Total finished in %d secs",
149 total_end_time - total_start_time)
151 LOG.info('To generate report, execute command "yardstick report '
152 'generate %s <YAML_NAME>"', self.task_id)
153 LOG.info("Task ALL DONE, exiting")
156 def _generate_reporting(self, result):
158 with open(constants.REPORTING_FILE, 'w') as f:
159 f.write(env.from_string(report_template).render(result))
161 LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
164 log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
165 log_formatter = logging.Formatter(log_format)
167 utils.makedirs(constants.TASK_LOG_DIR)
168 log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
169 log_handler = logging.FileHandler(log_path)
170 log_handler.setFormatter(log_formatter)
171 log_handler.setLevel(logging.DEBUG)
173 logging.root.addHandler(log_handler)
175 def _init_output_config(self, output_config):
176 output_config.setdefault('DEFAULT', {})
177 output_config.setdefault('dispatcher_http', {})
178 output_config.setdefault('dispatcher_file', {})
179 output_config.setdefault('dispatcher_influxdb', {})
180 output_config.setdefault('nsb', {})
182 def _set_output_config(self, output_config, file_path):
184 out_type = os.environ['DISPATCHER']
186 output_config['DEFAULT'].setdefault('dispatcher', 'file')
188 output_config['DEFAULT']['dispatcher'] = out_type
190 output_config['dispatcher_file']['file_path'] = file_path
193 target = os.environ['TARGET']
197 k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
198 output_config[k]['target'] = target
200 def _get_format_result(self, testcases):
201 criteria = self._get_task_criteria(testcases)
204 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
205 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
206 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
207 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
213 'criteria': criteria,
214 'task_id': self.task_id,
216 'testcases': testcases
222 def _get_task_criteria(self, testcases):
223 criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
229 def _do_output(self, output_config, result):
230 dispatchers = DispatcherBase.get(output_config)
231 dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
233 for dispatcher in dispatchers:
234 dispatcher.flush_result_data(result)
236 def _run(self, scenarios, run_in_parallel, output_config):
237 """Deploys context and calls runners"""
238 for context in self.contexts:
241 background_runners = []
244 # Start all background scenarios
245 for scenario in filter(_is_background_scenario, scenarios):
246 scenario["runner"] = dict(type="Duration", duration=1000000000)
247 runner = self.run_one_scenario(scenario, output_config)
248 background_runners.append(runner)
252 for scenario in scenarios:
253 if not _is_background_scenario(scenario):
254 runner = self.run_one_scenario(scenario, output_config)
255 runners.append(runner)
257 # Wait for runners to finish
258 for runner in runners:
259 status = runner_join(runner, background_runners, self.outputs, result)
262 "{0} runner status {1}".format(runner.__execution_type__, status))
263 LOG.info("Runner ended")
266 for scenario in scenarios:
267 if not _is_background_scenario(scenario):
268 runner = self.run_one_scenario(scenario, output_config)
269 status = runner_join(runner, background_runners, self.outputs, result)
271 LOG.error('Scenario NO.%s: "%s" ERROR!',
272 scenarios.index(scenario) + 1,
273 scenario.get('type'))
275 "{0} runner status {1}".format(runner.__execution_type__, status))
276 LOG.info("Runner ended")
278 # Abort background runners
279 for runner in background_runners:
282 # Wait for background runners to finish
283 for runner in background_runners:
284 status = runner.join(self.outputs, result)
286 # Nuke if it did not stop nicely
287 base_runner.Runner.terminate(runner)
288 runner.join(self.outputs, result)
289 base_runner.Runner.release(runner)
291 print("Background task ended")
294 def atexit_handler(self):
295 """handler for process termination"""
296 base_runner.Runner.terminate_all()
299 LOG.info("Undeploying all contexts")
300 for context in self.contexts[::-1]:
303 def _parse_options(self, op):
304 if isinstance(op, dict):
305 return {k: self._parse_options(v) for k, v in op.items()}
306 elif isinstance(op, list):
307 return [self._parse_options(v) for v in op]
308 elif isinstance(op, str):
309 return self.outputs.get(op[1:]) if op.startswith('$') else op
313 def _parse_tasks(self, parser, task_files, args, task_args,
318 for i, _ in enumerate(task_files):
319 parser.path = task_files[i]
320 tasks.append(parser.parse_task(self.task_id, task_args[i],
321 task_args_fnames[i]))
322 tasks[i]['case_name'] = os.path.splitext(
323 os.path.basename(task_files[i]))[0]
326 utils.makedirs(args.render_only)
327 for idx, task in enumerate(tasks):
328 output_file_name = os.path.abspath(os.path.join(
330 '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
331 utils.write_file(output_file_name, task['rendered'])
337 def run_one_scenario(self, scenario_cfg, output_config):
338 """run one scenario using context"""
339 runner_cfg = scenario_cfg["runner"]
340 runner_cfg['output_config'] = output_config
342 options = scenario_cfg.get('options', {})
343 scenario_cfg['options'] = self._parse_options(options)
345 # TODO support get multi hosts/vms info
347 server_name = scenario_cfg.get('options', {}).get('server_name', {})
349 def config_context_target(cfg):
350 target = cfg['target']
351 if is_ip_addr(target):
352 context_cfg['target'] = {"ipaddr": target}
354 context_cfg['target'] = Context.get_server(target)
355 if self._is_same_context(cfg["host"], target):
356 context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
358 context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
360 host_name = server_name.get('host', scenario_cfg.get('host'))
362 context_cfg['host'] = Context.get_server(host_name)
364 for item in [server_name, scenario_cfg]:
366 config_context_target(item)
368 LOG.debug("Got a KeyError in config_context_target(%s)", item)
372 if "targets" in scenario_cfg:
374 for target in scenario_cfg["targets"]:
375 if is_ip_addr(target):
376 ip_list.append(target)
377 context_cfg['target'] = {}
379 context_cfg['target'] = Context.get_server(target)
380 if self._is_same_context(scenario_cfg["host"],
382 ip_list.append(context_cfg["target"]["private_ip"])
384 ip_list.append(context_cfg["target"]["ip"])
385 context_cfg['target']['ipaddr'] = ','.join(ip_list)
387 if "nodes" in scenario_cfg:
388 context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
389 context_cfg["networks"] = get_networks_from_nodes(
390 context_cfg["nodes"])
392 runner = base_runner.Runner.get(runner_cfg)
394 LOG.info("Starting runner of type '%s'", runner_cfg["type"])
395 runner.run(scenario_cfg, context_cfg)
399 def _is_same_context(self, host_attr, target_attr):
400 """check if two servers are in the same heat context
401 host_attr: either a name for a server created by yardstick or a dict
402 with attribute name mapping when using external heat templates
403 target_attr: either a name for a server created by yardstick or a dict
404 with attribute name mapping when using external heat templates
406 for context in self.contexts:
407 if context.__context_type__ not in {"Heat", "Kubernetes"}:
410 host = context._get_server(host_attr)
414 target = context._get_server(target_attr)
418 # Both host and target is not None, then they are in the
425 class TaskParser(object): # pragma: no cover
426 """Parser for task config files in yaml format"""
428 def __init__(self, path):
431 def _meet_constraint(self, task, cur_pod, cur_installer):
432 if "constraint" in task:
433 constraint = task.get('constraint', None)
434 if constraint is not None:
435 tc_fit_pod = constraint.get('pod', None)
436 tc_fit_installer = constraint.get('installer', None)
437 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
438 cur_pod, cur_installer, constraint)
439 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
441 if (cur_installer is None) or (tc_fit_installer and cur_installer
442 not in tc_fit_installer):
446 def _get_task_para(self, task, cur_pod):
447 task_args = task.get('task_args', None)
448 if task_args is not None:
449 task_args = task_args.get(cur_pod, task_args.get('default'))
450 task_args_fnames = task.get('task_args_fnames', None)
451 if task_args_fnames is not None:
452 task_args_fnames = task_args_fnames.get(cur_pod, None)
453 return task_args, task_args_fnames
455 def parse_suite(self):
456 """parse the suite file and return a list of task config file paths
457 and lists of optional parameters if present"""
458 LOG.info("\nParsing suite file:%s", self.path)
461 with open(self.path) as stream:
462 cfg = yaml_load(stream)
463 except IOError as ioerror:
466 self._check_schema(cfg["schema"], "suite")
467 LOG.info("\nStarting scenario:%s", cfg["name"])
469 test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
470 test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
472 if test_cases_dir[-1] != os.sep:
473 test_cases_dir += os.sep
475 cur_pod = os.environ.get('NODE_NAME', None)
476 cur_installer = os.environ.get('INSTALLER_TYPE', None)
478 valid_task_files = []
480 valid_task_args_fnames = []
482 for task in cfg["test_cases"]:
484 if "file_name" in task:
485 task_fname = task.get('file_name', None)
486 if task_fname is None:
491 if self._meet_constraint(task, cur_pod, cur_installer):
492 valid_task_files.append(test_cases_dir + task_fname)
495 # 3.fetch task parameters
496 task_args, task_args_fnames = self._get_task_para(task, cur_pod)
497 valid_task_args.append(task_args)
498 valid_task_args_fnames.append(task_args_fnames)
500 return valid_task_files, valid_task_args, valid_task_args_fnames
502 def _render_task(self, task_args, task_args_file):
503 """Render the input task with the given arguments
505 :param task_args: (dict) arguments to render the task
506 :param task_args_file: (str) file containing the arguments to render
508 :return: (str) task file rendered
513 with open(task_args_file) as f:
514 kw.update(parse_task_args('task_args_file', f.read()))
515 kw.update(parse_task_args('task_args', task_args))
517 raise y_exc.TaskRenderArgumentError()
521 with open(self.path) as f:
522 input_task = f.read()
523 rendered_task = task_template.TaskTemplate.render(input_task, **kw)
524 LOG.debug('Input task is:\n%s', rendered_task)
525 parsed_task = yaml_load(rendered_task)
526 except (IOError, OSError):
527 raise y_exc.TaskReadError(task_file=self.path)
529 raise y_exc.TaskRenderError(input_task=input_task)
531 return parsed_task, rendered_task
533 def parse_task(self, task_id, task_args=None, task_args_file=None):
534 """parses the task file and return an context and scenario instances"""
535 LOG.info("Parsing task config: %s", self.path)
537 cfg, rendered = self._render_task(task_args, task_args_file)
538 self._check_schema(cfg["schema"], "task")
539 meet_precondition = self._check_precondition(cfg)
541 # TODO: support one or many contexts? Many would simpler and precise
542 # TODO: support hybrid context type
544 context_cfgs = [cfg["context"]]
545 elif "contexts" in cfg:
546 context_cfgs = cfg["contexts"]
548 context_cfgs = [{"type": "Dummy"}]
551 for cfg_attrs in context_cfgs:
553 cfg_attrs['task_id'] = task_id
554 # default to Heat context because we are testing OpenStack
555 context_type = cfg_attrs.get("type", "Heat")
556 context = Context.get(context_type)
557 context.init(cfg_attrs)
558 # Update the name in case the context has used the name_suffix
559 cfg_attrs['name'] = context.name
560 contexts.append(context)
562 run_in_parallel = cfg.get("run_in_parallel", False)
564 # add tc and task id for influxdb extended tags
565 for scenario in cfg["scenarios"]:
566 task_name = os.path.splitext(os.path.basename(self.path))[0]
567 scenario["tc"] = task_name
568 scenario["task_id"] = task_id
569 # embed task path into scenario so we can load other files
570 # relative to task path
571 scenario["task_path"] = os.path.dirname(self.path)
573 self._change_node_names(scenario, contexts)
575 # TODO we need something better here, a class that represent the file
576 return {'scenarios': cfg['scenarios'],
577 'run_in_parallel': run_in_parallel,
578 'meet_precondition': meet_precondition,
579 'contexts': contexts,
580 'rendered': rendered}
583 def _change_node_names(scenario, contexts):
584 """Change the node names in a scenario, depending on the context config
586 The nodes (VMs or physical servers) are referred in the context section
587 with the name of the server and the name of the context:
588 <server name>.<context name>
590 If the context is going to be undeployed at the end of the test, the
591 task ID is suffixed to the name to avoid interferences with previous
592 deployments. If the context needs to be deployed at the end of the
593 test, the name assigned is kept.
595 There are several places where a node name could appear in the scenario
606 server_name: # JIRA: YARDSTICK-810
612 tg__0: tg_0.yardstick
613 vnf__0: vnf_0.yardstick
615 def qualified_name(name):
618 node_name, context_name = name.split('.')
621 # for kubernetes, some kubernetes resources don't support
622 # name format like 'xxx.xxx', so we use '-' instead
624 node_name, context_name = name.split('-')
628 ctx = next((context for context in contexts
629 if context.assigned_name == context_name))
630 except StopIteration:
631 raise y_exc.ScenarioConfigContextNameNotFound(
632 context_name=context_name)
634 return '{}{}{}'.format(node_name, sep, ctx.name)
636 if 'host' in scenario:
637 scenario['host'] = qualified_name(scenario['host'])
638 if 'target' in scenario:
639 scenario['target'] = qualified_name(scenario['target'])
640 server_name = scenario.get('options', {}).get('server_name', {})
641 if 'host' in server_name:
642 server_name['host'] = qualified_name(server_name['host'])
643 if 'target' in server_name:
644 server_name['target'] = qualified_name(server_name['target'])
645 if 'targets' in scenario:
646 for idx, target in enumerate(scenario['targets']):
647 scenario['targets'][idx] = qualified_name(target)
648 if 'nodes' in scenario:
649 for scenario_node, target in scenario['nodes'].items():
650 scenario['nodes'][scenario_node] = qualified_name(target)
652 def _check_schema(self, cfg_schema, schema_type):
653 """Check if config file is using the correct schema type"""
655 if cfg_schema != "yardstick:" + schema_type + ":0.1":
656 sys.exit("error: file %s has unknown schema %s" % (self.path,
659 def _check_precondition(self, cfg):
660 """Check if the environment meet the precondition"""
662 if "precondition" in cfg:
663 precondition = cfg["precondition"]
664 installer_type = precondition.get("installer_type", None)
665 deploy_scenarios = precondition.get("deploy_scenarios", None)
666 tc_fit_pods = precondition.get("pod_name", None)
667 installer_type_env = os.environ.get('INSTALL_TYPE', None)
668 deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
669 pod_name_env = os.environ.get('NODE_NAME', None)
671 LOG.info("installer_type: %s, installer_type_env: %s",
672 installer_type, installer_type_env)
673 LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
674 deploy_scenarios, deploy_scenario_env)
675 LOG.info("tc_fit_pods: %s, pod_name_env: %s",
676 tc_fit_pods, pod_name_env)
677 if installer_type and installer_type_env:
678 if installer_type_env not in installer_type:
680 if deploy_scenarios and deploy_scenario_env:
681 deploy_scenarios_list = deploy_scenarios.split(',')
682 for deploy_scenario in deploy_scenarios_list:
683 if deploy_scenario_env.startswith(deploy_scenario):
686 if tc_fit_pods and pod_name_env:
687 if pod_name_env not in tc_fit_pods:
692 def is_ip_addr(addr):
693 """check if string addr is an IP address"""
695 addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
696 except AttributeError:
700 ipaddress.ip_address(addr.encode('utf-8'))
707 def _is_background_scenario(scenario):
708 if "run_in_background" in scenario:
709 return scenario["run_in_background"]
714 def parse_nodes_with_context(scenario_cfg):
715 """parse the 'nodes' fields in scenario """
716 # ensure consistency in node instantiation order
717 return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
718 for nodename in sorted(scenario_cfg["nodes"]))
721 def get_networks_from_nodes(nodes):
722 """parse the 'nodes' fields in scenario """
724 for node in nodes.values():
727 interfaces = node.get('interfaces', {})
728 for interface in interfaces.values():
729 # vld_id is network_name
730 network_name = interface.get('network_name')
733 network = Context.get_network(network_name)
735 networks[network['name']] = network
739 def runner_join(runner, background_runners, outputs, result):
740 """join (wait for) a runner, exit process at runner failure
741 :param background_runners:
742 :type background_runners:
748 while runner.poll() is None:
749 outputs.update(runner.get_output())
750 result.extend(runner.get_result())
751 # drain all the background runner queues
752 for background in background_runners:
753 outputs.update(background.get_output())
754 result.extend(background.get_result())
755 status = runner.join(outputs, result)
756 base_runner.Runner.release(runner)
760 def print_invalid_header(source_name, args):
761 print("Invalid %(source)s passed:\n\n %(args)s\n"
762 % {"source": source_name, "args": args})
765 def parse_task_args(src_name, args):
766 if isinstance(args, collections.Mapping):
770 kw = args and yaml_load(args)
771 kw = {} if kw is None else kw
772 except yaml.parser.ParserError as e:
773 print_invalid_header(src_name, args)
774 print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
775 % {"source": src_name, "err": e})
778 if not isinstance(kw, dict):
779 print_invalid_header(src_name, args)
780 print("%(src)s had to be dict, actually %(src_type)s\n"
781 % {"src": src_name, "src_type": type(kw)})