Merge "Install dependencies: bare-metal, standalone"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import sys
11 import os
12 from collections import OrderedDict
13
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 import collections
21
22 from six.moves import filter
23 from jinja2 import Environment
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.constants import CONF_FILE
28 from yardstick.common.yaml_loader import yaml_load
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common import constants
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import task_template
33 from yardstick.common import utils
34 from yardstick.common.html_template import report_template
35
36 output_file_default = "/tmp/yardstick.out"
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def _set_dispatchers(self, output_config):
52         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
53                                                            'file')
54         out_types = [s.strip() for s in dispatchers.split(',')]
55         output_config['DEFAULT']['dispatcher'] = out_types
56
57     def start(self, args, **kwargs):  # pylint: disable=unused-argument
58         """Start a benchmark scenario."""
59
60         atexit.register(self.atexit_handler)
61
62         task_id = getattr(args, 'task_id')
63         self.task_id = task_id if task_id else str(uuid.uuid4())
64
65         self._set_log()
66
67         try:
68             output_config = utils.parse_ini_file(CONF_FILE)
69         except Exception:  # pylint: disable=broad-except
70             # all error will be ignore, the default value is {}
71             output_config = {}
72
73         self._init_output_config(output_config)
74         self._set_output_config(output_config, args.output_file)
75         LOG.debug('Output configuration is: %s', output_config)
76
77         self._set_dispatchers(output_config)
78
79         # update dispatcher list
80         if 'file' in output_config['DEFAULT']['dispatcher']:
81             result = {'status': 0, 'result': {}}
82             utils.write_json_to_file(args.output_file, result)
83
84         total_start_time = time.time()
85         parser = TaskParser(args.inputfile[0])
86
87         if args.suite:
88             # 1.parse suite, return suite_params info
89             task_files, task_args, task_args_fnames = parser.parse_suite()
90         else:
91             task_files = [parser.path]
92             task_args = [args.task_args]
93             task_args_fnames = [args.task_args_file]
94
95         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
96                   task_files, task_args, task_args_fnames)
97
98         if args.parse_only:
99             sys.exit(0)
100
101         testcases = {}
102         tasks = self._parse_tasks(parser, task_files, args, task_args,
103                                   task_args_fnames)
104
105         # Execute task files.
106         for i, _ in enumerate(task_files):
107             one_task_start_time = time.time()
108             self.contexts.extend(tasks[i]['contexts'])
109             if not tasks[i]['meet_precondition']:
110                 LOG.info('"meet_precondition" is %s, please check environment',
111                          tasks[i]['meet_precondition'])
112                 continue
113
114             try:
115                 success, data = self._run(tasks[i]['scenarios'],
116                                           tasks[i]['run_in_parallel'],
117                                           output_config)
118             except KeyboardInterrupt:
119                 raise
120             except Exception:  # pylint: disable=broad-except
121                 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
122                           exc_info=True)
123                 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
124                                                     'tc_data': []}
125             else:
126                 if success:
127                     LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
128                     testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
129                                                         'tc_data': data}
130                 else:
131                     LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
132                               exc_info=True)
133                     testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
134                                                         'tc_data': data}
135
136             if args.keep_deploy:
137                 # keep deployment, forget about stack
138                 # (hide it for exit handler)
139                 self.contexts = []
140             else:
141                 for context in self.contexts[::-1]:
142                     context.undeploy()
143                 self.contexts = []
144             one_task_end_time = time.time()
145             LOG.info("Task %s finished in %d secs", task_files[i],
146                      one_task_end_time - one_task_start_time)
147
148         result = self._get_format_result(testcases)
149
150         self._do_output(output_config, result)
151         self._generate_reporting(result)
152
153         total_end_time = time.time()
154         LOG.info("Total finished in %d secs",
155                  total_end_time - total_start_time)
156
157         LOG.info('To generate report, execute command "yardstick report '
158                  'generate %s <YAML_NAME>"', self.task_id)
159         LOG.info("Task ALL DONE, exiting")
160         return result
161
162     def _generate_reporting(self, result):
163         env = Environment()
164         with open(constants.REPORTING_FILE, 'w') as f:
165             f.write(env.from_string(report_template).render(result))
166
167         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
168
169     def _set_log(self):
170         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
171         log_formatter = logging.Formatter(log_format)
172
173         utils.makedirs(constants.TASK_LOG_DIR)
174         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
175         log_handler = logging.FileHandler(log_path)
176         log_handler.setFormatter(log_formatter)
177         log_handler.setLevel(logging.DEBUG)
178
179         logging.root.addHandler(log_handler)
180
181     def _init_output_config(self, output_config):
182         output_config.setdefault('DEFAULT', {})
183         output_config.setdefault('dispatcher_http', {})
184         output_config.setdefault('dispatcher_file', {})
185         output_config.setdefault('dispatcher_influxdb', {})
186         output_config.setdefault('nsb', {})
187
188     def _set_output_config(self, output_config, file_path):
189         try:
190             out_type = os.environ['DISPATCHER']
191         except KeyError:
192             output_config['DEFAULT'].setdefault('dispatcher', 'file')
193         else:
194             output_config['DEFAULT']['dispatcher'] = out_type
195
196         output_config['dispatcher_file']['file_path'] = file_path
197
198         try:
199             target = os.environ['TARGET']
200         except KeyError:
201             pass
202         else:
203             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
204             output_config[k]['target'] = target
205
206     def _get_format_result(self, testcases):
207         criteria = self._get_task_criteria(testcases)
208
209         info = {
210             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
211             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
212             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
213             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
214         }
215
216         result = {
217             'status': 1,
218             'result': {
219                 'criteria': criteria,
220                 'task_id': self.task_id,
221                 'info': info,
222                 'testcases': testcases
223             }
224         }
225
226         return result
227
228     def _get_task_criteria(self, testcases):
229         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
230         if criteria:
231             return 'FAIL'
232         else:
233             return 'PASS'
234
235     def _do_output(self, output_config, result):
236         dispatchers = DispatcherBase.get(output_config)
237         dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
238
239         for dispatcher in dispatchers:
240             dispatcher.flush_result_data(result)
241
242     def _run(self, scenarios, run_in_parallel, output_config):
243         """Deploys context and calls runners"""
244         for context in self.contexts:
245             context.deploy()
246
247         background_runners = []
248
249         task_success = True
250         result = []
251         # Start all background scenarios
252         for scenario in filter(_is_background_scenario, scenarios):
253             scenario["runner"] = dict(type="Duration", duration=1000000000)
254             runner = self.run_one_scenario(scenario, output_config)
255             background_runners.append(runner)
256
257         runners = []
258         if run_in_parallel:
259             for scenario in scenarios:
260                 if not _is_background_scenario(scenario):
261                     runner = self.run_one_scenario(scenario, output_config)
262                     runners.append(runner)
263
264             # Wait for runners to finish
265             for runner in runners:
266                 status = runner_join(runner, background_runners, self.outputs, result)
267                 if status != 0:
268                     LOG.error("%s runner status %s", runner.__execution_type__, status)
269                     task_success = False
270                 LOG.info("Runner ended")
271         else:
272             # run serially
273             for scenario in scenarios:
274                 if not _is_background_scenario(scenario):
275                     runner = self.run_one_scenario(scenario, output_config)
276                     status = runner_join(runner, background_runners, self.outputs, result)
277                     if status != 0:
278                         LOG.error('Scenario NO.%s: "%s" ERROR!',
279                                   scenarios.index(scenario) + 1,
280                                   scenario.get('type'))
281                         LOG.error("%s runner status %s", runner.__execution_type__, status)
282                         task_success = False
283                     LOG.info("Runner ended")
284
285         # Abort background runners
286         for runner in background_runners:
287             runner.abort()
288
289         # Wait for background runners to finish
290         for runner in background_runners:
291             status = runner.join(self.outputs, result)
292             if status is None:
293                 # Nuke if it did not stop nicely
294                 base_runner.Runner.terminate(runner)
295                 runner.join(self.outputs, result)
296             base_runner.Runner.release(runner)
297
298             print("Background task ended")
299         return task_success, result
300
301     def atexit_handler(self):
302         """handler for process termination"""
303         base_runner.Runner.terminate_all()
304
305         if self.contexts:
306             LOG.info("Undeploying all contexts")
307             for context in self.contexts[::-1]:
308                 context.undeploy()
309
310     def _parse_options(self, op):
311         if isinstance(op, dict):
312             return {k: self._parse_options(v) for k, v in op.items()}
313         elif isinstance(op, list):
314             return [self._parse_options(v) for v in op]
315         elif isinstance(op, str):
316             return self.outputs.get(op[1:]) if op.startswith('$') else op
317         else:
318             return op
319
320     def _parse_tasks(self, parser, task_files, args, task_args,
321                      task_args_fnames):
322         tasks = []
323
324         # Parse task_files.
325         for i, _ in enumerate(task_files):
326             parser.path = task_files[i]
327             tasks.append(parser.parse_task(self.task_id, task_args[i],
328                                            task_args_fnames[i]))
329             tasks[i]['case_name'] = os.path.splitext(
330                 os.path.basename(task_files[i]))[0]
331
332         if args.render_only:
333             utils.makedirs(args.render_only)
334             for idx, task in enumerate(tasks):
335                 output_file_name = os.path.abspath(os.path.join(
336                     args.render_only,
337                     '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
338                 utils.write_file(output_file_name, task['rendered'])
339
340             sys.exit(0)
341
342         return tasks
343
344     def run_one_scenario(self, scenario_cfg, output_config):
345         """run one scenario using context"""
346         runner_cfg = scenario_cfg["runner"]
347         runner_cfg['output_config'] = output_config
348
349         options = scenario_cfg.get('options', {})
350         scenario_cfg['options'] = self._parse_options(options)
351
352         # TODO support get multi hosts/vms info
353         context_cfg = {}
354         options = scenario_cfg.get('options') or {}
355         server_name = options.get('server_name') or {}
356
357         def config_context_target(cfg):
358             target = cfg['target']
359             if is_ip_addr(target):
360                 context_cfg['target'] = {"ipaddr": target}
361             else:
362                 context_cfg['target'] = Context.get_server(target)
363                 if self._is_same_context(cfg["host"], target):
364                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
365                 else:
366                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
367
368         host_name = server_name.get('host', scenario_cfg.get('host'))
369         if host_name:
370             context_cfg['host'] = Context.get_server(host_name)
371
372         for item in [server_name, scenario_cfg]:
373             try:
374                 config_context_target(item)
375             except KeyError:
376                 LOG.debug("Got a KeyError in config_context_target(%s)", item)
377             else:
378                 break
379
380         if "targets" in scenario_cfg:
381             ip_list = []
382             for target in scenario_cfg["targets"]:
383                 if is_ip_addr(target):
384                     ip_list.append(target)
385                     context_cfg['target'] = {}
386                 else:
387                     context_cfg['target'] = Context.get_server(target)
388                     if self._is_same_context(scenario_cfg["host"],
389                                              target):
390                         ip_list.append(context_cfg["target"]["private_ip"])
391                     else:
392                         ip_list.append(context_cfg["target"]["ip"])
393             context_cfg['target']['ipaddr'] = ','.join(ip_list)
394
395         if "nodes" in scenario_cfg:
396             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
397             context_cfg["networks"] = get_networks_from_nodes(
398                 context_cfg["nodes"])
399
400         runner = base_runner.Runner.get(runner_cfg)
401
402         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
403         runner.run(scenario_cfg, context_cfg)
404
405         return runner
406
407     def _is_same_context(self, host_attr, target_attr):
408         """check if two servers are in the same heat context
409         host_attr: either a name for a server created by yardstick or a dict
410         with attribute name mapping when using external heat templates
411         target_attr: either a name for a server created by yardstick or a dict
412         with attribute name mapping when using external heat templates
413         """
414         for context in self.contexts:
415             if context.__context_type__ not in {"Heat", "Kubernetes"}:
416                 continue
417
418             host = context._get_server(host_attr)
419             if host is None:
420                 continue
421
422             target = context._get_server(target_attr)
423             if target is None:
424                 return False
425
426             # Both host and target is not None, then they are in the
427             # same heat context.
428             return True
429
430         return False
431
432
433 class TaskParser(object):       # pragma: no cover
434     """Parser for task config files in yaml format"""
435
436     def __init__(self, path):
437         self.path = path
438
439     def _meet_constraint(self, task, cur_pod, cur_installer):
440         if "constraint" in task:
441             constraint = task.get('constraint', None)
442             if constraint is not None:
443                 tc_fit_pod = constraint.get('pod', None)
444                 tc_fit_installer = constraint.get('installer', None)
445                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
446                          cur_pod, cur_installer, constraint)
447                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
448                     return False
449                 if (cur_installer is None) or (tc_fit_installer and cur_installer
450                                                not in tc_fit_installer):
451                     return False
452         return True
453
454     def _get_task_para(self, task, cur_pod):
455         task_args = task.get('task_args', None)
456         if task_args is not None:
457             task_args = task_args.get(cur_pod, task_args.get('default'))
458         task_args_fnames = task.get('task_args_fnames', None)
459         if task_args_fnames is not None:
460             task_args_fnames = task_args_fnames.get(cur_pod, None)
461         return task_args, task_args_fnames
462
463     def parse_suite(self):
464         """parse the suite file and return a list of task config file paths
465            and lists of optional parameters if present"""
466         LOG.info("\nParsing suite file:%s", self.path)
467
468         try:
469             with open(self.path) as stream:
470                 cfg = yaml_load(stream)
471         except IOError as ioerror:
472             sys.exit(ioerror)
473
474         self._check_schema(cfg["schema"], "suite")
475         LOG.info("\nStarting scenario:%s", cfg["name"])
476
477         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
478         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
479                                       test_cases_dir)
480         if test_cases_dir[-1] != os.sep:
481             test_cases_dir += os.sep
482
483         cur_pod = os.environ.get('NODE_NAME', None)
484         cur_installer = os.environ.get('INSTALLER_TYPE', None)
485
486         valid_task_files = []
487         valid_task_args = []
488         valid_task_args_fnames = []
489
490         for task in cfg["test_cases"]:
491             # 1.check file_name
492             if "file_name" in task:
493                 task_fname = task.get('file_name', None)
494                 if task_fname is None:
495                     continue
496             else:
497                 continue
498             # 2.check constraint
499             if self._meet_constraint(task, cur_pod, cur_installer):
500                 valid_task_files.append(test_cases_dir + task_fname)
501             else:
502                 continue
503             # 3.fetch task parameters
504             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
505             valid_task_args.append(task_args)
506             valid_task_args_fnames.append(task_args_fnames)
507
508         return valid_task_files, valid_task_args, valid_task_args_fnames
509
510     def _render_task(self, task_args, task_args_file):
511         """Render the input task with the given arguments
512
513         :param task_args: (dict) arguments to render the task
514         :param task_args_file: (str) file containing the arguments to render
515                                the task
516         :return: (str) task file rendered
517         """
518         try:
519             kw = {}
520             if task_args_file:
521                 with open(task_args_file) as f:
522                     kw.update(parse_task_args('task_args_file', f.read()))
523             kw.update(parse_task_args('task_args', task_args))
524         except TypeError:
525             raise y_exc.TaskRenderArgumentError()
526
527         input_task = None
528         try:
529             with open(self.path) as f:
530                 input_task = f.read()
531             rendered_task = task_template.TaskTemplate.render(input_task, **kw)
532             LOG.debug('Input task is:\n%s', rendered_task)
533             parsed_task = yaml_load(rendered_task)
534         except (IOError, OSError):
535             raise y_exc.TaskReadError(task_file=self.path)
536         except Exception:
537             raise y_exc.TaskRenderError(input_task=input_task)
538
539         return parsed_task, rendered_task
540
541     def parse_task(self, task_id, task_args=None, task_args_file=None):
542         """parses the task file and return an context and scenario instances"""
543         LOG.info("Parsing task config: %s", self.path)
544
545         cfg, rendered = self._render_task(task_args, task_args_file)
546         self._check_schema(cfg["schema"], "task")
547         meet_precondition = self._check_precondition(cfg)
548
549         # TODO: support one or many contexts? Many would simpler and precise
550         # TODO: support hybrid context type
551         if "context" in cfg:
552             context_cfgs = [cfg["context"]]
553         elif "contexts" in cfg:
554             context_cfgs = cfg["contexts"]
555         else:
556             context_cfgs = [{"type": "Dummy"}]
557
558         contexts = []
559         for cfg_attrs in context_cfgs:
560
561             cfg_attrs['task_id'] = task_id
562             # default to Heat context because we are testing OpenStack
563             context_type = cfg_attrs.get("type", "Heat")
564             context = Context.get(context_type)
565             context.init(cfg_attrs)
566             # Update the name in case the context has used the name_suffix
567             cfg_attrs['name'] = context.name
568             contexts.append(context)
569
570         run_in_parallel = cfg.get("run_in_parallel", False)
571
572         # add tc and task id for influxdb extended tags
573         for scenario in cfg["scenarios"]:
574             task_name = os.path.splitext(os.path.basename(self.path))[0]
575             scenario["tc"] = task_name
576             scenario["task_id"] = task_id
577             # embed task path into scenario so we can load other files
578             # relative to task path
579             scenario["task_path"] = os.path.dirname(self.path)
580
581             self._change_node_names(scenario, contexts)
582
583         # TODO we need something better here, a class that represent the file
584         return {'scenarios': cfg['scenarios'],
585                 'run_in_parallel': run_in_parallel,
586                 'meet_precondition': meet_precondition,
587                 'contexts': contexts,
588                 'rendered': rendered}
589
590     @staticmethod
591     def _change_node_names(scenario, contexts):
592         """Change the node names in a scenario, depending on the context config
593
594         The nodes (VMs or physical servers) are referred in the context section
595         with the name of the server and the name of the context:
596             <server name>.<context name>
597
598         If the context is going to be undeployed at the end of the test, the
599         task ID is suffixed to the name to avoid interferences with previous
600         deployments. If the context needs to be deployed at the end of the
601         test, the name assigned is kept.
602
603         There are several places where a node name could appear in the scenario
604         configuration:
605         scenario:
606           host: athena.demo
607           target: kratos.demo
608           targets:
609             - athena.demo
610             - kratos.demo
611
612         scenario:
613           options:
614             server_name:  # JIRA: YARDSTICK-810
615               host: athena.demo
616               target: kratos.demo
617
618         scenario:
619           nodes:
620             tg__0: tg_0.yardstick
621             vnf__0: vnf_0.yardstick
622         """
623         def qualified_name(name):
624             try:
625                 # for openstack
626                 node_name, context_name = name.split('.')
627                 sep = '.'
628             except ValueError:
629                 # for kubernetes, some kubernetes resources don't support
630                 # name format like 'xxx.xxx', so we use '-' instead
631                 # need unified later
632                 node_name, context_name = name.split('-')
633                 sep = '-'
634
635             try:
636                 ctx = next((context for context in contexts
637                             if context.assigned_name == context_name))
638             except StopIteration:
639                 raise y_exc.ScenarioConfigContextNameNotFound(
640                     context_name=context_name)
641
642             return '{}{}{}'.format(node_name, sep, ctx.name)
643
644         if 'host' in scenario:
645             scenario['host'] = qualified_name(scenario['host'])
646         if 'target' in scenario:
647             scenario['target'] = qualified_name(scenario['target'])
648         options = scenario.get('options') or {}
649         server_name = options.get('server_name') or {}
650         if 'host' in server_name:
651             server_name['host'] = qualified_name(server_name['host'])
652         if 'target' in server_name:
653             server_name['target'] = qualified_name(server_name['target'])
654         if 'targets' in scenario:
655             for idx, target in enumerate(scenario['targets']):
656                 scenario['targets'][idx] = qualified_name(target)
657         if 'nodes' in scenario:
658             for scenario_node, target in scenario['nodes'].items():
659                 scenario['nodes'][scenario_node] = qualified_name(target)
660
661     def _check_schema(self, cfg_schema, schema_type):
662         """Check if config file is using the correct schema type"""
663
664         if cfg_schema != "yardstick:" + schema_type + ":0.1":
665             sys.exit("error: file %s has unknown schema %s" % (self.path,
666                                                                cfg_schema))
667
668     def _check_precondition(self, cfg):
669         """Check if the environment meet the precondition"""
670
671         if "precondition" in cfg:
672             precondition = cfg["precondition"]
673             installer_type = precondition.get("installer_type", None)
674             deploy_scenarios = precondition.get("deploy_scenarios", None)
675             tc_fit_pods = precondition.get("pod_name", None)
676             installer_type_env = os.environ.get('INSTALL_TYPE', None)
677             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
678             pod_name_env = os.environ.get('NODE_NAME', None)
679
680             LOG.info("installer_type: %s, installer_type_env: %s",
681                      installer_type, installer_type_env)
682             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
683                      deploy_scenarios, deploy_scenario_env)
684             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
685                      tc_fit_pods, pod_name_env)
686             if installer_type and installer_type_env:
687                 if installer_type_env not in installer_type:
688                     return False
689             if deploy_scenarios and deploy_scenario_env:
690                 deploy_scenarios_list = deploy_scenarios.split(',')
691                 for deploy_scenario in deploy_scenarios_list:
692                     if deploy_scenario_env.startswith(deploy_scenario):
693                         return True
694                 return False
695             if tc_fit_pods and pod_name_env:
696                 if pod_name_env not in tc_fit_pods:
697                     return False
698         return True
699
700
701 def is_ip_addr(addr):
702     """check if string addr is an IP address"""
703     try:
704         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
705     except AttributeError:
706         pass
707
708     try:
709         ipaddress.ip_address(addr.encode('utf-8'))
710     except ValueError:
711         return False
712     else:
713         return True
714
715
716 def _is_background_scenario(scenario):
717     if "run_in_background" in scenario:
718         return scenario["run_in_background"]
719     else:
720         return False
721
722
723 def parse_nodes_with_context(scenario_cfg):
724     """parse the 'nodes' fields in scenario """
725     # ensure consistency in node instantiation order
726     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
727                        for nodename in sorted(scenario_cfg["nodes"]))
728
729
730 def get_networks_from_nodes(nodes):
731     """parse the 'nodes' fields in scenario """
732     networks = {}
733     for node in nodes.values():
734         if not node:
735             continue
736         interfaces = node.get('interfaces', {})
737         for interface in interfaces.values():
738             # vld_id is network_name
739             network_name = interface.get('network_name')
740             if not network_name:
741                 continue
742             network = Context.get_network(network_name)
743             if network:
744                 networks[network['name']] = network
745     return networks
746
747
748 def runner_join(runner, background_runners, outputs, result):
749     """join (wait for) a runner, exit process at runner failure
750     :param background_runners:
751     :type background_runners:
752     :param outputs:
753     :type outputs: dict
754     :param result:
755     :type result: list
756     """
757     while runner.poll() is None:
758         outputs.update(runner.get_output())
759         result.extend(runner.get_result())
760         # drain all the background runner queues
761         for background in background_runners:
762             outputs.update(background.get_output())
763             result.extend(background.get_result())
764     status = runner.join(outputs, result)
765     base_runner.Runner.release(runner)
766     return status
767
768
769 def print_invalid_header(source_name, args):
770     print("Invalid %(source)s passed:\n\n %(args)s\n"
771           % {"source": source_name, "args": args})
772
773
774 def parse_task_args(src_name, args):
775     if isinstance(args, collections.Mapping):
776         return args
777
778     try:
779         kw = args and yaml_load(args)
780         kw = {} if kw is None else kw
781     except yaml.parser.ParserError as e:
782         print_invalid_header(src_name, args)
783         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
784               % {"source": src_name, "err": e})
785         raise TypeError()
786
787     if not isinstance(kw, dict):
788         print_invalid_header(src_name, args)
789         print("%(src)s had to be dict, actually %(src_type)s\n"
790               % {"src": src_name, "src_type": type(kw)})
791         raise TypeError()
792     return kw