1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from collections import OrderedDict
23 from six.moves import filter
24 from jinja2 import Environment
26 from yardstick.benchmark import contexts
27 from yardstick.benchmark.contexts import base as base_context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.common.constants import CONF_FILE
30 from yardstick.common.yaml_loader import yaml_load
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common import constants
33 from yardstick.common import exceptions as y_exc
34 from yardstick.common import task_template
35 from yardstick.common import utils
36 from yardstick.common.html_template import report_template
# Presumably the default path of the result file; it is not referenced in the
# visible part of this module.
38 output_file_default = "/tmp/yardstick.out"
# Fallback used by TaskParser.parse_suite when a suite file has no
# "test_cases_dir" key.
39 test_cases_dir_default = "tests/opnfv/test_cases/"
# Module-level logger, named after this module.
40 LOG = logging.getLogger(__name__)
43 class Task(object): # pragma: no cover
46 Set of commands to manage benchmark tasks.
# Turn the comma-separated 'dispatcher' setting of the DEFAULT section into a
# list of whitespace-stripped names and store that list back in output_config.
# NOTE(review): the continuation line of the .get(...) call (its fallback
# value) is not visible in this extract.
53 def _set_dispatchers(self, output_config):
54 dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
56 out_types = [s.strip() for s in dispatchers.split(',')]
# Unlike the tolerant read above, this write needs output_config['DEFAULT']
# to exist already.
57 output_config['DEFAULT']['dispatcher'] = out_types
# Run one task file (or a suite of task files) end to end: load the output
# configuration, parse the task files, run the scenarios of each task, then
# hand the aggregated result to the dispatchers and write the report.
# Attributes read from args: task_id, output_file, inputfile[0], task_args,
# task_args_file.
# NOTE(review): several control-flow lines (try/if/else) and continuation
# lines of this method are not visible in this extract, so the branch
# structure is only partly shown below.
59 def start(self, args, **kwargs): # pylint: disable=unused-argument
60 """Start a benchmark scenario."""
# Register atexit_handler to run at interpreter exit.
62 atexit.register(self.atexit_handler)
# Use the task id supplied on args, or generate a random UUID string.
64 task_id = getattr(args, 'task_id')
65 self.task_id = task_id if task_id else str(uuid.uuid4())
70 output_config = utils.parse_ini_file(CONF_FILE)
71 except Exception: # pylint: disable=broad-except
72 # all error will be ignore, the default value is {}
# Order matters: _set_output_config stores the dispatcher as a single string,
# _set_dispatchers then converts it into a list.
75 self._init_output_config(output_config)
76 self._set_output_config(output_config, args.output_file)
77 LOG.debug('Output configuration is: %s', output_config)
79 self._set_dispatchers(output_config)
81 # update dispatcher list
# When the 'file' dispatcher is enabled, seed the output file with an empty
# result structure.
82 if 'file' in output_config['DEFAULT']['dispatcher']:
83 result = {'status': 0, 'result': {}}
84 utils.write_json_to_file(args.output_file, result)
86 total_start_time = time.time()
87 parser = TaskParser(args.inputfile[0])
# Two ways of obtaining the task list follow: the parser expands a suite, or
# the single input file is used with the arguments given on args. The
# condition selecting between them is not visible in this extract.
90 # 1.parse suite, return suite_params info
91 task_files, task_args, task_args_fnames = parser.parse_suite()
93 task_files = [parser.path]
94 task_args = [args.task_args]
95 task_args_fnames = [args.task_args_file]
97 LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98 task_files, task_args, task_args_fnames)
104 tasks = self._parse_tasks(parser, task_files, args, task_args,
107 # Execute task files.
# tasks[i] is the parsed form of task_files[i] (see _parse_tasks).
108 for i, _ in enumerate(task_files):
109 one_task_start_time = time.time()
# Contexts of every task accumulate on self.contexts, which _run and
# atexit_handler iterate over.
110 self.contexts.extend(tasks[i]['contexts'])
111 if not tasks[i]['meet_precondition']:
112 LOG.info('"meet_precondition" is %s, please check environment',
113 tasks[i]['meet_precondition'])
117 success, data = self._run(tasks[i]['scenarios'],
118 tasks[i]['run_in_parallel'],
120 except KeyboardInterrupt:
# The per-testcase verdicts stored in `testcases` below are what
# _get_task_criteria later inspects (key 'criteria', 'PASS' or 'FAIL').
122 except Exception: # pylint: disable=broad-except
123 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
125 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
129 LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
130 testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
133 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
135 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
139 # keep deployment, forget about stack
140 # (hide it for exit handler)
# Contexts are visited in reverse order of registration; the loop body is not
# visible in this extract.
143 for context in self.contexts[::-1]:
146 one_task_end_time = time.time()
147 LOG.info("Task %s finished in %d secs", task_files[i],
148 one_task_end_time - one_task_start_time)
# Aggregate all testcase outcomes, flush them to the dispatchers and render
# the report file.
150 result = self._get_format_result(testcases)
152 self._do_output(output_config, result)
153 self._generate_reporting(result)
155 total_end_time = time.time()
156 LOG.info("Total finished in %d secs",
157 total_end_time - total_start_time)
159 LOG.info('To generate report, execute command "yardstick report '
160 'generate %s <YAML_NAME>"', self.task_id)
161 LOG.info("Task ALL DONE, exiting")
# Render report_template with the result mapping and write the output to
# constants.REPORTING_FILE (opened in 'w' mode, so an existing file is
# overwritten).
# NOTE(review): the line that binds `env` is not visible in this extract.
164 def _generate_reporting(self, result):
166 with open(constants.REPORTING_FILE, 'w') as f:
167 f.write(env.from_string(report_template).render(result))
169 LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
# Body of a logging set-up routine whose `def` line is not visible in this
# extract. It attaches a DEBUG-level FileHandler, writing to
# <constants.TASK_LOG_DIR>/<self.task_id>.log, to the root logger.
172 log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
173 log_formatter = logging.Formatter(log_format)
# Presumably creates the log directory when it is missing -- confirm against
# utils.makedirs.
175 utils.makedirs(constants.TASK_LOG_DIR)
176 log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
177 log_handler = logging.FileHandler(log_path)
178 log_handler.setFormatter(log_formatter)
179 log_handler.setLevel(logging.DEBUG)
# Added to the root logger, so records from every module reach this file.
181 logging.root.addHandler(log_handler)
183 def _init_output_config(self, output_config):
184 output_config.setdefault('DEFAULT', {})
185 output_config.setdefault('dispatcher_http', {})
186 output_config.setdefault('dispatcher_file', {})
187 output_config.setdefault('dispatcher_influxdb', {})
188 output_config.setdefault('nsb', {})
# Fill in dispatcher settings from the environment and the given file path.
# DISPATCHER provides the dispatcher type and TARGET its target; both are read
# with os.environ[...], which raises KeyError when the variable is unset.
# NOTE(review): the try/except/else lines around those reads are not visible
# in this extract, so which assignment belongs to which branch is only implied.
190 def _set_output_config(self, output_config, file_path):
192 out_type = os.environ['DISPATCHER']
# setdefault: 'file' is only used when no dispatcher value is configured yet.
194 output_config['DEFAULT'].setdefault('dispatcher', 'file')
196 output_config['DEFAULT']['dispatcher'] = out_type
198 output_config['dispatcher_file']['file_path'] = file_path
201 target = os.environ['TARGET']
# start() calls this before _set_dispatchers, so 'dispatcher' is still a
# single string here and selects the section that receives the target,
# e.g. 'dispatcher_file'.
205 k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
206 output_config[k]['target'] = target
# Build the result structure from the per-testcase outcomes. The visible
# entries are the overall criteria, the task id, the testcases and four
# environment details (DEPLOY_SCENARIO, INSTALLER_TYPE, NODE_NAME,
# YARDSTICK_BRANCH), each falling back to 'unknown' when unset.
# NOTE(review): the dict scaffolding, any further keys and the return
# statement are not visible in this extract.
208 def _get_format_result(self, testcases):
209 criteria = self._get_task_criteria(testcases)
212 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
213 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
214 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
215 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
221 'criteria': criteria,
222 'task_id': self.task_id,
224 'testcases': testcases
# Derive the overall verdict of a task from its testcases mapping.
# `criteria` is True when at least one testcase has a 'criteria' value other
# than 'PASS' (a missing key counts as not passed); it is False for an empty
# mapping.
# NOTE(review): the lines that turn this flag into the returned value are not
# visible in this extract.
230 def _get_task_criteria(self, testcases):
231 criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
def _do_output(self, output_config, result):
    """Hand the final result to every configured dispatcher but InfluxDB.

    Dispatchers whose ``__dispatcher_type__`` is ``'Influxdb'`` are skipped;
    all others receive ``result`` through ``flush_result_data``.

    :param output_config: (dict) configuration given to ``DispatcherBase.get``
    :param result: result data passed to each dispatcher
    """
    for candidate in DispatcherBase.get(output_config):
        if candidate.__dispatcher_type__ == 'Influxdb':
            continue
        candidate.flush_result_data(result)
# Start the scenarios of one task, wait for their runners and collect the
# results. Background scenarios are started first and stopped at the end.
# Returns the tuple (task_success, result).
# NOTE(review): the body of the context loop, the initialisation of
# `runners`, `result` and `task_success`, the run_in_parallel test and the
# status checks are not visible in this extract.
244 def _run(self, scenarios, run_in_parallel, output_config):
245 """Deploys context and calls runners"""
246 for context in self.contexts:
249 background_runners = []
# The runner configuration of a background scenario is replaced by a
# Duration runner with a very long duration; these runners are aborted
# explicitly further down.
253 # Start all background scenarios
254 for scenario in filter(_is_background_scenario, scenarios):
255 scenario["runner"] = dict(type="Duration", duration=1000000000)
256 runner = self.run_one_scenario(scenario, output_config)
257 background_runners.append(runner)
# First variant: start every foreground scenario, then wait for all runners.
261 for scenario in scenarios:
262 if not _is_background_scenario(scenario):
263 runner = self.run_one_scenario(scenario, output_config)
264 runners.append(runner)
266 # Wait for runners to finish
267 for runner in runners:
268 status = runner_join(runner, background_runners, self.outputs, result)
270 LOG.error("%s runner status %s", runner.__execution_type__, status)
272 LOG.info("Runner ended")
# Second variant: start each foreground scenario and wait for it before
# starting the next one.
275 for scenario in scenarios:
276 if not _is_background_scenario(scenario):
277 runner = self.run_one_scenario(scenario, output_config)
278 status = runner_join(runner, background_runners, self.outputs, result)
# The scenario is reported with its 1-based position in `scenarios`.
280 LOG.error('Scenario NO.%s: "%s" ERROR!',
281 scenarios.index(scenario) + 1,
282 scenario.get('type'))
283 LOG.error("%s runner status %s", runner.__execution_type__, status)
285 LOG.info("Runner ended")
287 # Abort background runners
288 for runner in background_runners:
291 # Wait for background runners to finish
292 for runner in background_runners:
293 status = runner.join(self.outputs, result)
295 # Nuke if it did not stop nicely
296 base_runner.Runner.terminate(runner)
297 runner.join(self.outputs, result)
298 base_runner.Runner.release(runner)
300 print("Background task ended")
301 return task_success, result
# Registered with atexit by start(). Terminates all runners, then walks
# self.contexts in reverse order.
# NOTE(review): the guard before the log message and the loop body are not
# visible in this extract.
303 def atexit_handler(self):
304 """handler for process termination"""
305 base_runner.Runner.terminate_all()
308 LOG.info("Undeploying all contexts")
309 for context in self.contexts[::-1]:
# Recursively resolve option values. Dicts and lists are rebuilt element by
# element. A string starting with '$' is replaced by the entry of self.outputs
# named by the text after the '$' (None when there is no such entry); any
# other string is returned unchanged.
# NOTE(review): the final branch, for values that are neither container nor
# string, is not visible in this extract.
312 def _parse_options(self, op):
313 if isinstance(op, dict):
314 return {k: self._parse_options(v) for k, v in op.items()}
315 elif isinstance(op, list):
316 return [self._parse_options(v) for v in op]
317 elif isinstance(op, six.string_types):
318 return self.outputs.get(op[1:]) if op.startswith('$') else op
# Parse every task file with the shared parser and tag each parsed task with
# a 'case_name': the file name without directory and extension.
# args.render_only is used as an output directory: each rendered task is
# written there as "<index, 3 digits>-<case_name>.yml".
# NOTE(review): the rest of the signature, the initialisation of `tasks`, the
# condition guarding the render-only part and the return statement are not
# visible in this extract.
322 def _parse_tasks(self, parser, task_files, args, task_args,
327 for i, _ in enumerate(task_files):
# The parser is reused: its path is re-pointed at each task file in turn.
328 parser.path = task_files[i]
329 tasks.append(parser.parse_task(self.task_id, task_args[i],
330 task_args_fnames[i]))
331 tasks[i]['case_name'] = os.path.splitext(
332 os.path.basename(task_files[i]))[0]
335 utils.makedirs(args.render_only)
336 for idx, task in enumerate(tasks):
337 output_file_name = os.path.abspath(os.path.join(
339 '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
340 utils.write_file(output_file_name, task['rendered'])
# Build the context configuration (host, target(s), nodes, networks) for one
# scenario, create its runner and start it.
# Side effects on scenario_cfg: 'options' is replaced by its resolved form
# (see _parse_options) and scenario_cfg["runner"] gains an 'output_config'
# entry.
# NOTE(review): the initialisation of context_cfg and ip_list, several
# if/else/try lines and the return statement are not visible in this extract;
# _run uses the returned value as the runner.
346 def run_one_scenario(self, scenario_cfg, output_config):
347 """run one scenario using context"""
348 runner_cfg = scenario_cfg["runner"]
349 runner_cfg['output_config'] = output_config
351 options = scenario_cfg.get('options', {})
352 scenario_cfg['options'] = self._parse_options(options)
354 # TODO support get multi hosts/vms info
# `or {}` also covers keys that are present but hold None.
356 options = scenario_cfg.get('options') or {}
357 server_name = options.get('server_name') or {}
# Nested helper: fill context_cfg['target'] from cfg['target']. A cfg without
# 'target' (or without 'host' when it is needed) raises KeyError, which the
# loop over [server_name, scenario_cfg] below logs at debug level.
359 def config_context_target(cfg):
360 target = cfg['target']
361 if is_ip_addr(target):
362 context_cfg['target'] = {"ipaddr": target}
364 context_cfg['target'] = base_context.Context.get_server(target)
# Host and target in the same context use the private address, otherwise the
# server's "ip" entry.
365 if self._is_same_context(cfg["host"], target):
366 context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
368 context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
# options.server_name.host takes precedence over the scenario-level host.
370 host_name = server_name.get('host', scenario_cfg.get('host'))
372 context_cfg['host'] = base_context.Context.get_server(host_name)
374 for item in [server_name, scenario_cfg]:
376 config_context_target(item)
378 LOG.debug("Got a KeyError in config_context_target(%s)", item)
# Several targets: 'ipaddr' becomes the comma-separated list of all target
# addresses.
382 if "targets" in scenario_cfg:
384 for target in scenario_cfg["targets"]:
385 if is_ip_addr(target):
386 ip_list.append(target)
387 context_cfg['target'] = {}
389 context_cfg['target'] = (
390 base_context.Context.get_server(target))
391 if self._is_same_context(scenario_cfg["host"],
393 ip_list.append(context_cfg["target"]["private_ip"])
395 ip_list.append(context_cfg["target"]["ip"])
396 context_cfg['target']['ipaddr'] = ','.join(ip_list)
398 if "nodes" in scenario_cfg:
399 context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
400 context_cfg["networks"] = get_networks_from_nodes(
401 context_cfg["nodes"])
# The runner type is selected by runner_cfg (its "type" entry is logged
# below).
403 runner = base_runner.Runner.get(runner_cfg)
405 LOG.info("Starting runner of type '%s'", runner_cfg["type"])
406 runner.run(scenario_cfg, context_cfg)
# Search self.contexts for a Heat or Kubernetes context in which both the
# host and the target server can be looked up.
# NOTE(review): the statements following each visible condition and lookup,
# and the return statements, are not visible in this extract.
410 def _is_same_context(self, host_attr, target_attr):
411 """check if two servers are in the same heat context
412 host_attr: either a name for a server created by yardstick or a dict
413 with attribute name mapping when using external heat templates
414 target_attr: either a name for a server created by yardstick or a dict
415 with attribute name mapping when using external heat templates
417 for context in self.contexts:
418 if context.__context_type__ not in {contexts.CONTEXT_HEAT,
419 contexts.CONTEXT_KUBERNETES}:
# Both lookups are made on the same context object.
422 host = context._get_server(host_attr)
426 target = context._get_server(target_attr)
430 # Both host and target is not None, then they are in the
437 class TaskParser(object): # pragma: no cover
438 """Parser for task config files in yaml format"""
# NOTE(review): the body is not visible in this extract. Other methods of
# this class read self.path, so it presumably stores `path` there -- confirm.
440 def __init__(self, path):
# Check the optional 'constraint' of a suite entry (keys 'pod' and
# 'installer') against the current pod and installer.
# Each of the two conditions below is true when the current value is None, or
# when the constraint names allowed values and the current one is not among
# them.
# NOTE(review): the statements guarded by these conditions and the return
# statements are not visible in this extract.
443 def _meet_constraint(self, task, cur_pod, cur_installer):
444 if "constraint" in task:
445 constraint = task.get('constraint', None)
446 if constraint is not None:
447 tc_fit_pod = constraint.get('pod', None)
448 tc_fit_installer = constraint.get('installer', None)
449 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
450 cur_pod, cur_installer, constraint)
451 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
453 if (cur_installer is None) or (tc_fit_installer and cur_installer
454 not in tc_fit_installer):
458 def _get_task_para(self, task, cur_pod):
459 task_args = task.get('task_args', None)
460 if task_args is not None:
461 task_args = task_args.get(cur_pod, task_args.get('default'))
462 task_args_fnames = task.get('task_args_fnames', None)
463 if task_args_fnames is not None:
464 task_args_fnames = task_args_fnames.get(cur_pod, None)
465 return task_args, task_args_fnames
# Load the suite YAML file at self.path and select the test cases to run.
# Returns three parallel lists: task file paths, task arguments and task
# argument file names.
# NOTE(review): the try line, the IOError handling, the initialisation of
# valid_task_args and the handling of entries that have no file name or do
# not meet their constraint are not visible in this extract.
467 def parse_suite(self):
468 """parse the suite file and return a list of task config file paths
469 and lists of optional parameters if present"""
470 LOG.info("\nParsing suite file:%s", self.path)
473 with open(self.path) as stream:
474 cfg = yaml_load(stream)
475 except IOError as ioerror:
478 self._check_schema(cfg["schema"], "suite")
479 LOG.info("\nStarting scenario:%s", cfg["name"])
# The test case directory comes from the suite, or from the module default,
# and is combined with constants.YARDSTICK_ROOT_PATH; a trailing separator is
# enforced because file names are appended by plain string concatenation
# below.
481 test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
482 test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
484 if test_cases_dir[-1] != os.sep:
485 test_cases_dir += os.sep
487 cur_pod = os.environ.get('NODE_NAME', None)
488 cur_installer = os.environ.get('INSTALLER_TYPE', None)
490 valid_task_files = []
492 valid_task_args_fnames = []
494 for task in cfg["test_cases"]:
496 if "file_name" in task:
497 task_fname = task.get('file_name', None)
498 if task_fname is None:
503 if self._meet_constraint(task, cur_pod, cur_installer):
504 valid_task_files.append(test_cases_dir + task_fname)
507 # 3.fetch task parameters
508 task_args, task_args_fnames = self._get_task_para(task, cur_pod)
509 valid_task_args.append(task_args)
510 valid_task_args_fnames.append(task_args_fnames)
512 return valid_task_files, valid_task_args, valid_task_args_fnames
# Two steps: collect the template arguments, then read, render and YAML-parse
# the task file at self.path. Returns the tuple (parsed_task, rendered_task).
# NOTE(review): the initialisation of `kw`, the condition guarding the
# arguments file and the try/except lines are not visible in this extract.
514 def _render_task(self, task_args, task_args_file):
515 """Render the input task with the given arguments
517 :param task_args: (dict) arguments to render the task
518 :param task_args_file: (str) file containing the arguments to render
520 :return: (str) task file rendered
# The arguments file is applied first and the inline arguments second, so an
# inline value overrides a value of the same name from the file.
525 with open(task_args_file) as f:
526 kw.update(parse_task_args('task_args_file', f.read()))
527 kw.update(parse_task_args('task_args', task_args))
529 raise y_exc.TaskRenderArgumentError()
533 with open(self.path) as f:
534 input_task = f.read()
535 rendered_task = task_template.TaskTemplate.render(input_task, **kw)
536 LOG.debug('Input task is:\n%s', rendered_task)
537 parsed_task = yaml_load(rendered_task)
# A failure to read the file is reported as TaskReadError; TaskRenderError is
# raised by a handler whose except line is not visible here.
538 except (IOError, OSError):
539 raise y_exc.TaskReadError(task_file=self.path)
541 raise y_exc.TaskRenderError(input_task=input_task)
543 return parsed_task, rendered_task
# Render and parse the task file at self.path, instantiate its contexts and
# annotate every scenario with bookkeeping fields.
# Returns a dict with the keys 'scenarios', 'run_in_parallel',
# 'meet_precondition', 'contexts' and 'rendered'; Task._parse_tasks adds
# 'case_name' to it afterwards.
# NOTE(review): the first condition of the context selection, the
# initialisation of `_contexts` and a few lines inside the loops are not
# visible in this extract.
545 def parse_task(self, task_id, task_args=None, task_args_file=None):
546 """parses the task file and return an context and scenario instances"""
547 LOG.info("Parsing task config: %s", self.path)
549 cfg, rendered = self._render_task(task_args, task_args_file)
550 self._check_schema(cfg["schema"], "task")
551 meet_precondition = self._check_precondition(cfg)
553 # TODO: support one or many contexts? Many would simpler and precise
554 # TODO: support hybrid context type
# The context section is normalised to a list of configurations; a Dummy
# context configuration is the remaining alternative.
556 context_cfgs = [cfg["context"]]
557 elif "contexts" in cfg:
558 context_cfgs = cfg["contexts"]
560 context_cfgs = [{"type": contexts.CONTEXT_DUMMY}]
563 for cfg_attrs in context_cfgs:
565 cfg_attrs['task_id'] = task_id
566 # default to Heat context because we are testing OpenStack
567 context_type = cfg_attrs.get("type", contexts.CONTEXT_HEAT)
568 context = base_context.Context.get(context_type)
569 context.init(cfg_attrs)
570 # Update the name in case the context has used the name_suffix
571 cfg_attrs['name'] = context.name
572 _contexts.append(context)
574 run_in_parallel = cfg.get("run_in_parallel", False)
576 # add tc and task id for influxdb extended tags
577 for scenario in cfg["scenarios"]:
# The test case name is the task file name without directory and extension.
578 task_name = os.path.splitext(os.path.basename(self.path))[0]
579 scenario["tc"] = task_name
580 scenario["task_id"] = task_id
581 # embed task path into scenario so we can load other files
582 # relative to task path
583 scenario["task_path"] = os.path.dirname(self.path)
585 self._change_node_names(scenario, _contexts)
587 # TODO we need something better here, a class that represent the file
588 return {'scenarios': cfg['scenarios'],
589 'run_in_parallel': run_in_parallel,
590 'meet_precondition': meet_precondition,
591 'contexts': _contexts,
592 'rendered': rendered}
# NOTE(review): the line preceding this def is not visible in this extract.
# The signature has no `self`, while parse_task calls it as
# self._change_node_names(scenario, _contexts), so a decorator is presumably
# applied -- confirm.
# NOTE(review): collections.Mapping, used near the end of this function, was
# removed from the `collections` namespace in Python 3.10;
# collections.abc.Mapping is the supported name.
595 def _change_node_names(scenario, _contexts):
596 """Change the node names in a scenario, depending on the context config
598 The nodes (VMs or physical servers) are referred in the context section
599 with the name of the server and the name of the context:
600 <server name>.<context name>
602 If the context is going to be undeployed at the end of the test, the
603 task ID is suffixed to the name to avoid interferences with previous
604 deployments. If the context needs to be deployed at the end of the
605 test, the name assigned is kept.
607 There are several places where a node name could appear in the scenario
618 server_name: # JIRA: YARDSTICK-810
624 tg__0: trafficgen_0.yardstick
625 vnf__0: vnf_0.yardstick
630 name: trafficgen_0.yardstick
631 public_ip_attr: "server1_public_ip"
632 private_ip_attr: "server1_private_ip"
634 name: vnf_0.yardstick
635 public_ip_attr: "server2_public_ip"
636 private_ip_attr: "server2_private_ip"
637 NOTE: in Kubernetes context, the separator character between the server
638 name and the context name is "-":
# Nested helper: split `name` with each context in turn and, for the first
# context whose assigned_name equals the context part, rebuild the name from
# the host part and that context's separator (the last format argument is not
# visible in this extract). ScenarioConfigContextNameNotFound is raised when
# no context matches.
643 def qualified_name(name):
644 for context in _contexts:
645 host_name, ctx_name = context.split_host_name(name)
646 if context.assigned_name == ctx_name:
647 return '{}{}{}'.format(host_name,
648 context.host_name_separator,
651 raise y_exc.ScenarioConfigContextNameNotFound(host_name=name)
# Rewrite, in place, every location of the scenario that may hold a node
# name: 'host', 'target', options.server_name, 'targets' and 'nodes'.
653 if 'host' in scenario:
654 scenario['host'] = qualified_name(scenario['host'])
655 if 'target' in scenario:
656 scenario['target'] = qualified_name(scenario['target'])
657 options = scenario.get('options') or {}
658 server_name = options.get('server_name') or {}
659 if 'host' in server_name:
660 server_name['host'] = qualified_name(server_name['host'])
661 if 'target' in server_name:
662 server_name['target'] = qualified_name(server_name['target'])
663 if 'targets' in scenario:
664 for idx, target in enumerate(scenario['targets']):
665 scenario['targets'][idx] = qualified_name(target)
666 if 'nodes' in scenario:
667 for scenario_node, target in scenario['nodes'].items():
668 if isinstance(target, collections.Mapping):
669 # Update node info on scenario with context info
670 # Just update the node name with context
671 # Append context information
672 target['name'] = qualified_name(target['name'])
674 scenario['nodes'][scenario_node] = target
676 scenario['nodes'][scenario_node] = qualified_name(target)
# Terminate the process through sys.exit, with an error message naming
# self.path, unless cfg_schema equals "yardstick:<schema_type>:0.1".
# NOTE(review): the end of the sys.exit(...) call is not visible in this
# extract.
678 def _check_schema(self, cfg_schema, schema_type):
679 """Check if config file is using the correct schema type"""
681 if cfg_schema != "yardstick:" + schema_type + ":0.1":
682 sys.exit("error: file %s has unknown schema %s" % (self.path,
# Compare the optional "precondition" section of a task (keys
# 'installer_type', 'deploy_scenarios', 'pod_name') with the environment.
# A criterion is only examined when both the precondition value and the
# matching environment variable are set.
# NOTE(review): the installer is read from 'INSTALL_TYPE' here, whereas
# _get_format_result and parse_suite read 'INSTALLER_TYPE' -- confirm which
# variable name is intended.
# NOTE(review): the statements guarded by the checks below and the return
# statements are not visible in this extract.
685 def _check_precondition(self, cfg):
686 """Check if the environment meet the precondition"""
688 if "precondition" in cfg:
689 precondition = cfg["precondition"]
690 installer_type = precondition.get("installer_type", None)
691 deploy_scenarios = precondition.get("deploy_scenarios", None)
692 tc_fit_pods = precondition.get("pod_name", None)
693 installer_type_env = os.environ.get('INSTALL_TYPE', None)
694 deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
695 pod_name_env = os.environ.get('NODE_NAME', None)
697 LOG.info("installer_type: %s, installer_type_env: %s",
698 installer_type, installer_type_env)
699 LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
700 deploy_scenarios, deploy_scenario_env)
701 LOG.info("tc_fit_pods: %s, pod_name_env: %s",
702 tc_fit_pods, pod_name_env)
703 if installer_type and installer_type_env:
704 if installer_type_env not in installer_type:
# 'deploy_scenarios' is a comma-separated string; each entry is matched as a
# prefix of the environment's scenario name.
706 if deploy_scenarios and deploy_scenario_env:
707 deploy_scenarios_list = deploy_scenarios.split(',')
708 for deploy_scenario in deploy_scenarios_list:
709 if deploy_scenario_env.startswith(deploy_scenario):
712 if tc_fit_pods and pod_name_env:
713 if pod_name_env not in tc_fit_pods:
# Accepts either an address string or a mapping. For a mapping, the value of
# 'public_ip_attr' (falling back to 'private_ip_attr') is examined instead;
# objects without a .get method raise AttributeError, which is caught.
# NOTE(review): the try lines, the handler bodies and the return statements
# are not visible in this extract.
# NOTE(review): addr.encode('utf-8') hands bytes to ipaddress.ip_address. The
# Python 3 standard library treats a bytes argument as a packed address
# rather than as text -- confirm which `ipaddress` implementation is in use.
718 def is_ip_addr(addr):
719 """check if string addr is an IP address"""
721 addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
722 except AttributeError:
726 ipaddress.ip_address(addr.encode('utf-8'))
# Return the scenario's "run_in_background" value when that key is present.
# Task._run uses the result as a truth value to separate background from
# foreground scenarios.
# NOTE(review): the fallback for scenarios without the key is not visible in
# this extract.
733 def _is_background_scenario(scenario):
734 if "run_in_background" in scenario:
735 return scenario["run_in_background"]
def parse_nodes_with_context(scenario_cfg):
    """Resolve every entry of the scenario 'nodes' section to a server.

    Node names are processed in sorted order so that the returned mapping
    always has the same ordering.

    :param scenario_cfg: (dict) scenario configuration with a 'nodes' mapping
    :return: (OrderedDict) node name -> ``Context.get_server`` result
    """
    servers = OrderedDict()
    for node_name in sorted(scenario_cfg["nodes"]):
        node_ref = scenario_cfg["nodes"][node_name]
        servers[node_name] = base_context.Context.get_server(node_ref)
    return servers
# Collect the networks referenced by the interfaces of the given nodes. Each
# interface's 'network_name' is looked up through Context.get_network and the
# result is stored under the network's own 'name'.
# NOTE(review): the docstring repeats the one of parse_nodes_with_context.
# NOTE(review): the initialisation of `networks`, the guards between the
# visible lines and the return statement are not visible in this extract.
748 def get_networks_from_nodes(nodes):
749 """parse the 'nodes' fields in scenario """
751 for node in nodes.values():
754 interfaces = node.get('interfaces', {})
755 for interface in interfaces.values():
756 # vld_id is network_name
757 network_name = interface.get('network_name')
760 network = base_context.Context.get_network(network_name)
762 networks[network['name']] = network
# Wait for one runner to finish. While it is still running (poll() returns
# None), its output and result queues, and those of every background runner,
# are drained into `outputs` (via update) and `result` (via extend). The
# runner is then joined and released.
# NOTE(review): the remainder of the docstring and the return statement are
# not visible in this extract; Task._run uses the returned value as a status.
766 def runner_join(runner, background_runners, outputs, result):
767 """join (wait for) a runner, exit process at runner failure
768 :param background_runners:
769 :type background_runners:
775 while runner.poll() is None:
776 outputs.update(runner.get_output())
777 result.extend(runner.get_result())
778 # drain all the background runner queues
779 for background in background_runners:
780 outputs.update(background.get_output())
781 result.extend(background.get_result())
782 status = runner.join(outputs, result)
783 base_runner.Runner.release(runner)
def print_invalid_header(source_name, args):
    """Print the heading that introduces an invalid-arguments report.

    :param source_name: (str) name of the argument source being reported
    :param args: the offending value, printed as-is
    """
    fields = {"source": source_name, "args": args}
    print("Invalid %(source)s passed:\n\n %(args)s\n" % fields)
# Normalise task arguments. Mappings are handled by the first branch, whose
# body is not visible here; anything else is parsed as YAML, and a None parse
# result becomes {}.
# A falsy value other than None (for example '') is not parsed: `args and ...`
# leaves it unchanged, so it reaches the dict check below as-is.
# YAML parse errors and non-dict results are reported on stdout.
# NOTE(review): collections.Mapping was removed from the `collections`
# namespace in Python 3.10; collections.abc.Mapping is the supported name.
# NOTE(review): the try line, the statements after each printed report and
# the final return are not visible in this extract.
792 def parse_task_args(src_name, args):
793 if isinstance(args, collections.Mapping):
797 kw = args and yaml_load(args)
798 kw = {} if kw is None else kw
799 except yaml.parser.ParserError as e:
800 print_invalid_header(src_name, args)
801 print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
802 % {"source": src_name, "err": e})
805 if not isinstance(kw, dict):
806 print_invalid_header(src_name, args)
807 print("%(src)s had to be dict, actually %(src_type)s\n"
808 % {"src": src_name, "src_type": type(kw)})