1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from collections import OrderedDict
23 from six.moves import filter
24 from jinja2 import Environment
26 from yardstick.benchmark import contexts
27 from yardstick.benchmark.contexts import base as base_context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.common.constants import CONF_FILE
30 from yardstick.common.yaml_loader import yaml_load
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common import constants
33 from yardstick.common import exceptions as y_exc
34 from yardstick.common import task_template
35 from yardstick.common import utils
36 from yardstick.common.html_template import report_template
# Default path of the JSON results file used by the 'file' dispatcher.
output_file_default = "/tmp/yardstick.out"
# Default directory (joined with YARDSTICK_ROOT_PATH in parse_suite)
# that holds the suite test-case files.
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
class Task(object):     # pragma: no cover
    """Task Commands.

    Set of commands to manage benchmark tasks.
    """
    def _set_dispatchers(self, output_config):
        """Normalize the dispatcher setting into a list of names.

        Rewrites output_config['DEFAULT']['dispatcher'] in place, turning
        the comma-separated string into a list of stripped names.
        """
        # NOTE(review): the continuation line carrying the default value of
        # this .get() call is elided in this view -- confirm upstream.
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types
    def start(self, args, **kwargs):      # pylint: disable=unused-argument
        """Start a benchmark scenario.

        Parses the input task/suite file(s), executes every test case,
        collects per-testcase results and pushes them to the configured
        dispatchers.

        NOTE(review): several framing lines (try/except headers, the
        ``testcases`` initialization, the suite/single-file branch headers
        and the context teardown body) are elided in this view; the
        comments below only cover the visible statements.
        """
        # Make sure runners/contexts get cleaned up on interpreter exit.
        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        # Generate a fresh UUID when the caller did not supply a task id.
        self.task_id = task_id if task_id else str(uuid.uuid4())

            # Load dispatcher/output settings from the yardstick conf file.
            output_config = utils.parse_ini_file(CONF_FILE)
        except Exception:  # pylint: disable=broad-except
            # All errors are ignored; the default value is {}.

        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # update dispatcher list
        if 'file' in output_config['DEFAULT']['dispatcher']:
            # Seed the output file with an empty, successful result record.
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = parser.parse_suite()
            # Single task file: wrap the CLI arguments in one-element lists.
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
                  task_files, task_args, task_args_fnames)

        tasks = self._parse_tasks(parser, task_files, args, task_args,
        # Execute task files.
        for i, _ in enumerate(task_files):
            one_task_start_time = time.time()
            self.contexts.extend(tasks[i]['contexts'])
            if not tasks[i]['meet_precondition']:
                LOG.info('"meet_precondition" is %s, please check environment',
                         tasks[i]['meet_precondition'])

                success, data = self._run(tasks[i]['scenarios'],
                                          tasks[i]['run_in_parallel'],
            except KeyboardInterrupt:
            except Exception:  # pylint: disable=broad-except
                LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
                    LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
                    testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
                    LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                    testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',

            # keep deployment, forget about stack
            # (hide it for exit handler)
                for context in self.contexts[::-1]:
            one_task_end_time = time.time()
            LOG.info("Task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)
        self._generate_reporting(result)

        total_end_time = time.time()
        LOG.info("Total finished in %d secs",
                 total_end_time - total_start_time)

        LOG.info('To generate report, execute command "yardstick report '
                 'generate %s <YAML_NAME>"', self.task_id)
        LOG.info("Task ALL DONE, exiting")
    def _generate_reporting(self, result):
        """Render the HTML report template with *result* and write it out.

        NOTE(review): the Jinja2 Environment construction line is elided in
        this view; ``env`` is presumably ``Environment()`` -- confirm.
        """
        with open(constants.REPORTING_FILE, 'w') as f:
            f.write(env.from_string(report_template).render(result))

        LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
        # NOTE(review): the enclosing method definition (per-task log setup)
        # is elided in this view. These statements attach a DEBUG-level
        # file handler, writing under TASK_LOG_DIR/<task_id>.log, to the
        # root logger.
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        utils.makedirs(constants.TASK_LOG_DIR)
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)
183 def _init_output_config(self, output_config):
184 output_config.setdefault('DEFAULT', {})
185 output_config.setdefault('dispatcher_http', {})
186 output_config.setdefault('dispatcher_file', {})
187 output_config.setdefault('dispatcher_influxdb', {})
188 output_config.setdefault('nsb', {})
    def _set_output_config(self, output_config, file_path):
        """Override dispatcher settings from the environment.

        The DISPATCHER and TARGET environment variables, when present,
        take precedence over the values read from the config file.

        NOTE(review): the try/except/else framing around the os.environ
        look-ups is elided in this view.
        """
            out_type = os.environ['DISPATCHER']
            # No DISPATCHER in the environment: fall back to 'file'.
            output_config['DEFAULT'].setdefault('dispatcher', 'file')
            output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

            target = os.environ['TARGET']
            # Apply TARGET to the section of the active dispatcher.
            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
            output_config[k]['target'] = target
    def _get_format_result(self, testcases):
        """Assemble the final result structure for the dispatchers.

        NOTE(review): the surrounding dict/list literals ('info' wrapper,
        result envelope) are partially elided in this view; only the
        visible keys are shown below.
        """
        criteria = self._get_task_criteria(testcases)
            # Deployment metadata, taken from the CI environment when set.
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
            'criteria': criteria,
            'task_id': self.task_id,
            'testcases': testcases
    def _get_task_criteria(self, testcases):
        """Derive the overall criteria from the individual testcase results.

        NOTE(review): the return statements are elided in this view;
        ``criteria`` is truthy when any testcase did not PASS --
        presumably mapped to 'FAIL'/'PASS' -- confirm upstream.
        """
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
237 def _do_output(self, output_config, result):
238 dispatchers = DispatcherBase.get(output_config)
239 dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
241 for dispatcher in dispatchers:
242 dispatcher.flush_result_data(result)
    def _run(self, scenarios, run_in_parallel, output_config):
        """Deploys context and calls runners.

        Background scenarios are started first on a quasi-infinite
        Duration runner; foreground scenarios then run either all at once
        (parallel) or one at a time (serial), after which the background
        runners are aborted and joined.

        NOTE(review): several framing lines (context deployment body,
        ``runners``/``result`` initialization, the parallel/serial branch
        headers and the status checks) are elided in this view.
        """
        for context in self.contexts:

        background_runners = []

        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # Effectively infinite duration; aborted explicitly below.
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_config)
            background_runners.append(runner)

            # Parallel mode: start every foreground scenario first.
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_config)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                status = runner_join(runner, background_runners, self.outputs, result)
                    LOG.error("%s runner status %s", runner.__execution_type__, status)
                LOG.info("Runner ended")

            # Serial mode: start and join one scenario at a time.
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_config)
                    status = runner_join(runner, background_runners, self.outputs, result)
                        LOG.error('Scenario NO.%s: "%s" ERROR!',
                                  scenarios.index(scenario) + 1,
                                  scenario.get('type'))
                    LOG.error("%s runner status %s", runner.__execution_type__, status)
                    LOG.info("Runner ended")

        # Abort background runners
        for runner in background_runners:

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(self.outputs, result)
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner.join(self.outputs, result)
                base_runner.Runner.release(runner)

            print("Background task ended")
        return task_success, result
    def atexit_handler(self):
        """handler for process termination

        Terminates all runners, then undeploys the contexts in reverse
        deployment order.

        NOTE(review): the guard around the undeploy loop and the
        per-context undeploy call are elided in this view.
        """
        base_runner.Runner.terminate_all()

        LOG.info("Undeploying all contexts")
        for context in self.contexts[::-1]:
312 def _parse_options(self, op):
313 if isinstance(op, dict):
314 return {k: self._parse_options(v) for k, v in op.items()}
315 elif isinstance(op, list):
316 return [self._parse_options(v) for v in op]
317 elif isinstance(op, six.string_types):
318 return self.outputs.get(op[1:]) if op.startswith('$') else op
    def _parse_tasks(self, parser, task_files, args, task_args,
        # NOTE(review): the remainder of the signature, the ``tasks``
        # initialization, the render-only guard and the final return are
        # elided in this view.
        for i, _ in enumerate(task_files):
            parser.path = task_files[i]
            tasks.append(parser.parse_task(self.task_id, task_args[i],
                                           task_args_fnames[i]))
            # Derive the case name from the task file's base name.
            tasks[i]['case_name'] = os.path.splitext(
                os.path.basename(task_files[i]))[0]

            # Render-only mode: dump the rendered YAML of every task to
            # numbered files instead of executing them.
            utils.makedirs(args.render_only)
            for idx, task in enumerate(tasks):
                output_file_name = os.path.abspath(os.path.join(
                    '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
                utils.write_file(output_file_name, task['rendered'])
    def run_one_scenario(self, scenario_cfg, output_config):
        """run one scenario using context

        Resolves '$'-references in the options, builds context_cfg
        (host/target/nodes/networks server data) and starts a runner for
        the scenario.

        NOTE(review): the context_cfg initialization, several try/except
        and if/else framing lines, and the final return are elided in
        this view.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_config'] = output_config

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        options = scenario_cfg.get('options') or {}
        server_name = options.get('server_name') or {}

        def config_context_target(cfg):
            # Fill context_cfg['target'] with a literal IP address, or with
            # the server record resolved from the contexts.
            target = cfg['target']
            if is_ip_addr(target):
                context_cfg['target'] = {"ipaddr": target}
                context_cfg['target'] = base_context.Context.get_server(target)
                # Same context as the host: use the private address.
                if self._is_same_context(cfg["host"], target):
                    context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
                    context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]

        host_name = server_name.get('host', scenario_cfg.get('host'))
            context_cfg['host'] = base_context.Context.get_server(host_name)

        for item in [server_name, scenario_cfg]:
                config_context_target(item)
                LOG.debug("Got a KeyError in config_context_target(%s)", item)

        if "targets" in scenario_cfg:
            # Multi-target scenario: build a comma-separated ipaddr list.
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                    context_cfg['target'] = (
                        base_context.Context.get_server(target))
                    if self._is_same_context(scenario_cfg["host"],
                        ip_list.append(context_cfg["target"]["private_ip"])
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])

        runner = base_runner.Runner.get(runner_cfg)

        LOG.info("Starting runner of type '%s'", runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)
    def _is_same_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        # Only Heat and Kubernetes contexts can host both servers.
        for context in self.contexts:
            if context.__context_type__ not in {contexts.CONTEXT_HEAT,
                                                contexts.CONTEXT_KUBERNETES}:
            # NOTE(review): the try/except framing around these look-ups
            # and the return statements are elided in this view.
            host = context._get_server(host_attr)
            target = context._get_server(target_attr)
            # Both host and target are not None, so they are in the
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        """Bind the parser to a task/suite file *path*.

        NOTE(review): the original body is elided in this view; it
        presumably stores *path* as ``self.path`` (other methods read
        ``self.path``) -- confirm upstream.
        """
    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Check a suite entry's pod/installer constraint against the env.

        NOTE(review): the return statements for the match/mismatch cases
        are elided in this view.
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                # A constrained testcase is rejected when the current pod
                # or installer is unknown or not in the allowed set.
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):
458 def _get_task_para(self, task, cur_pod):
459 task_args = task.get('task_args', None)
460 if task_args is not None:
461 task_args = task_args.get(cur_pod, task_args.get('default'))
462 task_args_fnames = task.get('task_args_fnames', None)
463 if task_args_fnames is not None:
464 task_args_fnames = task_args_fnames.get(cur_pod, None)
465 return task_args, task_args_fnames
    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
        and lists of optional parameters if present

        NOTE(review): the try/except framing, the error-handling body and
        the ``valid_task_args`` initialization are elided in this view.
        """
        LOG.info("\nParsing suite file:%s", self.path)

            with open(self.path) as stream:
                cfg = yaml_load(stream)
        except IOError as ioerror:

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        # Resolve the test-case directory and normalize the trailing slash.
        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
            # Keep only the test cases whose constraints fit this env.
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)

            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames
    def _render_task(self, task_args, task_args_file):
        """Render the input task with the given arguments

        :param task_args: (dict) arguments to render the task
        :param task_args_file: (str) file containing the arguments to render
        :return: (str) task file rendered
        """
        # NOTE(review): the ``kw`` initialization and the try/except
        # headers are elided in this view.
            with open(task_args_file) as f:
                kw.update(parse_task_args('task_args_file', f.read()))
            kw.update(parse_task_args('task_args', task_args))
            # Bad argument types cannot be merged into the render context.
            raise y_exc.TaskRenderArgumentError()

            with open(self.path) as f:
                input_task = f.read()
            rendered_task = task_template.TaskTemplate.render(input_task, **kw)
            LOG.debug('Input task is:\n%s', rendered_task)
            parsed_task = yaml_load(rendered_task)
        except (IOError, OSError):
            raise y_exc.TaskReadError(task_file=self.path)
            raise y_exc.TaskRenderError(input_task=input_task)

        return parsed_task, rendered_task
    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances

        NOTE(review): the ``_contexts`` initialization and the
        context/contexts branch header are elided in this view.
        """
        LOG.info("Parsing task config: %s", self.path)

        cfg, rendered = self._render_task(task_args, task_args_file)
        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
            # No context given: fall back to the Dummy context.
            context_cfgs = [{"type": contexts.CONTEXT_DUMMY}]

        for cfg_attrs in context_cfgs:
            cfg_attrs['task_id'] = task_id
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", contexts.CONTEXT_HEAT)
            context = base_context.Context.get(context_type)
            context.init(cfg_attrs)
            # Update the name in case the context has used the name_suffix
            cfg_attrs['name'] = context.name
            _contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            self._change_node_names(scenario, _contexts)

        # TODO we need something better here, a class that represent the file
        return {'scenarios': cfg['scenarios'],
                'run_in_parallel': run_in_parallel,
                'meet_precondition': meet_precondition,
                'contexts': _contexts,
                'rendered': rendered}
    def _change_node_names(scenario, _contexts):
        """Change the node names in a scenario, depending on the context config

        If the context is going to be undeployed at the end of the test, the
        task ID is suffixed to the name to avoid interferences with previous
        deployments. If the context needs to be deployed at the end of the
        test, the name assigned is kept.

        There are several places where a node name could appear in the scenario
        ('host', 'target', 'targets', 'nodes' and options.server_name), e.g.:
            server_name: # JIRA: YARDSTICK-810
            nodes:
                tg__0: tg_0.yardstick
                vnf__0: vnf_0.yardstick

        NOTE: in Kubernetes context, the separator character between the server
        name and the context name is "-":
        """
        def qualified_name(name):
            # Map a server name to '<host><separator><context name>' using
            # the context whose assigned name matches the name's suffix.
            for context in _contexts:
                host_name, ctx_name = context.split_host_name(name)
                if context.assigned_name == ctx_name:
                    # NOTE(review): the final argument of this format call
                    # (presumably context.name) is elided in this view.
                    return '{}{}{}'.format(host_name,
                                           context.host_name_separator,

            raise y_exc.ScenarioConfigContextNameNotFound(host_name=name)

        if 'host' in scenario:
            scenario['host'] = qualified_name(scenario['host'])
        if 'target' in scenario:
            scenario['target'] = qualified_name(scenario['target'])
        options = scenario.get('options') or {}
        server_name = options.get('server_name') or {}
        if 'host' in server_name:
            server_name['host'] = qualified_name(server_name['host'])
        if 'target' in server_name:
            server_name['target'] = qualified_name(server_name['target'])
        if 'targets' in scenario:
            for idx, target in enumerate(scenario['targets']):
                scenario['targets'][idx] = qualified_name(target)
        if 'nodes' in scenario:
            for scenario_node, target in scenario['nodes'].items():
                scenario['nodes'][scenario_node] = qualified_name(target)
    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        # Exit the process on mismatch -- the file cannot be processed.
        # NOTE(review): the continuation line closing this sys.exit() call
        # is elided in this view.
        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
    def _check_precondition(self, cfg):
        """Check if the environment meets the precondition.

        NOTE(review): the return statements for the pass/fail cases are
        elided in this view.
        """
        if "precondition" in cfg:
            # The expected values come from the task file...
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            # ...and the actual values from the CI environment.
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
def is_ip_addr(addr):
    """check if string addr is an IP address

    NOTE(review): the try headers, the non-dict fall-through and the
    return statements are elided in this view.
    """
        # Dict-like server records carry the address under *_ip_attr keys.
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        # ip_address() raises ValueError for non-IP strings.
        ipaddress.ip_address(addr.encode('utf-8'))
715 def _is_background_scenario(scenario):
716 if "run_in_background" in scenario:
717 return scenario["run_in_background"]
def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' fields in scenario """
    # Sort node names so instantiation order stays deterministic.
    nodes = scenario_cfg["nodes"]
    return OrderedDict(
        (name, base_context.Context.get_server(nodes[name]))
        for name in sorted(nodes))
def get_networks_from_nodes(nodes):
    """parse the 'nodes' fields in scenario

    Collects the networks referenced by the nodes' interfaces.

    NOTE(review): the ``networks`` dict initialization, the None-node and
    empty-name guards, and the final return are elided in this view.
    """
    for node in nodes.values():
        interfaces = node.get('interfaces', {})
        for interface in interfaces.values():
            # vld_id is network_name
            network_name = interface.get('network_name')
            network = base_context.Context.get_network(network_name)
                networks[network['name']] = network
def runner_join(runner, background_runners, outputs, result):
    """join (wait for) a runner, exit process at runner failure
    :param background_runners:
    :type background_runners:

    NOTE(review): the remaining docstring parameters and the final
    ``return status`` are elided in this view.
    """
    # Poll until the foreground runner finishes, draining its queues so
    # they never fill up and block the runner process.
    while runner.poll() is None:
        outputs.update(runner.get_output())
        result.extend(runner.get_result())
        # drain all the background runner queues
        for background in background_runners:
            outputs.update(background.get_output())
            result.extend(background.get_result())
    status = runner.join(outputs, result)
    base_runner.Runner.release(runner)
def print_invalid_header(source_name, args):
    """Print a standard header describing an invalid input source."""
    message = "Invalid %(source)s passed:\n\n  %(args)s\n" % {
        "source": source_name, "args": args}
    print(message)
def parse_task_args(src_name, args):
    """Parse task arguments from a mapping or a YAML string into a dict.

    NOTE(review): the early return for the mapping case, the try header
    and the final returns/raises are elided in this view.
    """
    # NOTE(review): collections.Mapping is a deprecated alias removed in
    # Python 3.10; collections.abc.Mapping is the modern spelling.
    if isinstance(args, collections.Mapping):

        kw = args and yaml_load(args)
        kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})

    # The parsed YAML must be a mapping to be usable as render kwargs.
    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})