1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from collections import OrderedDict
22 from six.moves import filter
23 from jinja2 import Environment
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.constants import CONF_FILE
28 from yardstick.common.yaml_loader import yaml_load
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common import constants
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import task_template
33 from yardstick.common import utils
34 from yardstick.common.html_template import report_template
# Fallback path for dumped task results when no output file is configured.
output_file_default = "/tmp/yardstick.out"
# Default suite test-case directory, joined to YARDSTICK_ROOT_PATH in
# TaskParser.parse_suite().
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
class Task(object):  # pragma: no cover
    """Task commands.

    Set of commands to manage benchmark tasks.

    NOTE(review): this copy of the module has many source lines elided
    (unbalanced parentheses, ``except`` without ``try``, missing ``return``
    statements).  The gaps are preserved below and flagged inline; restore
    the missing lines from the original module before executing this code.
    """

    def _set_dispatchers(self, output_config):
        # Normalize the comma-separated dispatcher setting from the DEFAULT
        # section into a list of stripped names.
        # NOTE(review): the continuation line carrying the default value and
        # the closing parenthesis of the .get() call is missing here.
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types

    def start(self, args, **kwargs):  # pylint: disable=unused-argument
        """Start a benchmark scenario."""
        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        # Reuse the id supplied by the caller, otherwise mint a fresh UUID.
        self.task_id = task_id if task_id else str(uuid.uuid4())

        # NOTE(review): the ``try:`` line guarding this parse is missing.
        output_config = utils.parse_ini_file(CONF_FILE)
        except Exception:  # pylint: disable=broad-except
            # all errors will be ignored, the default value is {}

        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # update dispatcher list
        if 'file' in output_config['DEFAULT']['dispatcher']:
            # Seed the output file with an empty "success" skeleton.
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        # NOTE(review): the ``if args.suite:`` / ``else:`` lines selecting
        # between the two branches below are missing.
        # 1.parse suite, return suite_params info
        task_files, task_args, task_args_fnames = parser.parse_suite()
        task_files = [parser.path]
        task_args = [args.task_args]
        task_args_fnames = [args.task_args_file]

        LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
                  task_files, task_args, task_args_fnames)

        # NOTE(review): the ``testcases = {}`` initialisation and the closing
        # arguments of the call below are missing.
        tasks = self._parse_tasks(parser, task_files, args, task_args,

        # Execute task files.
        for i, _ in enumerate(task_files):
            one_task_start_time = time.time()
            self.contexts.extend(tasks[i]['contexts'])
            if not tasks[i]['meet_precondition']:
                LOG.info('"meet_precondition" is %s, please check environment',
                         tasks[i]['meet_precondition'])

                success, data = self._run(tasks[i]['scenarios'],
                                          tasks[i]['run_in_parallel'],
            except KeyboardInterrupt:
            except Exception:  # pylint: disable=broad-except
                LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
                    LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
                    testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
                    LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                    testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',

            # keep deployment, forget about stack
            # (hide it for exit handler)
            for context in self.contexts[::-1]:

            one_task_end_time = time.time()
            LOG.info("Task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)
        self._generate_reporting(result)

        total_end_time = time.time()
        LOG.info("Total finished in %d secs",
                 total_end_time - total_start_time)

        LOG.info('To generate report, execute command "yardstick report '
                 'generate %s <YAML_NAME>"', self.task_id)
        LOG.info("Task ALL DONE, exiting")

    def _generate_reporting(self, result):
        """Render the HTML report for ``result`` to the reporting file."""
        # NOTE(review): the Jinja2 ``env = Environment()`` assignment that
        # the next line relies on is missing from this copy.
        with open(constants.REPORTING_FILE, 'w') as f:
            f.write(env.from_string(report_template).render(result))

        LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)

    # NOTE(review): the ``def`` line of the per-task log setup method is
    # missing; the statements below are its body (they reference ``self``).
        # One log file per task id under TASK_LOG_DIR, captured at DEBUG.
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        utils.makedirs(constants.TASK_LOG_DIR)
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)

    def _init_output_config(self, output_config):
        """Make sure every section used later exists in the configuration."""
        output_config.setdefault('DEFAULT', {})
        output_config.setdefault('dispatcher_http', {})
        output_config.setdefault('dispatcher_file', {})
        output_config.setdefault('dispatcher_influxdb', {})
        output_config.setdefault('nsb', {})

    def _set_output_config(self, output_config, file_path):
        """Override the output configuration from environment variables.

        NOTE(review): the ``try``/``except KeyError``/``else`` scaffolding
        around the two ``os.environ[...]`` reads is missing from this copy.
        """
        out_type = os.environ['DISPATCHER']
        output_config['DEFAULT'].setdefault('dispatcher', 'file')
        output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

        target = os.environ['TARGET']
        # Route the TARGET override to the section of the active dispatcher.
        k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
        output_config[k]['target'] = target

    def _get_format_result(self, testcases):
        """Build the result envelope handed over to the dispatchers.

        NOTE(review): the ``info = {``/``result = {`` wrapper lines, the
        closing braces and the ``return`` are missing; only the dict
        entries remain below.
        """
        criteria = self._get_task_criteria(testcases)

            # Deployment metadata taken from the CI environment variables.
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')

            'criteria': criteria,
            'task_id': self.task_id,
            'testcases': testcases

    def _get_task_criteria(self, testcases):
        """Derive the overall task criteria from the per-testcase criteria.

        ``criteria`` is True when at least one testcase is not 'PASS'.
        NOTE(review): the lines mapping that flag to a returned value are
        missing from this copy.
        """
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())

    def _do_output(self, output_config, result):
        """Flush ``result`` through every configured dispatcher.

        Dispatchers whose type is 'Influxdb' are filtered out here --
        presumably because InfluxDB receives data during the run; confirm
        against the dispatcher implementations.
        """
        dispatchers = DispatcherBase.get(output_config)
        dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')

        for dispatcher in dispatchers:
            dispatcher.flush_result_data(result)

    def _run(self, scenarios, run_in_parallel, output_config):
        """Deploys context and calls runners"""
        # NOTE(review): multiple lines are missing in this method (context
        # deployment call, ``runners``/``result``/``task_success`` setup,
        # the serial/parallel branch guards and the background-runner abort
        # call).
        for context in self.contexts:

        background_runners = []

        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # Background scenarios run until aborted: give them an
            # effectively infinite Duration runner.
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_config)
            background_runners.append(runner)

        for scenario in scenarios:
            if not _is_background_scenario(scenario):
                runner = self.run_one_scenario(scenario, output_config)
                runners.append(runner)

        # Wait for runners to finish
        for runner in runners:
            status = runner_join(runner, background_runners, self.outputs, result)
                LOG.error("%s runner status %s", runner.__execution_type__, status)
            LOG.info("Runner ended")

        for scenario in scenarios:
            if not _is_background_scenario(scenario):
                runner = self.run_one_scenario(scenario, output_config)
                status = runner_join(runner, background_runners, self.outputs, result)
                    LOG.error('Scenario NO.%s: "%s" ERROR!',
                              scenarios.index(scenario) + 1,
                              scenario.get('type'))
                    LOG.error("%s runner status %s", runner.__execution_type__, status)
                LOG.info("Runner ended")

        # Abort background runners
        for runner in background_runners:

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(self.outputs, result)
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner.join(self.outputs, result)
            base_runner.Runner.release(runner)

            print("Background task ended")
        return task_success, result

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        LOG.info("Undeploying all contexts")
        # Undeploy in reverse deployment order.
        # NOTE(review): the loop body (the undeploy call) is missing here.
        for context in self.contexts[::-1]:

    def _parse_options(self, op):
        """Recursively resolve '$'-prefixed option values against
        ``self.outputs`` (outputs recorded by earlier scenarios)."""
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        # NOTE(review): the fall-through ``return op`` for other types is
        # missing from this copy.

    def _parse_tasks(self, parser, task_files, args, task_args,
        # NOTE(review): the remainder of the signature, the ``tasks = []``
        # initialisation, the ``if args.render_only:`` guard and the final
        # ``return`` are missing from this copy.
        for i, _ in enumerate(task_files):
            parser.path = task_files[i]
            tasks.append(parser.parse_task(self.task_id, task_args[i],
                                           task_args_fnames[i]))
            # Derive the case name from the task file's base name.
            tasks[i]['case_name'] = os.path.splitext(
                os.path.basename(task_files[i]))[0]

            utils.makedirs(args.render_only)
            for idx, task in enumerate(tasks):
                output_file_name = os.path.abspath(os.path.join(
                    '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
                utils.write_file(output_file_name, task['rendered'])

    def run_one_scenario(self, scenario_cfg, output_config):
        """run one scenario using context"""
        # NOTE(review): several lines are missing in this method, including
        # the ``context_cfg`` initialisation and the final return value.
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_config'] = output_config

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        options = scenario_cfg.get('options') or {}
        server_name = options.get('server_name') or {}

        def config_context_target(cfg):
            # Resolve cfg['target'] either as a literal IP address or as a
            # server looked up from the deployed contexts.
            target = cfg['target']
            if is_ip_addr(target):
                context_cfg['target'] = {"ipaddr": target}
                context_cfg['target'] = Context.get_server(target)
                # Same heat context as the host: use the private address.
                if self._is_same_context(cfg["host"], target):
                    context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
                    context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]

        host_name = server_name.get('host', scenario_cfg.get('host'))
        context_cfg['host'] = Context.get_server(host_name)

        for item in [server_name, scenario_cfg]:
                config_context_target(item)
                LOG.debug("Got a KeyError in config_context_target(%s)", item)

        if "targets" in scenario_cfg:
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_context(scenario_cfg["host"],
                        ip_list.append(context_cfg["target"]["private_ip"])
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])

        runner = base_runner.Runner.get(runner_cfg)

        LOG.info("Starting runner of type '%s'", runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

    def _is_same_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        # NOTE(review): guard bodies and return statements are missing in
        # this method.
        for context in self.contexts:
            # Only Heat and Kubernetes contexts can host shared servers.
            if context.__context_type__ not in {"Heat", "Kubernetes"}:

            host = context._get_server(host_attr)

            target = context._get_server(target_attr)

            # Both host and target is not None, then they are in the
class TaskParser(object):  # pragma: no cover
    """Parser for task config files in yaml format

    NOTE(review): this copy of the module has many source lines elided;
    the gaps are preserved and flagged inline below.
    """

    def __init__(self, path):
        """Store the suite/task file path on the parser.

        NOTE(review): the body (the ``self.path = path`` assignment read by
        the other methods) is missing from this copy.
        """

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Check the task's 'constraint' section against the current pod
        and installer.

        NOTE(review): the ``return`` lines of both conditions are missing
        from this copy.
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):

    def _get_task_para(self, task, cur_pod):
        """Return ``(task_args, task_args_fnames)`` for ``cur_pod``.

        ``task_args`` falls back to the 'default' entry when there is no
        pod-specific value; ``task_args_fnames`` has no such fallback.
        """
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, task_args.get('default'))
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
        and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        # NOTE(review): the ``try:`` line and the IOError handler body
        # around this read are missing.
        with open(self.path) as stream:
            cfg = yaml_load(stream)
        except IOError as ioerror:

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        # Resolve the test-case directory under the yardstick root and make
        # sure it ends with a separator (it is string-concatenated below).
        # NOTE(review): the continuation/closing of the os.path.join call is
        # missing.
        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        # NOTE(review): the ``valid_task_args = []`` initialisation used
        # below is missing from this copy.
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)

                # 3.fetch task parameters
                task_args, task_args_fnames = self._get_task_para(task, cur_pod)
                valid_task_args.append(task_args)
                valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def _render_task(self, task_args, task_args_file):
        """Render the input task with the given arguments

        :param task_args: (dict) arguments to render the task
        :param task_args_file: (str) file containing the arguments to render
        :return: (str) task file rendered
        """
        # NOTE(review): the ``kw = {}`` initialisation, the ``try:`` lines
        # and the guards around the argument parsing are missing here.
        with open(task_args_file) as f:
            kw.update(parse_task_args('task_args_file', f.read()))
        kw.update(parse_task_args('task_args', task_args))
            raise y_exc.TaskRenderArgumentError()

        with open(self.path) as f:
            input_task = f.read()
            rendered_task = task_template.TaskTemplate.render(input_task, **kw)
            LOG.debug('Input task is:\n%s', rendered_task)
            parsed_task = yaml_load(rendered_task)
        except (IOError, OSError):
            raise y_exc.TaskReadError(task_file=self.path)
            raise y_exc.TaskRenderError(input_task=input_task)

        return parsed_task, rendered_task

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        LOG.info("Parsing task config: %s", self.path)

        cfg, rendered = self._render_task(task_args, task_args_file)
        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        # NOTE(review): the ``if "context" in cfg:``/``else:`` lines of this
        # chain and the ``contexts = []`` initialisation are missing.
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
            context_cfgs = [{"type": "Dummy"}]

        for cfg_attrs in context_cfgs:
            cfg_attrs['task_id'] = task_id
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            # Update the name in case the context has used the name_suffix
            cfg_attrs['name'] = context.name
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            self._change_node_names(scenario, contexts)

        # TODO we need something better here, a class that represent the file
        return {'scenarios': cfg['scenarios'],
                'run_in_parallel': run_in_parallel,
                'meet_precondition': meet_precondition,
                'contexts': contexts,
                'rendered': rendered}

    def _change_node_names(scenario, contexts):
        """Change the node names in a scenario, depending on the context config

        The nodes (VMs or physical servers) are referred in the context section
        with the name of the server and the name of the context:
        <server name>.<context name>

        If the context is going to be undeployed at the end of the test, the
        task ID is suffixed to the name to avoid interferences with previous
        deployments. If the context needs to be deployed at the end of the
        test, the name assigned is kept.

        There are several places where a node name could appear in the scenario:
          server_name:  # JIRA: YARDSTICK-810
          tg__0: tg_0.yardstick
          vnf__0: vnf_0.yardstick

        NOTE: in Kubernetes context, the separator character between the server
        name and the context name is "-".

        NOTE(review): this method takes no ``self`` parameter -- the
        ``@staticmethod`` decorator line appears to be missing in this copy.
        """
        def qualified_name(name):
            # Map '<server>.<context>' to the context's qualified name.
            # NOTE(review): the closing argument/paren of the format call
            # below is missing.
            for context in contexts:
                host_name, ctx_name = context.split_host_name(name)
                if context.assigned_name == ctx_name:
                    return '{}{}{}'.format(host_name,
                                           context.host_name_separator,

            raise y_exc.ScenarioConfigContextNameNotFound(host_name=name)

        if 'host' in scenario:
            scenario['host'] = qualified_name(scenario['host'])
        if 'target' in scenario:
            scenario['target'] = qualified_name(scenario['target'])
        options = scenario.get('options') or {}
        server_name = options.get('server_name') or {}
        if 'host' in server_name:
            server_name['host'] = qualified_name(server_name['host'])
        if 'target' in server_name:
            server_name['target'] = qualified_name(server_name['target'])
        if 'targets' in scenario:
            for idx, target in enumerate(scenario['targets']):
                scenario['targets'][idx] = qualified_name(target)
        if 'nodes' in scenario:
            for scenario_node, target in scenario['nodes'].items():
                scenario['nodes'][scenario_node] = qualified_name(target)

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""
        # Only the 0.1 schema of the requested type is accepted.
        # NOTE(review): the continuation of the sys.exit() message is missing.
        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,

    def _check_precondition(self, cfg):
        """Check if the environment meet the precondition"""
        # NOTE(review): the return statements of this method are missing
        # from this copy.
        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            # NOTE(review): reads 'INSTALL_TYPE' here while parse_suite()
            # reads 'INSTALLER_TYPE' -- confirm which variable CI exports.
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
def is_ip_addr(addr):
    """check if string addr is an IP address"""
    # Dict-like inputs carry the address under the public/private ip attr
    # keys; plain strings raise AttributeError and are used as-is.
    # NOTE(review): the ``try:`` lines, the except body and the True/False
    # returns around ipaddress.ip_address() are missing from this copy.
    addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:

    ipaddress.ip_address(addr.encode('utf-8'))
711 def _is_background_scenario(scenario):
712 if "run_in_background" in scenario:
713 return scenario["run_in_background"]
def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' fields in scenario """
    nodes = scenario_cfg["nodes"]
    # Iterate node names in sorted order so that node instantiation order
    # is deterministic across runs.
    ordered_nodes = OrderedDict()
    for node_name in sorted(nodes):
        ordered_nodes[node_name] = Context.get_server(nodes[node_name])
    return ordered_nodes
def get_networks_from_nodes(nodes):
    """parse the 'nodes' fields in scenario """
    # Collect the Context network definitions referenced by the nodes'
    # interfaces, keyed by network name.
    # NOTE(review): the accumulator initialisation (``networks = {}``), the
    # empty-node/empty-name guards and the final ``return networks`` are
    # missing from this copy.
    for node in nodes.values():
        interfaces = node.get('interfaces', {})
        for interface in interfaces.values():
            # vld_id is network_name
            network_name = interface.get('network_name')
            network = Context.get_network(network_name)
                networks[network['name']] = network
def runner_join(runner, background_runners, outputs, result):
    """Join (wait for) a runner, draining queues while it runs.

    :param runner: the foreground runner to wait for
    :param background_runners: background runners whose output/result
        queues are drained on every poll so they do not fill up
    :param outputs: (dict) accumulated scenario outputs, updated in place
    :param result: (list) accumulated result records, extended in place
    :return: the runner's exit status from ``join()``

    Fix: the previous version never returned ``status``, yet every caller
    assigns the return value (``status = runner_join(...)``) and checks it;
    the missing ``return status`` is restored here.  The unterminated
    docstring is also closed.
    """
    while runner.poll() is None:
        outputs.update(runner.get_output())
        result.extend(runner.get_result())
        # drain all the background runner queues
        for background in background_runners:
            outputs.update(background.get_output())
            result.extend(background.get_result())
    status = runner.join(outputs, result)
    base_runner.Runner.release(runner)
    return status
def print_invalid_header(source_name, args):
    """Print a standard header announcing an invalid input source."""
    details = {"source": source_name, "args": args}
    print("Invalid %(source)s passed:\n\n  %(args)s\n" % details)
def parse_task_args(src_name, args):
    """Parse ``args`` (a mapping or a YAML string) into a dict of kwargs.

    NOTE(review): several lines are missing from this copy (the early
    return for the mapping case, the ``try:`` line and the final returns),
    and neither ``collections`` nor ``yaml`` appears in the visible import
    block -- verify against the original module.
    """
    # NOTE(review): collections.Mapping was removed in Python 3.10;
    # collections.abc.Mapping is required on modern interpreters.
    if isinstance(args, collections.Mapping):

    # An empty/None args short-circuits to {}.
    kw = args and yaml_load(args)
    kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})