1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from collections import OrderedDict
22 from six.moves import filter
23 from jinja2 import Environment
25 from yardstick.benchmark import contexts
26 from yardstick.benchmark.contexts import base as base_context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common import constants
32 from yardstick.common import exceptions as y_exc
33 from yardstick.common import task_template
34 from yardstick.common import utils
35 from yardstick.common.html_template import report_template
# Default location for task results when no output file is supplied.
output_file_default = "/tmp/yardstick.out"
# Default directory (relative to the yardstick root) holding test case files.
test_cases_dir_default = "tests/opnfv/test_cases/"
# Module-level logger.
LOG = logging.getLogger(__name__)
class Task(object):  # pragma: no cover
    """Set of commands to manage benchmark tasks."""
    def _set_dispatchers(self, output_config):
        """Split the configured dispatcher string into a list of names."""
        # NOTE(review): the default argument of the inner .get() appears
        # truncated in this copy (the closing value/paren is missing) -
        # confirm against the original source.
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
        # Comma-separated dispatcher names, whitespace-stripped.
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types
    def start(self, args, **kwargs):  # pylint: disable=unused-argument
        """Start a benchmark scenario."""
        # NOTE(review): several lines (try/except/else bodies, call
        # continuations) appear missing from this copy; structural comments
        # below are hedged accordingly.
        # Make sure runners/contexts are cleaned up however we exit.
        atexit.register(self.atexit_handler)
        task_id = getattr(args, 'task_id')
        # Generate a task id when the caller did not supply one.
        self.task_id = task_id if task_id else str(uuid.uuid4())
            output_config = utils.parse_ini_file(CONF_FILE)
        except Exception:  # pylint: disable=broad-except
            # all error will be ignore, the default value is {}
        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)
        self._set_dispatchers(output_config)
        # update dispatcher list
        if 'file' in output_config['DEFAULT']['dispatcher']:
            # Seed the output file with an empty result payload.
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)
        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])
        # 1.parse suite, return suite_params info
        task_files, task_args, task_args_fnames = parser.parse_suite()
        # Single-task branch: the input file itself is the one task.
        task_files = [parser.path]
        task_args = [args.task_args]
        task_args_fnames = [args.task_args_file]
        LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
                  task_files, task_args, task_args_fnames)
        # NOTE(review): the trailing arguments of this call appear truncated.
        tasks = self._parse_tasks(parser, task_files, args, task_args,
        # Execute task files.
        for i, _ in enumerate(task_files):
            one_task_start_time = time.time()
            self.contexts.extend(tasks[i]['contexts'])
            if not tasks[i]['meet_precondition']:
                LOG.info('"meet_precondition" is %s, please check environment',
                         tasks[i]['meet_precondition'])
                success, data = self._run(tasks[i]['scenarios'],
                                          tasks[i]['run_in_parallel'],
            except KeyboardInterrupt:
            except Exception:  # pylint: disable=broad-except
                # Record the failure; dict value continuations look truncated.
                LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
                    LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
                    testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
                    LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                    testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
            # keep deployment, forget about stack
            # (hide it for exit handler)
            # Undeploy contexts in reverse deployment order.
            for context in self.contexts[::-1]:
            one_task_end_time = time.time()
            LOG.info("Task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)
        # Aggregate per-testcase results and push them to the dispatchers.
        result = self._get_format_result(testcases)
        self._do_output(output_config, result)
        self._generate_reporting(result)
        total_end_time = time.time()
        LOG.info("Total finished in %d secs",
                 total_end_time - total_start_time)
        LOG.info('To generate report, execute command "yardstick report '
                 'generate %s <YAML_NAME>"', self.task_id)
        LOG.info("Task ALL DONE, exiting")
    def _generate_reporting(self, result):
        """Render the HTML report for *result* and write it to disk."""
        # NOTE(review): the line creating 'env' (presumably a jinja2
        # Environment, which is imported above) appears missing from this
        # copy - confirm against the original source.
        with open(constants.REPORTING_FILE, 'w') as f:
            f.write(env.from_string(report_template).render(result))
        LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
        # NOTE(review): the enclosing 'def' line for this block appears
        # missing from this copy; it reads like a per-task log setup method
        # (it uses self.task_id) - confirm against the original source.
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)
        # Ensure the task log directory exists before opening the file.
        utils.makedirs(constants.TASK_LOG_DIR)
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        # Capture everything; filtering is left to other handlers.
        log_handler.setLevel(logging.DEBUG)
        logging.root.addHandler(log_handler)
182 def _init_output_config(self, output_config):
183 output_config.setdefault('DEFAULT', {})
184 output_config.setdefault('dispatcher_http', {})
185 output_config.setdefault('dispatcher_file', {})
186 output_config.setdefault('dispatcher_influxdb', {})
187 output_config.setdefault('nsb', {})
    def _set_output_config(self, output_config, file_path):
        """Override output settings from environment variables.

        NOTE(review): the try/except lines that presumably guard these
        os.environ reads appear missing from this copy; as written a
        missing DISPATCHER or TARGET variable would raise KeyError -
        confirm against the original source.
        """
        out_type = os.environ['DISPATCHER']
        output_config['DEFAULT'].setdefault('dispatcher', 'file')
        output_config['DEFAULT']['dispatcher'] = out_type
        output_config['dispatcher_file']['file_path'] = file_path
        target = os.environ['TARGET']
        # Route the TARGET value into the section of the active dispatcher.
        k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
        output_config[k]['target'] = target
    def _get_format_result(self, testcases):
        """Build the result payload: CI metadata, overall criteria, testcases.

        NOTE(review): the dict-literal braces and assignments around the
        key/value lines below appear missing from this copy - confirm
        against the original source.
        """
        criteria = self._get_task_criteria(testcases)
        # Deployment metadata comes from CI environment variables.
        'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
        'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
        'pod_name': os.environ.get('NODE_NAME', 'unknown'),
        'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
        'criteria': criteria,
        'task_id': self.task_id,
        'testcases': testcases
    def _get_task_criteria(self, testcases):
        """Derive the overall task criteria from per-testcase criteria."""
        # True when at least one testcase did not PASS.
        # NOTE(review): the return statement(s) mapping this boolean to the
        # final criteria value appear missing from this copy - confirm.
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
236 def _do_output(self, output_config, result):
237 dispatchers = DispatcherBase.get(output_config)
238 dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
240 for dispatcher in dispatchers:
241 dispatcher.flush_result_data(result)
    def _run(self, scenarios, run_in_parallel, output_config):
        """Deploys context and calls runners"""
        # NOTE(review): several structural lines (loop bodies, the branch
        # selecting parallel vs. serial execution, initialization of
        # 'runners'/'result'/'task_success') appear missing from this copy;
        # comments below are hedged accordingly.
        for context in self.contexts:
        background_runners = []
        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # Background scenarios run on an effectively unbounded Duration
            # runner; they are aborted once the foreground work is done.
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_config)
            background_runners.append(runner)
        # Presumably the parallel branch: start all foreground runners first.
        for scenario in scenarios:
            if not _is_background_scenario(scenario):
                runner = self.run_one_scenario(scenario, output_config)
                runners.append(runner)
        # Wait for runners to finish
        for runner in runners:
            status = runner_join(runner, background_runners, self.outputs, result)
            LOG.error("%s runner status %s", runner.__execution_type__, status)
            LOG.info("Runner ended")
        # Presumably the serial branch: run and join one scenario at a time.
        for scenario in scenarios:
            if not _is_background_scenario(scenario):
                runner = self.run_one_scenario(scenario, output_config)
                status = runner_join(runner, background_runners, self.outputs, result)
                LOG.error('Scenario NO.%s: "%s" ERROR!',
                          scenarios.index(scenario) + 1,
                          scenario.get('type'))
                LOG.error("%s runner status %s", runner.__execution_type__, status)
                LOG.info("Runner ended")
        # Abort background runners
        for runner in background_runners:
        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(self.outputs, result)
            # Nuke if it did not stop nicely
            base_runner.Runner.terminate(runner)
            runner.join(self.outputs, result)
            base_runner.Runner.release(runner)
        print("Background task ended")
        return task_success, result
    def atexit_handler(self):
        """handler for process termination"""
        # Stop every runner first so nothing keeps using the contexts.
        base_runner.Runner.terminate_all()
        LOG.info("Undeploying all contexts")
        # Undeploy in reverse deployment order.
        # NOTE(review): the loop body (presumably context.undeploy()) appears
        # missing from this copy - confirm against the original source.
        for context in self.contexts[::-1]:
    def _parse_options(self, op):
        """Recursively substitute '$name' strings with recorded outputs."""
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            # '$name' is replaced with the value stored under 'name' in
            # self.outputs; other strings pass through unchanged.
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        # NOTE(review): a fallthrough 'return op' for other value types
        # appears missing from this copy - confirm against the original.
    def _parse_tasks(self, parser, task_files, args, task_args,
        # NOTE(review): the signature continuation, 'tasks' initialization,
        # the render-only guard and the return statement appear missing
        # from this copy - confirm against the original source.
        # Parse each task file with its matching arguments.
        for i, _ in enumerate(task_files):
            parser.path = task_files[i]
            tasks.append(parser.parse_task(self.task_id, task_args[i],
                                           task_args_fnames[i]))
            # Case name is the task file name without directory or extension.
            tasks[i]['case_name'] = os.path.splitext(
                os.path.basename(task_files[i]))[0]
        # Render-only mode: dump each rendered task to a numbered YAML file.
        utils.makedirs(args.render_only)
        for idx, task in enumerate(tasks):
            output_file_name = os.path.abspath(os.path.join(
                '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
            utils.write_file(output_file_name, task['rendered'])
    def run_one_scenario(self, scenario_cfg, output_config):
        """run one scenario using context"""
        # NOTE(review): several lines (context_cfg initialization, else
        # branches, try/except around config_context_target, the return of
        # the runner) appear missing from this copy; comments are hedged.
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_config'] = output_config
        # Resolve '$name' references in scenario options against outputs.
        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)
        # TODO support get multi hosts/vms info
        options = scenario_cfg.get('options') or {}
        server_name = options.get('server_name') or {}
        def config_context_target(cfg):
            # Fill context_cfg['target'] from either a literal IP address or
            # a server looked up in the deployed contexts.
            target = cfg['target']
            if is_ip_addr(target):
                context_cfg['target'] = {"ipaddr": target}
                context_cfg['target'] = base_context.Context.get_server(target)
                # Same context as the host: use the private address.
                if self._is_same_context(cfg["host"], target):
                    context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
                context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
        host_name = server_name.get('host', scenario_cfg.get('host'))
        context_cfg['host'] = base_context.Context.get_server(host_name)
        # Try server_name first, then the scenario itself.
        for item in [server_name, scenario_cfg]:
            config_context_target(item)
            LOG.debug("Got a KeyError in config_context_target(%s)", item)
        if "targets" in scenario_cfg:
            # Multi-target scenario: collect one address per target.
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                    context_cfg['target'] = (
                        base_context.Context.get_server(target))
                    if self._is_same_context(scenario_cfg["host"],
                        ip_list.append(context_cfg["target"]["private_ip"])
                    ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)
        if "nodes" in scenario_cfg:
            # Resolve node references and derive their networks.
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])
        runner = base_runner.Runner.get(runner_cfg)
        LOG.info("Starting runner of type '%s'", runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)
    def _is_same_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        # NOTE(review): the 'continue' for non-matching context types, the
        # try/except around _get_server and the return statements appear
        # missing from this copy - confirm against the original source.
        for context in self.contexts:
            # Only Heat and Kubernetes contexts are considered here.
            if context.__context_type__ not in {contexts.CONTEXT_HEAT,
                                                contexts.CONTEXT_KUBERNETES}:
            host = context._get_server(host_attr)
            target = context._get_server(target_attr)
            # Both host and target is not None, then they are in the
class TaskParser(object):  # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # NOTE(review): the constructor body appears missing from this copy;
        # presumably it stores *path* on the instance (methods below read
        # self.path) - confirm against the original source.
    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Check a suite entry's pod/installer constraint against this run.

        NOTE(review): the return statements (constraint not met / default
        True) appear missing from this copy - confirm against the original.
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                # Fail when there is no current pod or it is not listed.
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                # Fail when there is no current installer or it is not listed.
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):
457 def _get_task_para(self, task, cur_pod):
458 task_args = task.get('task_args', None)
459 if task_args is not None:
460 task_args = task_args.get(cur_pod, task_args.get('default'))
461 task_args_fnames = task.get('task_args_fnames', None)
462 if task_args_fnames is not None:
463 task_args_fnames = task_args_fnames.get(cur_pod, None)
464 return task_args, task_args_fnames
    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
        and lists of optional parameters if present"""
        # NOTE(review): the try line, the IOError handler body, 'valid_task_args'
        # initialization and a few loop/branch lines appear missing from this
        # copy - confirm against the original source.
        LOG.info("\nParsing suite file:%s", self.path)
        with open(self.path) as stream:
            cfg = yaml_load(stream)
        except IOError as ioerror:
        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])
        # Resolve the test case directory relative to the yardstick root and
        # make sure it ends with a path separator for simple concatenation.
        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep
        # Current environment used for constraint matching.
        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)
        valid_task_files = []
        valid_task_args_fnames = []
        for task in cfg["test_cases"]:
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
            # Keep only test cases whose constraints match this environment.
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
                # 3.fetch task parameters
                task_args, task_args_fnames = self._get_task_para(task, cur_pod)
                valid_task_args.append(task_args)
                valid_task_args_fnames.append(task_args_fnames)
        return valid_task_files, valid_task_args, valid_task_args_fnames
    def _render_task(self, task_args, task_args_file):
        """Render the input task with the given arguments

        :param task_args: (dict) arguments to render the task
        :param task_args_file: (str) file containing the arguments to render

        :return: (str) task file rendered
        """
        # NOTE(review): the 'kw' initialization, try lines and some except
        # clauses appear missing from this copy - confirm against the
        # original source.
        with open(task_args_file) as f:
            kw.update(parse_task_args('task_args_file', f.read()))
        kw.update(parse_task_args('task_args', task_args))
        raise y_exc.TaskRenderArgumentError()
        with open(self.path) as f:
            input_task = f.read()
        # Render the task template, then parse the rendered YAML.
        rendered_task = task_template.TaskTemplate.render(input_task, **kw)
        LOG.debug('Input task is:\n%s', rendered_task)
        parsed_task = yaml_load(rendered_task)
        except (IOError, OSError):
            raise y_exc.TaskReadError(task_file=self.path)
        raise y_exc.TaskRenderError(input_task=input_task)
        return parsed_task, rendered_task
    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        # NOTE(review): a few branch/initialization lines ('if "context" in
        # cfg', '_contexts = []') appear missing from this copy - confirm.
        LOG.info("Parsing task config: %s", self.path)
        cfg, rendered = self._render_task(task_args, task_args_file)
        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)
        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        # No context given: fall back to a dummy context.
        context_cfgs = [{"type": contexts.CONTEXT_DUMMY}]
        for cfg_attrs in context_cfgs:
            cfg_attrs['task_id'] = task_id
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", contexts.CONTEXT_HEAT)
            context = base_context.Context.get(context_type)
            context.init(cfg_attrs)
            # Update the name in case the context has used the name_suffix
            cfg_attrs['name'] = context.name
            _contexts.append(context)
        run_in_parallel = cfg.get("run_in_parallel", False)
        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)
            self._change_node_names(scenario, _contexts)
        # TODO we need something better here, a class that represent the file
        return {'scenarios': cfg['scenarios'],
                'run_in_parallel': run_in_parallel,
                'meet_precondition': meet_precondition,
                'contexts': _contexts,
                'rendered': rendered}
    def _change_node_names(scenario, _contexts):
        """Change the node names in a scenario, depending on the context config

        The nodes (VMs or physical servers) are referred in the context section
        with the name of the server and the name of the context:
        <server name>.<context name>

        If the context is going to be undeployed at the end of the test, the
        task ID is suffixed to the name to avoid interferences with previous
        deployments. If the context needs to be deployed at the end of the
        test, the name assigned is kept.

        There are several places where a node name could appear in the scenario
            server_name:  # JIRA: YARDSTICK-810
            tg__0: tg_0.yardstick
            vnf__0: vnf_0.yardstick

        NOTE: in Kubernetes context, the separator character between the server
        name and the context name is "-":
        """
        # NOTE(review): this method has no 'self' parameter; the
        # '@staticmethod' decorator line appears missing from this copy.
        # Also the final line of the format() call in qualified_name looks
        # truncated - confirm against the original source.
        def qualified_name(name):
            # Match the name against each context; re-qualify it with the
            # context's (possibly suffixed) assigned name.
            for context in _contexts:
                host_name, ctx_name = context.split_host_name(name)
                if context.assigned_name == ctx_name:
                    return '{}{}{}'.format(host_name,
                                           context.host_name_separator,
            raise y_exc.ScenarioConfigContextNameNotFound(host_name=name)

        if 'host' in scenario:
            scenario['host'] = qualified_name(scenario['host'])
        if 'target' in scenario:
            scenario['target'] = qualified_name(scenario['target'])
        options = scenario.get('options') or {}
        server_name = options.get('server_name') or {}
        if 'host' in server_name:
            server_name['host'] = qualified_name(server_name['host'])
        if 'target' in server_name:
            server_name['target'] = qualified_name(server_name['target'])
        if 'targets' in scenario:
            for idx, target in enumerate(scenario['targets']):
                scenario['targets'][idx] = qualified_name(target)
        if 'nodes' in scenario:
            for scenario_node, target in scenario['nodes'].items():
                scenario['nodes'][scenario_node] = qualified_name(target)
    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""
        # Expected schema string: "yardstick:<type>:0.1".
        # NOTE(review): the continuation line closing the sys.exit() call
        # appears missing from this copy - confirm against the original.
        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
    def _check_precondition(self, cfg):
        """Check if the environment meet the precondition"""
        # NOTE(review): the return statements (False on mismatch, True by
        # default) appear missing from this copy - confirm against the
        # original source.
        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            # Current environment values used for matching.
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)
            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
            if deploy_scenarios and deploy_scenario_env:
                # A prefix match on any listed scenario is sufficient.
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
def is_ip_addr(addr):
    """check if string addr is an IP address"""
    # NOTE(review): the try lines and return statements appear missing from
    # this copy - confirm against the original source.
    # Server dicts may carry the address under public/private attr keys;
    # plain strings raise AttributeError on .get() and are used as-is.
    addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
    # Valid when ipaddress accepts it; presumably ValueError means not an IP.
    ipaddress.ip_address(addr.encode('utf-8'))
714 def _is_background_scenario(scenario):
715 if "run_in_background" in scenario:
716 return scenario["run_in_background"]
def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' fields in scenario """
    # ensure consistency in node instantiation order
    node_map = scenario_cfg["nodes"]
    resolved = OrderedDict()
    for node_name in sorted(node_map):
        resolved[node_name] = base_context.Context.get_server(
            node_map[node_name])
    return resolved
def get_networks_from_nodes(nodes):
    """parse the 'nodes' fields in scenario """
    # NOTE(review): the initialization of 'networks', a guard for falsy
    # network names and the return statement appear missing from this
    # copy - confirm against the original source.
    for node in nodes.values():
        interfaces = node.get('interfaces', {})
        for interface in interfaces.values():
            # vld_id is network_name
            network_name = interface.get('network_name')
            # Look the network up in the deployed contexts and index it
            # by its resolved name.
            network = base_context.Context.get_network(network_name)
            networks[network['name']] = network
def runner_join(runner, background_runners, outputs, result):
    """join (wait for) a runner, exit process at runner failure
    :param background_runners:
    :type background_runners:
    """
    # NOTE(review): the final 'return status' line appears missing from this
    # copy - callers use the returned status; confirm against the original.
    # Poll until the runner finishes, draining its queues as we go so the
    # producing processes never block on full queues.
    while runner.poll() is None:
        outputs.update(runner.get_output())
        result.extend(runner.get_result())
        # drain all the background runner queues
        for background in background_runners:
            outputs.update(background.get_output())
            result.extend(background.get_result())
    status = runner.join(outputs, result)
    base_runner.Runner.release(runner)
def print_invalid_header(source_name, args):
    """Print a header announcing that *args* from *source_name* is invalid."""
    message = "Invalid %(source)s passed:\n\n %(args)s\n" % {
        "source": source_name, "args": args}
    print(message)
def parse_task_args(src_name, args):
    """Parse task arguments given as a mapping or a YAML string."""
    # NOTE(review): 'collections.Mapping' was removed in Python 3.10;
    # 'collections.abc.Mapping' is the modern spelling - confirm the target
    # Python version before changing.
    # NOTE(review): the mapping-branch return, the try line, the error
    # raises and the final 'return kw' appear missing from this copy.
    if isinstance(args, collections.Mapping):
        # Empty/None args yield an empty dict instead of a YAML parse.
        kw = args and yaml_load(args)
        kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
    # Parsed YAML must be a mapping to be usable as task arguments.
    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})