add yardstick iruya 9.0.0 release notes
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import sys
11 import os
12 from collections import OrderedDict
13
14 import six
15 import yaml
16 import atexit
17 import ipaddress
18 import time
19 import logging
20 import uuid
21 import collections
22
23 from six.moves import filter
24 from jinja2 import Environment
25
26 from yardstick.benchmark import contexts
27 from yardstick.benchmark.contexts import base as base_context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.common.constants import CONF_FILE
30 from yardstick.common.yaml_loader import yaml_load
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common import constants
33 from yardstick.common import exceptions as y_exc
34 from yardstick.common import task_template
35 from yardstick.common import utils
36 from yardstick.common.html_template import report_template
37
38 output_file_default = "/tmp/yardstick.out"
39 test_cases_dir_default = "tests/opnfv/test_cases/"
40 LOG = logging.getLogger(__name__)
41
42
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # Contexts deployed for the task file currently being executed;
        # reset after each task so atexit_handler only undeploys live ones.
        self.contexts = []
        # Accumulated scenario outputs; later scenarios can reference them
        # through the '$name' option syntax (see _parse_options).
        self.outputs = {}

    def _set_dispatchers(self, output_config):
        """Normalize DEFAULT/dispatcher from a comma-separated string
        (e.g. "file, influxdb") into a list of dispatcher names, in place."""
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
                                                           'file')
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types

    def start(self, args, **kwargs):  # pylint: disable=unused-argument
        """Start a benchmark scenario.

        Parses the input file (or suite), deploys the contexts, runs every
        scenario, undeploys the contexts (unless args.keep_deploy), pushes
        the results through the configured dispatchers and renders a report.

        :param args: parsed CLI namespace (inputfile, suite, task_args,
                     task_args_file, output_file, parse_only, render_only,
                     keep_deploy, task_id attributes are used)
        :return: (dict) the formatted result, see _get_format_result()
        """

        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        # Generate a fresh UUID when the caller did not supply a task id.
        self.task_id = task_id if task_id else str(uuid.uuid4())

        self._set_log()

        try:
            output_config = utils.parse_ini_file(CONF_FILE)
        except Exception:  # pylint: disable=broad-except
            # all error will be ignore, the default value is {}
            output_config = {}

        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # update dispatcher list
        if 'file' in output_config['DEFAULT']['dispatcher']:
            # Seed the output file with an "in progress" status record.
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
                  task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        testcases = {}
        tasks = self._parse_tasks(parser, task_files, args, task_args,
                                  task_args_fnames)

        # Execute task files.
        for i, _ in enumerate(task_files):
            one_task_start_time = time.time()
            self.contexts.extend(tasks[i]['contexts'])
            if not tasks[i]['meet_precondition']:
                LOG.info('"meet_precondition" is %s, please check environment',
                         tasks[i]['meet_precondition'])
                continue

            try:
                success, data = self._run(tasks[i]['scenarios'],
                                          tasks[i]['run_in_parallel'],
                                          output_config)
            except KeyboardInterrupt:
                raise
            except Exception:  # pylint: disable=broad-except
                # A crashed testcase is recorded as FAIL but does not abort
                # the remaining task files.
                LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                          exc_info=True)
                testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
                                                    'tc_data': []}
            else:
                if success:
                    LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
                    testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
                                                        'tc_data': data}
                else:
                    LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
                              exc_info=True)
                    testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
                                                        'tc_data': data}

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # Undeploy in reverse order of deployment.
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("Task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)
        self._generate_reporting(result)

        total_end_time = time.time()
        LOG.info("Total finished in %d secs",
                 total_end_time - total_start_time)

        LOG.info('To generate report, execute command "yardstick report '
                 'generate %s <YAML_NAME>"', self.task_id)
        LOG.info("Task ALL DONE, exiting")
        return result

    def _generate_reporting(self, result):
        """Render the HTML report template with *result* into
        constants.REPORTING_FILE."""
        env = Environment()
        with open(constants.REPORTING_FILE, 'w') as f:
            f.write(env.from_string(report_template).render(result))

        LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)

    def _set_log(self):
        """Attach a per-task DEBUG file handler
        (<TASK_LOG_DIR>/<task_id>.log) to the root logger."""
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        utils.makedirs(constants.TASK_LOG_DIR)
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)

    def _init_output_config(self, output_config):
        """Ensure all sections used later exist in *output_config*."""
        output_config.setdefault('DEFAULT', {})
        output_config.setdefault('dispatcher_http', {})
        output_config.setdefault('dispatcher_file', {})
        output_config.setdefault('dispatcher_influxdb', {})
        output_config.setdefault('nsb', {})

    def _set_output_config(self, output_config, file_path):
        """Apply DISPATCHER/TARGET environment overrides and the output
        file path to *output_config*, in place.

        :param file_path: (str) destination for the file dispatcher
        """
        try:
            out_type = os.environ['DISPATCHER']
        except KeyError:
            # No override: keep configured value, or fall back to 'file'.
            output_config['DEFAULT'].setdefault('dispatcher', 'file')
        else:
            output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

        try:
            target = os.environ['TARGET']
        except KeyError:
            pass
        else:
            # TARGET applies to the section of the active dispatcher only.
            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
            output_config[k]['target'] = target

    def _get_format_result(self, testcases):
        """Build the final result structure pushed to dispatchers.

        :param testcases: (dict) case_name -> {'criteria': ..., 'tc_data': ...}
        :return: (dict) {'status': 1, 'result': {...}} including environment
                 info read from DEPLOY_SCENARIO/INSTALLER_TYPE/NODE_NAME/
                 YARDSTICK_BRANCH env variables
        """
        criteria = self._get_task_criteria(testcases)

        info = {
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
        }

        result = {
            'status': 1,
            'result': {
                'criteria': criteria,
                'task_id': self.task_id,
                'info': info,
                'testcases': testcases
            }
        }

        return result

    def _get_task_criteria(self, testcases):
        """Return 'PASS' only when every testcase passed, else 'FAIL'."""
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
        if criteria:
            return 'FAIL'
        else:
            return 'PASS'

    def _do_output(self, output_config, result):
        """Flush *result* through all configured dispatchers except
        Influxdb (Influxdb receives per-record data during the run)."""
        dispatchers = DispatcherBase.get(output_config)
        dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')

        for dispatcher in dispatchers:
            dispatcher.flush_result_data(result)

    def _run(self, scenarios, run_in_parallel, output_config):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        task_success = True
        result = []
        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # Background scenarios run "forever"; they are aborted below
            # once the foreground scenarios are done.
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_config)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # Start every foreground scenario first, then join them all.
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_config)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                status = runner_join(runner, background_runners, self.outputs, result)
                if status != 0:
                    LOG.error("%s runner status %s", runner.__execution_type__, status)
                    task_success = False
                LOG.info("Runner ended")
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_config)
                    status = runner_join(runner, background_runners, self.outputs, result)
                    if status != 0:
                        LOG.error('Scenario NO.%s: "%s" ERROR!',
                                  scenarios.index(scenario) + 1,
                                  scenario.get('type'))
                        LOG.error("%s runner status %s", runner.__execution_type__, status)
                        task_success = False
                    LOG.info("Runner ended")

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(self.outputs, result)
            if status is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner.join(self.outputs, result)
            base_runner.Runner.release(runner)

            # NOTE(review): uses print() while the rest of the class logs
            # through LOG.
            print("Background task ended")
        return task_success, result

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            LOG.info("Undeploying all contexts")
            # Undeploy in reverse order of deployment.
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
        """Recursively resolve scenario options: any string of the form
        '$name' is replaced with the previously recorded output
        self.outputs['name']; dicts and lists are walked recursively."""
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, six.string_types):
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op

    def _parse_tasks(self, parser, task_files, args, task_args,
                     task_args_fnames):
        """Parse every task file into a task dict via TaskParser.parse_task.

        When args.render_only is set, the rendered YAML is written to that
        directory and the process exits without running anything.

        :return: (list) one dict per task file (see TaskParser.parse_task),
                 each extended with 'case_name' (file name without extension)
        """
        tasks = []

        # Parse task_files.
        for i, _ in enumerate(task_files):
            parser.path = task_files[i]
            tasks.append(parser.parse_task(self.task_id, task_args[i],
                                           task_args_fnames[i]))
            tasks[i]['case_name'] = os.path.splitext(
                os.path.basename(task_files[i]))[0]

        if args.render_only:
            utils.makedirs(args.render_only)
            for idx, task in enumerate(tasks):
                output_file_name = os.path.abspath(os.path.join(
                    args.render_only,
                    '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
                utils.write_file(output_file_name, task['rendered'])

            sys.exit(0)

        return tasks

    def run_one_scenario(self, scenario_cfg, output_config):
        """run one scenario using context"""
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_config'] = output_config

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        context_cfg = {}
        options = scenario_cfg.get('options') or {}
        server_name = options.get('server_name') or {}

        def config_context_target(cfg):
            # Resolve cfg['target'] into context_cfg['target'], choosing the
            # private IP when host and target share a context, otherwise the
            # public one.  Raises KeyError when cfg has no 'target'/'host'.
            target = cfg['target']
            if is_ip_addr(target):
                context_cfg['target'] = {"ipaddr": target}
            else:
                context_cfg['target'] = base_context.Context.get_server(target)
                if self._is_same_context(cfg["host"], target):
                    context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
                else:
                    context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]

        host_name = server_name.get('host', scenario_cfg.get('host'))
        if host_name:
            context_cfg['host'] = base_context.Context.get_server(host_name)

        # Prefer the server_name mapping (YARDSTICK-810); fall back to the
        # scenario-level host/target keys.
        for item in [server_name, scenario_cfg]:
            try:
                config_context_target(item)
            except KeyError:
                LOG.debug("Got a KeyError in config_context_target(%s)", item)
            else:
                break

        if "targets" in scenario_cfg:
            # Multi-target scenario: collect a comma-separated IP list.
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = (
                        base_context.Context.get_server(target))
                    if self._is_same_context(scenario_cfg["host"],
                                             target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])

        runner = base_runner.Runner.get(runner_cfg)

        LOG.info("Starting runner of type '%s'", runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        for context in self.contexts:
            # Only Heat and Kubernetes contexts can resolve servers here.
            if context.__context_type__ not in {contexts.CONTEXT_HEAT,
                                                contexts.CONTEXT_KUBERNETES}:
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target is not None, then they are in the
            # same heat context.
            return True

        return False
436
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # Path to the task (or suite) file to parse.
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Check whether a suite entry's 'constraint' matches the environment.

        :param task: (dict) one "test_cases" entry from a suite file
        :param cur_pod: (str|None) current pod name (NODE_NAME env)
        :param cur_installer: (str|None) current installer (INSTALLER_TYPE env)
        :return: (bool) True when there is no constraint or it is satisfied
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                    return False
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Extract per-pod task arguments from a suite entry.

        :param task: (dict) one "test_cases" entry from a suite file
        :param cur_pod: (str|None) current pod name used as lookup key
        :return: (tuple) (task_args, task_args_fnames), either may be None
        """
        task_args = task.get('task_args', None)
        if task_args is not None:
            # Fall back to the 'default' entry when there is no
            # pod-specific one.
            task_args = task_args.get(cur_pod, task_args.get('default'))
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                cfg = yaml_load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
                    continue
            else:
                continue
            # 2.check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def _render_task(self, task_args, task_args_file):
        """Render the input task with the given arguments

        :param task_args: (dict) arguments to render the task
        :param task_args_file: (str) file containing the arguments to render
                               the task
        :return: (str) task file rendered
        :raises y_exc.TaskRenderArgumentError: when the arguments are not
                valid YAML mappings
        :raises y_exc.TaskReadError: when the task file cannot be read
        :raises y_exc.TaskRenderError: when template rendering/parsing fails
        """
        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args('task_args_file', f.read()))
            kw.update(parse_task_args('task_args', task_args))
        except TypeError:
            raise y_exc.TaskRenderArgumentError()

        input_task = None
        try:
            with open(self.path) as f:
                input_task = f.read()
            rendered_task = task_template.TaskTemplate.render(input_task, **kw)
            LOG.debug('Input task is:\n%s', rendered_task)
            parsed_task = yaml_load(rendered_task)
        except (IOError, OSError):
            raise y_exc.TaskReadError(task_file=self.path)
        except Exception:
            raise y_exc.TaskRenderError(input_task=input_task)

        return parsed_task, rendered_task

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        LOG.info("Parsing task config: %s", self.path)

        cfg, rendered = self._render_task(task_args, task_args_file)
        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": contexts.CONTEXT_DUMMY}]

        _contexts = []
        for cfg_attrs in context_cfgs:

            cfg_attrs['task_id'] = task_id
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", contexts.CONTEXT_HEAT)
            context = base_context.Context.get(context_type)
            context.init(cfg_attrs)
            # Update the name in case the context has used the name_suffix
            cfg_attrs['name'] = context.name
            _contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            self._change_node_names(scenario, _contexts)

        # TODO we need something better here, a class that represent the file
        return {'scenarios': cfg['scenarios'],
                'run_in_parallel': run_in_parallel,
                'meet_precondition': meet_precondition,
                'contexts': _contexts,
                'rendered': rendered}

    @staticmethod
    def _change_node_names(scenario, _contexts):
        """Change the node names in a scenario, depending on the context config

        The nodes (VMs or physical servers) are referred in the context section
        with the name of the server and the name of the context:
            <server name>.<context name>

        If the context is going to be undeployed at the end of the test, the
        task ID is suffixed to the name to avoid interferences with previous
        deployments. If the context needs to be deployed at the end of the
        test, the name assigned is kept.

        There are several places where a node name could appear in the scenario
        configuration:
        scenario:
          host: athena.demo
          target: kratos.demo
          targets:
            - athena.demo
            - kratos.demo

        scenario:
          options:
            server_name:  # JIRA: YARDSTICK-810
              host: athena.demo
              target: kratos.demo

        scenario:
          nodes:
            tg__0: trafficgen_0.yardstick
            vnf__0: vnf_0.yardstick

        scenario:
          nodes:
            tg__0:
                name: trafficgen_0.yardstick
                public_ip_attr: "server1_public_ip"
                private_ip_attr: "server1_private_ip"
            vnf__0:
                name: vnf_0.yardstick
                public_ip_attr: "server2_public_ip"
                private_ip_attr: "server2_private_ip"
        NOTE: in Kubernetes context, the separator character between the server
        name and the context name is "-":
        scenario:
          host: host-k8s
          target: target-k8s
        """
        # 'Mapping' moved to collections.abc in Python 3.3 and the
        # 'collections.Mapping' alias was removed in Python 3.10.
        try:
            from collections.abc import Mapping
        except ImportError:  # Python 2 fallback
            from collections import Mapping

        def qualified_name(name):
            # Replace the context's assigned name with its (possibly
            # task-id-suffixed) deployment name.
            for context in _contexts:
                host_name, ctx_name = context.split_host_name(name)
                if context.assigned_name == ctx_name:
                    return '{}{}{}'.format(host_name,
                                           context.host_name_separator,
                                           context.name)

            raise y_exc.ScenarioConfigContextNameNotFound(host_name=name)

        if 'host' in scenario:
            scenario['host'] = qualified_name(scenario['host'])
        if 'target' in scenario:
            scenario['target'] = qualified_name(scenario['target'])
        options = scenario.get('options') or {}
        server_name = options.get('server_name') or {}
        if 'host' in server_name:
            server_name['host'] = qualified_name(server_name['host'])
        if 'target' in server_name:
            server_name['target'] = qualified_name(server_name['target'])
        if 'targets' in scenario:
            for idx, target in enumerate(scenario['targets']):
                scenario['targets'][idx] = qualified_name(target)
        if 'nodes' in scenario:
            for scenario_node, target in scenario['nodes'].items():
                if isinstance(target, Mapping):
                    # Update node info on scenario with context info
                    # Just update the node name with context
                    # Append context information
                    target['name'] = qualified_name(target['name'])
                    # Then update node
                    scenario['nodes'][scenario_node] = target
                else:
                    scenario['nodes'][scenario_node] = qualified_name(target)

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meet the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                # Pass as soon as one configured scenario prefix matches.
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
716
717
def is_ip_addr(addr):
    """Check if *addr* is an IP address.

    *addr* may be a plain address string or a node dict carrying
    'public_ip_attr'/'private_ip_attr' keys (external heat templates),
    in which case one of those attribute values is checked instead.

    :param addr: (str|dict) candidate address
    :return: (bool) True when the value parses as an IPv4/IPv6 address

    Fixes two defects of the previous implementation:
    - it passed ``addr.encode('utf-8')`` to ``ipaddress.ip_address``, and
      4/16-byte inputs are interpreted as *packed* binary addresses, so a
      4-character hostname such as "abcd" was wrongly reported as an IP;
    - a dict without either ip-attr key yielded ``None`` and crashed with
      ``AttributeError`` on ``.encode``.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    # Normalize bytes to text so the value is parsed as a dotted/colon
    # string, never as a packed binary address.
    if isinstance(addr, bytes):
        addr = addr.decode('utf-8')

    try:
        ipaddress.ip_address(addr)
    except ValueError:
        # Also covers None: ip_address(None) raises ValueError.
        return False
    else:
        return True
731
732
733 def _is_background_scenario(scenario):
734     if "run_in_background" in scenario:
735         return scenario["run_in_background"]
736     else:
737         return False
738
739
def parse_nodes_with_context(scenario_cfg):
    """Resolve the scenario's 'nodes' mapping into server descriptions.

    :param scenario_cfg: (dict) scenario configuration with a 'nodes' key
    :return: (OrderedDict) node name -> server info from the context,
             ordered by node name for a consistent instantiation order
    """
    node_map = scenario_cfg["nodes"]
    ordered_names = sorted(node_map)
    return OrderedDict(
        (name, base_context.Context.get_server(node_map[name]))
        for name in ordered_names)
746
747
def get_networks_from_nodes(nodes):
    """Collect the network definitions referenced by the given nodes.

    :param nodes: (dict) node name -> node info (may contain 'interfaces')
    :return: (dict) network name -> network definition, for every interface
             whose 'network_name' resolves to a known context network
    """
    networks = {}
    for node in nodes.values():
        if not node:
            continue
        for interface in node.get('interfaces', {}).values():
            # vld_id is network_name
            name = interface.get('network_name')
            if not name:
                continue
            network = base_context.Context.get_network(name)
            if network:
                networks[network['name']] = network
    return networks
764
765
def runner_join(runner, background_runners, outputs, result):
    """join (wait for) a runner, exit process at runner failure
    :param background_runners:
    :type background_runners:
    :param outputs:
    :type outputs: dict
    :param result:
    :type result: list
    """
    def _drain(active):
        # Pull whatever the runner has produced so far into the shared
        # outputs dict and result list.
        outputs.update(active.get_output())
        result.extend(active.get_result())

    # Poll until the foreground runner terminates, draining both it and
    # every background runner so their queues never fill up.
    while runner.poll() is None:
        _drain(runner)
        for bg_runner in background_runners:
            _drain(bg_runner)

    status = runner.join(outputs, result)
    base_runner.Runner.release(runner)
    return status
785
786
def print_invalid_header(source_name, args):
    """Print a header line flagging invalid input received from *source_name*."""
    message = "Invalid {source} passed:\n\n {args}\n".format(
        source=source_name, args=args)
    print(message)
790
791
def parse_task_args(src_name, args):
    """Parse task arguments into a dict.

    :param src_name: (str) label used in error messages ('task_args' or
                     'task_args_file')
    :param args: a mapping (returned unchanged) or a YAML string/None
    :return: (dict) parsed arguments; {} when *args* is empty/None
    :raises TypeError: when *args* is not valid YAML or does not parse
            to a dict (after printing a diagnostic)
    """
    # 'Mapping' moved to collections.abc in Python 3.3 and the
    # 'collections.Mapping' alias was removed in Python 3.10.
    try:
        from collections.abc import Mapping
    except ImportError:  # Python 2 fallback
        from collections import Mapping

    if isinstance(args, Mapping):
        return args

    try:
        kw = args and yaml_load(args)
        kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw