Merge changes from topic 'report/html_table'
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import sys
11 import os
12 from collections import OrderedDict
13
14 import six
15 import yaml
16 import atexit
17 import ipaddress
18 import time
19 import logging
20 import uuid
21 import collections
22
23 from six.moves import filter
24 from jinja2 import Environment
25
26 from yardstick.benchmark import contexts
27 from yardstick.benchmark.contexts import base as base_context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.common.constants import CONF_FILE
30 from yardstick.common.yaml_loader import yaml_load
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common import constants
33 from yardstick.common import exceptions as y_exc
34 from yardstick.common import task_template
35 from yardstick.common import utils
36 from yardstick.common.html_template import report_template
37
38 output_file_default = "/tmp/yardstick.out"
39 test_cases_dir_default = "tests/opnfv/test_cases/"
40 LOG = logging.getLogger(__name__)
41
42
43 class Task(object):     # pragma: no cover
44     """Task commands.
45
46        Set of commands to manage benchmark tasks.
47     """
48
49     def __init__(self):
50         self.contexts = []
51         self.outputs = {}
52
53     def _set_dispatchers(self, output_config):
54         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
55                                                            'file')
56         out_types = [s.strip() for s in dispatchers.split(',')]
57         output_config['DEFAULT']['dispatcher'] = out_types
58
59     def start(self, args, **kwargs):  # pylint: disable=unused-argument
60         """Start a benchmark scenario."""
61
62         atexit.register(self.atexit_handler)
63
64         task_id = getattr(args, 'task_id')
65         self.task_id = task_id if task_id else str(uuid.uuid4())
66
67         self._set_log()
68
69         try:
70             output_config = utils.parse_ini_file(CONF_FILE)
71         except Exception:  # pylint: disable=broad-except
72             # all error will be ignore, the default value is {}
73             output_config = {}
74
75         self._init_output_config(output_config)
76         self._set_output_config(output_config, args.output_file)
77         LOG.debug('Output configuration is: %s', output_config)
78
79         self._set_dispatchers(output_config)
80
81         # update dispatcher list
82         if 'file' in output_config['DEFAULT']['dispatcher']:
83             result = {'status': 0, 'result': {}}
84             utils.write_json_to_file(args.output_file, result)
85
86         total_start_time = time.time()
87         parser = TaskParser(args.inputfile[0])
88
89         if args.suite:
90             # 1.parse suite, return suite_params info
91             task_files, task_args, task_args_fnames = parser.parse_suite()
92         else:
93             task_files = [parser.path]
94             task_args = [args.task_args]
95             task_args_fnames = [args.task_args_file]
96
97         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98                   task_files, task_args, task_args_fnames)
99
100         if args.parse_only:
101             sys.exit(0)
102
103         testcases = {}
104         tasks = self._parse_tasks(parser, task_files, args, task_args,
105                                   task_args_fnames)
106
107         # Execute task files.
108         for i, _ in enumerate(task_files):
109             one_task_start_time = time.time()
110             self.contexts.extend(tasks[i]['contexts'])
111             if not tasks[i]['meet_precondition']:
112                 LOG.info('"meet_precondition" is %s, please check environment',
113                          tasks[i]['meet_precondition'])
114                 continue
115
116             try:
117                 success, data = self._run(tasks[i]['scenarios'],
118                                           tasks[i]['run_in_parallel'],
119                                           output_config)
120             except KeyboardInterrupt:
121                 raise
122             except Exception:  # pylint: disable=broad-except
123                 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
124                           exc_info=True)
125                 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
126                                                     'tc_data': []}
127             else:
128                 if success:
129                     LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
130                     testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
131                                                         'tc_data': data}
132                 else:
133                     LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
134                               exc_info=True)
135                     testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
136                                                         'tc_data': data}
137
138             if args.keep_deploy:
139                 # keep deployment, forget about stack
140                 # (hide it for exit handler)
141                 self.contexts = []
142             else:
143                 for context in self.contexts[::-1]:
144                     context.undeploy()
145                 self.contexts = []
146             one_task_end_time = time.time()
147             LOG.info("Task %s finished in %d secs", task_files[i],
148                      one_task_end_time - one_task_start_time)
149
150         result = self._get_format_result(testcases)
151
152         self._do_output(output_config, result)
153         self._generate_reporting(result)
154
155         total_end_time = time.time()
156         LOG.info("Total finished in %d secs",
157                  total_end_time - total_start_time)
158
159         LOG.info('To generate report, execute command "yardstick report '
160                  'generate %s <YAML_NAME>"', self.task_id)
161         LOG.info("Task ALL DONE, exiting")
162         return result
163
164     def _generate_reporting(self, result):
165         env = Environment()
166         with open(constants.REPORTING_FILE, 'w') as f:
167             f.write(env.from_string(report_template).render(result))
168
169         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
170
171     def _set_log(self):
172         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
173         log_formatter = logging.Formatter(log_format)
174
175         utils.makedirs(constants.TASK_LOG_DIR)
176         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
177         log_handler = logging.FileHandler(log_path)
178         log_handler.setFormatter(log_formatter)
179         log_handler.setLevel(logging.DEBUG)
180
181         logging.root.addHandler(log_handler)
182
183     def _init_output_config(self, output_config):
184         output_config.setdefault('DEFAULT', {})
185         output_config.setdefault('dispatcher_http', {})
186         output_config.setdefault('dispatcher_file', {})
187         output_config.setdefault('dispatcher_influxdb', {})
188         output_config.setdefault('nsb', {})
189
190     def _set_output_config(self, output_config, file_path):
191         try:
192             out_type = os.environ['DISPATCHER']
193         except KeyError:
194             output_config['DEFAULT'].setdefault('dispatcher', 'file')
195         else:
196             output_config['DEFAULT']['dispatcher'] = out_type
197
198         output_config['dispatcher_file']['file_path'] = file_path
199
200         try:
201             target = os.environ['TARGET']
202         except KeyError:
203             pass
204         else:
205             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
206             output_config[k]['target'] = target
207
208     def _get_format_result(self, testcases):
209         criteria = self._get_task_criteria(testcases)
210
211         info = {
212             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
213             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
214             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
215             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
216         }
217
218         result = {
219             'status': 1,
220             'result': {
221                 'criteria': criteria,
222                 'task_id': self.task_id,
223                 'info': info,
224                 'testcases': testcases
225             }
226         }
227
228         return result
229
230     def _get_task_criteria(self, testcases):
231         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
232         if criteria:
233             return 'FAIL'
234         else:
235             return 'PASS'
236
237     def _do_output(self, output_config, result):
238         dispatchers = DispatcherBase.get(output_config)
239         dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
240
241         for dispatcher in dispatchers:
242             dispatcher.flush_result_data(result)
243
244     def _run(self, scenarios, run_in_parallel, output_config):
245         """Deploys context and calls runners"""
246         for context in self.contexts:
247             context.deploy()
248
249         background_runners = []
250
251         task_success = True
252         result = []
253         # Start all background scenarios
254         for scenario in filter(_is_background_scenario, scenarios):
255             scenario["runner"] = dict(type="Duration", duration=1000000000)
256             runner = self.run_one_scenario(scenario, output_config)
257             background_runners.append(runner)
258
259         runners = []
260         if run_in_parallel:
261             for scenario in scenarios:
262                 if not _is_background_scenario(scenario):
263                     runner = self.run_one_scenario(scenario, output_config)
264                     runners.append(runner)
265
266             # Wait for runners to finish
267             for runner in runners:
268                 status = runner_join(runner, background_runners, self.outputs, result)
269                 if status != 0:
270                     LOG.error("%s runner status %s", runner.__execution_type__, status)
271                     task_success = False
272                 LOG.info("Runner ended")
273         else:
274             # run serially
275             for scenario in scenarios:
276                 if not _is_background_scenario(scenario):
277                     runner = self.run_one_scenario(scenario, output_config)
278                     status = runner_join(runner, background_runners, self.outputs, result)
279                     if status != 0:
280                         LOG.error('Scenario NO.%s: "%s" ERROR!',
281                                   scenarios.index(scenario) + 1,
282                                   scenario.get('type'))
283                         LOG.error("%s runner status %s", runner.__execution_type__, status)
284                         task_success = False
285                     LOG.info("Runner ended")
286
287         # Abort background runners
288         for runner in background_runners:
289             runner.abort()
290
291         # Wait for background runners to finish
292         for runner in background_runners:
293             status = runner.join(self.outputs, result)
294             if status is None:
295                 # Nuke if it did not stop nicely
296                 base_runner.Runner.terminate(runner)
297                 runner.join(self.outputs, result)
298             base_runner.Runner.release(runner)
299
300             print("Background task ended")
301         return task_success, result
302
303     def atexit_handler(self):
304         """handler for process termination"""
305         base_runner.Runner.terminate_all()
306
307         if self.contexts:
308             LOG.info("Undeploying all contexts")
309             for context in self.contexts[::-1]:
310                 context.undeploy()
311
312     def _parse_options(self, op):
313         if isinstance(op, dict):
314             return {k: self._parse_options(v) for k, v in op.items()}
315         elif isinstance(op, list):
316             return [self._parse_options(v) for v in op]
317         elif isinstance(op, six.string_types):
318             return self.outputs.get(op[1:]) if op.startswith('$') else op
319         else:
320             return op
321
322     def _parse_tasks(self, parser, task_files, args, task_args,
323                      task_args_fnames):
324         tasks = []
325
326         # Parse task_files.
327         for i, _ in enumerate(task_files):
328             parser.path = task_files[i]
329             tasks.append(parser.parse_task(self.task_id, task_args[i],
330                                            task_args_fnames[i]))
331             tasks[i]['case_name'] = os.path.splitext(
332                 os.path.basename(task_files[i]))[0]
333
334         if args.render_only:
335             utils.makedirs(args.render_only)
336             for idx, task in enumerate(tasks):
337                 output_file_name = os.path.abspath(os.path.join(
338                     args.render_only,
339                     '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
340                 utils.write_file(output_file_name, task['rendered'])
341
342             sys.exit(0)
343
344         return tasks
345
346     def run_one_scenario(self, scenario_cfg, output_config):
347         """run one scenario using context"""
348         runner_cfg = scenario_cfg["runner"]
349         runner_cfg['output_config'] = output_config
350
351         options = scenario_cfg.get('options', {})
352         scenario_cfg['options'] = self._parse_options(options)
353
354         # TODO support get multi hosts/vms info
355         context_cfg = {}
356         options = scenario_cfg.get('options') or {}
357         server_name = options.get('server_name') or {}
358
359         def config_context_target(cfg):
360             target = cfg['target']
361             if is_ip_addr(target):
362                 context_cfg['target'] = {"ipaddr": target}
363             else:
364                 context_cfg['target'] = base_context.Context.get_server(target)
365                 if self._is_same_context(cfg["host"], target):
366                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
367                 else:
368                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
369
370         host_name = server_name.get('host', scenario_cfg.get('host'))
371         if host_name:
372             context_cfg['host'] = base_context.Context.get_server(host_name)
373
374         for item in [server_name, scenario_cfg]:
375             try:
376                 config_context_target(item)
377             except KeyError:
378                 LOG.debug("Got a KeyError in config_context_target(%s)", item)
379             else:
380                 break
381
382         if "targets" in scenario_cfg:
383             ip_list = []
384             for target in scenario_cfg["targets"]:
385                 if is_ip_addr(target):
386                     ip_list.append(target)
387                     context_cfg['target'] = {}
388                 else:
389                     context_cfg['target'] = (
390                         base_context.Context.get_server(target))
391                     if self._is_same_context(scenario_cfg["host"],
392                                              target):
393                         ip_list.append(context_cfg["target"]["private_ip"])
394                     else:
395                         ip_list.append(context_cfg["target"]["ip"])
396             context_cfg['target']['ipaddr'] = ','.join(ip_list)
397
398         if "nodes" in scenario_cfg:
399             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
400             context_cfg["networks"] = get_networks_from_nodes(
401                 context_cfg["nodes"])
402
403         runner = base_runner.Runner.get(runner_cfg)
404
405         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
406         runner.run(scenario_cfg, context_cfg)
407
408         return runner
409
410     def _is_same_context(self, host_attr, target_attr):
411         """check if two servers are in the same heat context
412         host_attr: either a name for a server created by yardstick or a dict
413         with attribute name mapping when using external heat templates
414         target_attr: either a name for a server created by yardstick or a dict
415         with attribute name mapping when using external heat templates
416         """
417         for context in self.contexts:
418             if context.__context_type__ not in {contexts.CONTEXT_HEAT,
419                                                 contexts.CONTEXT_KUBERNETES}:
420                 continue
421
422             host = context._get_server(host_attr)
423             if host is None:
424                 continue
425
426             target = context._get_server(target_attr)
427             if target is None:
428                 return False
429
430             # Both host and target is not None, then they are in the
431             # same heat context.
432             return True
433
434         return False
435
436
437 class TaskParser(object):       # pragma: no cover
438     """Parser for task config files in yaml format"""
439
440     def __init__(self, path):
441         self.path = path
442
443     def _meet_constraint(self, task, cur_pod, cur_installer):
444         if "constraint" in task:
445             constraint = task.get('constraint', None)
446             if constraint is not None:
447                 tc_fit_pod = constraint.get('pod', None)
448                 tc_fit_installer = constraint.get('installer', None)
449                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
450                          cur_pod, cur_installer, constraint)
451                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
452                     return False
453                 if (cur_installer is None) or (tc_fit_installer and cur_installer
454                                                not in tc_fit_installer):
455                     return False
456         return True
457
458     def _get_task_para(self, task, cur_pod):
459         task_args = task.get('task_args', None)
460         if task_args is not None:
461             task_args = task_args.get(cur_pod, task_args.get('default'))
462         task_args_fnames = task.get('task_args_fnames', None)
463         if task_args_fnames is not None:
464             task_args_fnames = task_args_fnames.get(cur_pod, None)
465         return task_args, task_args_fnames
466
467     def parse_suite(self):
468         """parse the suite file and return a list of task config file paths
469            and lists of optional parameters if present"""
470         LOG.info("\nParsing suite file:%s", self.path)
471
472         try:
473             with open(self.path) as stream:
474                 cfg = yaml_load(stream)
475         except IOError as ioerror:
476             sys.exit(ioerror)
477
478         self._check_schema(cfg["schema"], "suite")
479         LOG.info("\nStarting scenario:%s", cfg["name"])
480
481         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
482         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
483                                       test_cases_dir)
484         if test_cases_dir[-1] != os.sep:
485             test_cases_dir += os.sep
486
487         cur_pod = os.environ.get('NODE_NAME', None)
488         cur_installer = os.environ.get('INSTALLER_TYPE', None)
489
490         valid_task_files = []
491         valid_task_args = []
492         valid_task_args_fnames = []
493
494         for task in cfg["test_cases"]:
495             # 1.check file_name
496             if "file_name" in task:
497                 task_fname = task.get('file_name', None)
498                 if task_fname is None:
499                     continue
500             else:
501                 continue
502             # 2.check constraint
503             if self._meet_constraint(task, cur_pod, cur_installer):
504                 valid_task_files.append(test_cases_dir + task_fname)
505             else:
506                 continue
507             # 3.fetch task parameters
508             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
509             valid_task_args.append(task_args)
510             valid_task_args_fnames.append(task_args_fnames)
511
512         return valid_task_files, valid_task_args, valid_task_args_fnames
513
514     def _render_task(self, task_args, task_args_file):
515         """Render the input task with the given arguments
516
517         :param task_args: (dict) arguments to render the task
518         :param task_args_file: (str) file containing the arguments to render
519                                the task
520         :return: (str) task file rendered
521         """
522         try:
523             kw = {}
524             if task_args_file:
525                 with open(task_args_file) as f:
526                     kw.update(parse_task_args('task_args_file', f.read()))
527             kw.update(parse_task_args('task_args', task_args))
528         except TypeError:
529             raise y_exc.TaskRenderArgumentError()
530
531         input_task = None
532         try:
533             with open(self.path) as f:
534                 input_task = f.read()
535             rendered_task = task_template.TaskTemplate.render(input_task, **kw)
536             LOG.debug('Input task is:\n%s', rendered_task)
537             parsed_task = yaml_load(rendered_task)
538         except (IOError, OSError):
539             raise y_exc.TaskReadError(task_file=self.path)
540         except Exception:
541             raise y_exc.TaskRenderError(input_task=input_task)
542
543         return parsed_task, rendered_task
544
545     def parse_task(self, task_id, task_args=None, task_args_file=None):
546         """parses the task file and return an context and scenario instances"""
547         LOG.info("Parsing task config: %s", self.path)
548
549         cfg, rendered = self._render_task(task_args, task_args_file)
550         self._check_schema(cfg["schema"], "task")
551         meet_precondition = self._check_precondition(cfg)
552
553         # TODO: support one or many contexts? Many would simpler and precise
554         # TODO: support hybrid context type
555         if "context" in cfg:
556             context_cfgs = [cfg["context"]]
557         elif "contexts" in cfg:
558             context_cfgs = cfg["contexts"]
559         else:
560             context_cfgs = [{"type": contexts.CONTEXT_DUMMY}]
561
562         _contexts = []
563         for cfg_attrs in context_cfgs:
564
565             cfg_attrs['task_id'] = task_id
566             # default to Heat context because we are testing OpenStack
567             context_type = cfg_attrs.get("type", contexts.CONTEXT_HEAT)
568             context = base_context.Context.get(context_type)
569             context.init(cfg_attrs)
570             # Update the name in case the context has used the name_suffix
571             cfg_attrs['name'] = context.name
572             _contexts.append(context)
573
574         run_in_parallel = cfg.get("run_in_parallel", False)
575
576         # add tc and task id for influxdb extended tags
577         for scenario in cfg["scenarios"]:
578             task_name = os.path.splitext(os.path.basename(self.path))[0]
579             scenario["tc"] = task_name
580             scenario["task_id"] = task_id
581             # embed task path into scenario so we can load other files
582             # relative to task path
583             scenario["task_path"] = os.path.dirname(self.path)
584
585             self._change_node_names(scenario, _contexts)
586
587         # TODO we need something better here, a class that represent the file
588         return {'scenarios': cfg['scenarios'],
589                 'run_in_parallel': run_in_parallel,
590                 'meet_precondition': meet_precondition,
591                 'contexts': _contexts,
592                 'rendered': rendered}
593
594     @staticmethod
595     def _change_node_names(scenario, _contexts):
596         """Change the node names in a scenario, depending on the context config
597
598         The nodes (VMs or physical servers) are referred in the context section
599         with the name of the server and the name of the context:
600             <server name>.<context name>
601
602         If the context is going to be undeployed at the end of the test, the
603         task ID is suffixed to the name to avoid interferences with previous
604         deployments. If the context needs to be deployed at the end of the
605         test, the name assigned is kept.
606
607         There are several places where a node name could appear in the scenario
608         configuration:
609         scenario:
610           host: athena.demo
611           target: kratos.demo
612           targets:
613             - athena.demo
614             - kratos.demo
615
616         scenario:
617           options:
618             server_name:  # JIRA: YARDSTICK-810
619               host: athena.demo
620               target: kratos.demo
621
622         scenario:
623           nodes:
624             tg__0: tg_0.yardstick
625             vnf__0: vnf_0.yardstick
626
627         NOTE: in Kubernetes context, the separator character between the server
628         name and the context name is "-":
629         scenario:
630           host: host-k8s
631           target: target-k8s
632         """
633         def qualified_name(name):
634             for context in _contexts:
635                 host_name, ctx_name = context.split_host_name(name)
636                 if context.assigned_name == ctx_name:
637                     return '{}{}{}'.format(host_name,
638                                            context.host_name_separator,
639                                            context.name)
640
641             raise y_exc.ScenarioConfigContextNameNotFound(host_name=name)
642
643         if 'host' in scenario:
644             scenario['host'] = qualified_name(scenario['host'])
645         if 'target' in scenario:
646             scenario['target'] = qualified_name(scenario['target'])
647         options = scenario.get('options') or {}
648         server_name = options.get('server_name') or {}
649         if 'host' in server_name:
650             server_name['host'] = qualified_name(server_name['host'])
651         if 'target' in server_name:
652             server_name['target'] = qualified_name(server_name['target'])
653         if 'targets' in scenario:
654             for idx, target in enumerate(scenario['targets']):
655                 scenario['targets'][idx] = qualified_name(target)
656         if 'nodes' in scenario:
657             for scenario_node, target in scenario['nodes'].items():
658                 scenario['nodes'][scenario_node] = qualified_name(target)
659
660     def _check_schema(self, cfg_schema, schema_type):
661         """Check if config file is using the correct schema type"""
662
663         if cfg_schema != "yardstick:" + schema_type + ":0.1":
664             sys.exit("error: file %s has unknown schema %s" % (self.path,
665                                                                cfg_schema))
666
667     def _check_precondition(self, cfg):
668         """Check if the environment meet the precondition"""
669
670         if "precondition" in cfg:
671             precondition = cfg["precondition"]
672             installer_type = precondition.get("installer_type", None)
673             deploy_scenarios = precondition.get("deploy_scenarios", None)
674             tc_fit_pods = precondition.get("pod_name", None)
675             installer_type_env = os.environ.get('INSTALL_TYPE', None)
676             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
677             pod_name_env = os.environ.get('NODE_NAME', None)
678
679             LOG.info("installer_type: %s, installer_type_env: %s",
680                      installer_type, installer_type_env)
681             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
682                      deploy_scenarios, deploy_scenario_env)
683             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
684                      tc_fit_pods, pod_name_env)
685             if installer_type and installer_type_env:
686                 if installer_type_env not in installer_type:
687                     return False
688             if deploy_scenarios and deploy_scenario_env:
689                 deploy_scenarios_list = deploy_scenarios.split(',')
690                 for deploy_scenario in deploy_scenarios_list:
691                     if deploy_scenario_env.startswith(deploy_scenario):
692                         return True
693                 return False
694             if tc_fit_pods and pod_name_env:
695                 if pod_name_env not in tc_fit_pods:
696                     return False
697         return True
698
699
700 def is_ip_addr(addr):
701     """check if string addr is an IP address"""
702     try:
703         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
704     except AttributeError:
705         pass
706
707     try:
708         ipaddress.ip_address(addr.encode('utf-8'))
709     except ValueError:
710         return False
711     else:
712         return True
713
714
715 def _is_background_scenario(scenario):
716     if "run_in_background" in scenario:
717         return scenario["run_in_background"]
718     else:
719         return False
720
721
722 def parse_nodes_with_context(scenario_cfg):
723     """parse the 'nodes' fields in scenario """
724     # ensure consistency in node instantiation order
725     return OrderedDict((nodename, base_context.Context.get_server(
726                         scenario_cfg["nodes"][nodename]))
727                        for nodename in sorted(scenario_cfg["nodes"]))
728
729
730 def get_networks_from_nodes(nodes):
731     """parse the 'nodes' fields in scenario """
732     networks = {}
733     for node in nodes.values():
734         if not node:
735             continue
736         interfaces = node.get('interfaces', {})
737         for interface in interfaces.values():
738             # vld_id is network_name
739             network_name = interface.get('network_name')
740             if not network_name:
741                 continue
742             network = base_context.Context.get_network(network_name)
743             if network:
744                 networks[network['name']] = network
745     return networks
746
747
748 def runner_join(runner, background_runners, outputs, result):
749     """join (wait for) a runner, exit process at runner failure
750     :param background_runners:
751     :type background_runners:
752     :param outputs:
753     :type outputs: dict
754     :param result:
755     :type result: list
756     """
757     while runner.poll() is None:
758         outputs.update(runner.get_output())
759         result.extend(runner.get_result())
760         # drain all the background runner queues
761         for background in background_runners:
762             outputs.update(background.get_output())
763             result.extend(background.get_result())
764     status = runner.join(outputs, result)
765     base_runner.Runner.release(runner)
766     return status
767
768
769 def print_invalid_header(source_name, args):
770     print("Invalid %(source)s passed:\n\n %(args)s\n"
771           % {"source": source_name, "args": args})
772
773
774 def parse_task_args(src_name, args):
775     if isinstance(args, collections.Mapping):
776         return args
777
778     try:
779         kw = args and yaml_load(args)
780         kw = {} if kw is None else kw
781     except yaml.parser.ParserError as e:
782         print_invalid_header(src_name, args)
783         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
784               % {"source": src_name, "err": e})
785         raise TypeError()
786
787     if not isinstance(kw, dict):
788         print_invalid_header(src_name, args)
789         print("%(src)s had to be dict, actually %(src_type)s\n"
790               % {"src": src_name, "src_type": type(kw)})
791         raise TypeError()
792     return kw