Merge "Add "render-only" option to "task" command"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import sys
11 import os
12 from collections import OrderedDict
13
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 import collections
21
22 from six.moves import filter
23 from jinja2 import Environment
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.constants import CONF_FILE
28 from yardstick.common.yaml_loader import yaml_load
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common import constants
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import task_template
33 from yardstick.common import utils
34 from yardstick.common.html_template import report_template
35
36 output_file_default = "/tmp/yardstick.out"
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def _set_dispatchers(self, output_config):
52         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
53                                                            'file')
54         out_types = [s.strip() for s in dispatchers.split(',')]
55         output_config['DEFAULT']['dispatcher'] = out_types
56
57     def start(self, args, **kwargs):  # pylint: disable=unused-argument
58         """Start a benchmark scenario."""
59
60         atexit.register(self.atexit_handler)
61
62         task_id = getattr(args, 'task_id')
63         self.task_id = task_id if task_id else str(uuid.uuid4())
64
65         self._set_log()
66
67         try:
68             output_config = utils.parse_ini_file(CONF_FILE)
69         except Exception:  # pylint: disable=broad-except
70             # all error will be ignore, the default value is {}
71             output_config = {}
72
73         self._init_output_config(output_config)
74         self._set_output_config(output_config, args.output_file)
75         LOG.debug('Output configuration is: %s', output_config)
76
77         self._set_dispatchers(output_config)
78
79         # update dispatcher list
80         if 'file' in output_config['DEFAULT']['dispatcher']:
81             result = {'status': 0, 'result': {}}
82             utils.write_json_to_file(args.output_file, result)
83
84         total_start_time = time.time()
85         parser = TaskParser(args.inputfile[0])
86
87         if args.suite:
88             # 1.parse suite, return suite_params info
89             task_files, task_args, task_args_fnames = parser.parse_suite()
90         else:
91             task_files = [parser.path]
92             task_args = [args.task_args]
93             task_args_fnames = [args.task_args_file]
94
95         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
96                   task_files, task_args, task_args_fnames)
97
98         if args.parse_only:
99             sys.exit(0)
100
101         testcases = {}
102         tasks = self._parse_tasks(parser, task_files, args, task_args,
103                                   task_args_fnames)
104
105         # Execute task files.
106         for i, _ in enumerate(task_files):
107             one_task_start_time = time.time()
108             self.contexts.extend(tasks[i]['contexts'])
109             if not tasks[i]['meet_precondition']:
110                 LOG.info('"meet_precondition" is %s, please check environment',
111                          tasks[i]['meet_precondition'])
112                 continue
113
114             try:
115                 data = self._run(tasks[i]['scenarios'],
116                                  tasks[i]['run_in_parallel'],
117                                  output_config)
118             except KeyboardInterrupt:
119                 raise
120             except Exception:  # pylint: disable=broad-except
121                 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
122                           exc_info=True)
123                 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
124                                                     'tc_data': []}
125             else:
126                 LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
127                 testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
128                                                     'tc_data': data}
129
130             if args.keep_deploy:
131                 # keep deployment, forget about stack
132                 # (hide it for exit handler)
133                 self.contexts = []
134             else:
135                 for context in self.contexts[::-1]:
136                     context.undeploy()
137                 self.contexts = []
138             one_task_end_time = time.time()
139             LOG.info("Task %s finished in %d secs", task_files[i],
140                      one_task_end_time - one_task_start_time)
141
142         result = self._get_format_result(testcases)
143
144         self._do_output(output_config, result)
145         self._generate_reporting(result)
146
147         total_end_time = time.time()
148         LOG.info("Total finished in %d secs",
149                  total_end_time - total_start_time)
150
151         LOG.info('To generate report, execute command "yardstick report '
152                  'generate %(task_id)s <yaml_name>s"', self.task_id)
153         LOG.info("Task ALL DONE, exiting")
154         return result
155
156     def _generate_reporting(self, result):
157         env = Environment()
158         with open(constants.REPORTING_FILE, 'w') as f:
159             f.write(env.from_string(report_template).render(result))
160
161         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
162
163     def _set_log(self):
164         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
165         log_formatter = logging.Formatter(log_format)
166
167         utils.makedirs(constants.TASK_LOG_DIR)
168         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
169         log_handler = logging.FileHandler(log_path)
170         log_handler.setFormatter(log_formatter)
171         log_handler.setLevel(logging.DEBUG)
172
173         logging.root.addHandler(log_handler)
174
175     def _init_output_config(self, output_config):
176         output_config.setdefault('DEFAULT', {})
177         output_config.setdefault('dispatcher_http', {})
178         output_config.setdefault('dispatcher_file', {})
179         output_config.setdefault('dispatcher_influxdb', {})
180         output_config.setdefault('nsb', {})
181
182     def _set_output_config(self, output_config, file_path):
183         try:
184             out_type = os.environ['DISPATCHER']
185         except KeyError:
186             output_config['DEFAULT'].setdefault('dispatcher', 'file')
187         else:
188             output_config['DEFAULT']['dispatcher'] = out_type
189
190         output_config['dispatcher_file']['file_path'] = file_path
191
192         try:
193             target = os.environ['TARGET']
194         except KeyError:
195             pass
196         else:
197             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
198             output_config[k]['target'] = target
199
200     def _get_format_result(self, testcases):
201         criteria = self._get_task_criteria(testcases)
202
203         info = {
204             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
205             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
206             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
207             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
208         }
209
210         result = {
211             'status': 1,
212             'result': {
213                 'criteria': criteria,
214                 'task_id': self.task_id,
215                 'info': info,
216                 'testcases': testcases
217             }
218         }
219
220         return result
221
222     def _get_task_criteria(self, testcases):
223         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
224         if criteria:
225             return 'FAIL'
226         else:
227             return 'PASS'
228
229     def _do_output(self, output_config, result):
230         dispatchers = DispatcherBase.get(output_config)
231         dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
232
233         for dispatcher in dispatchers:
234             dispatcher.flush_result_data(result)
235
236     def _run(self, scenarios, run_in_parallel, output_config):
237         """Deploys context and calls runners"""
238         for context in self.contexts:
239             context.deploy()
240
241         background_runners = []
242
243         result = []
244         # Start all background scenarios
245         for scenario in filter(_is_background_scenario, scenarios):
246             scenario["runner"] = dict(type="Duration", duration=1000000000)
247             runner = self.run_one_scenario(scenario, output_config)
248             background_runners.append(runner)
249
250         runners = []
251         if run_in_parallel:
252             for scenario in scenarios:
253                 if not _is_background_scenario(scenario):
254                     runner = self.run_one_scenario(scenario, output_config)
255                     runners.append(runner)
256
257             # Wait for runners to finish
258             for runner in runners:
259                 status = runner_join(runner, background_runners, self.outputs, result)
260                 if status != 0:
261                     raise RuntimeError(
262                         "{0} runner status {1}".format(runner.__execution_type__, status))
263                 LOG.info("Runner ended")
264         else:
265             # run serially
266             for scenario in scenarios:
267                 if not _is_background_scenario(scenario):
268                     runner = self.run_one_scenario(scenario, output_config)
269                     status = runner_join(runner, background_runners, self.outputs, result)
270                     if status != 0:
271                         LOG.error('Scenario NO.%s: "%s" ERROR!',
272                                   scenarios.index(scenario) + 1,
273                                   scenario.get('type'))
274                         raise RuntimeError(
275                             "{0} runner status {1}".format(runner.__execution_type__, status))
276                     LOG.info("Runner ended")
277
278         # Abort background runners
279         for runner in background_runners:
280             runner.abort()
281
282         # Wait for background runners to finish
283         for runner in background_runners:
284             status = runner.join(self.outputs, result)
285             if status is None:
286                 # Nuke if it did not stop nicely
287                 base_runner.Runner.terminate(runner)
288                 runner.join(self.outputs, result)
289             base_runner.Runner.release(runner)
290
291             print("Background task ended")
292         return result
293
294     def atexit_handler(self):
295         """handler for process termination"""
296         base_runner.Runner.terminate_all()
297
298         if self.contexts:
299             LOG.info("Undeploying all contexts")
300             for context in self.contexts[::-1]:
301                 context.undeploy()
302
303     def _parse_options(self, op):
304         if isinstance(op, dict):
305             return {k: self._parse_options(v) for k, v in op.items()}
306         elif isinstance(op, list):
307             return [self._parse_options(v) for v in op]
308         elif isinstance(op, str):
309             return self.outputs.get(op[1:]) if op.startswith('$') else op
310         else:
311             return op
312
313     def _parse_tasks(self, parser, task_files, args, task_args,
314                      task_args_fnames):
315         tasks = []
316
317         # Parse task_files.
318         for i, _ in enumerate(task_files):
319             parser.path = task_files[i]
320             tasks.append(parser.parse_task(self.task_id, task_args[i],
321                                            task_args_fnames[i]))
322             tasks[i]['case_name'] = os.path.splitext(
323                 os.path.basename(task_files[i]))[0]
324
325         if args.render_only:
326             utils.makedirs(args.render_only)
327             for idx, task in enumerate(tasks):
328                 output_file_name = os.path.abspath(os.path.join(
329                     args.render_only,
330                     '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
331                 utils.write_file(output_file_name, task['rendered'])
332
333             sys.exit(0)
334
335         return tasks
336
337     def run_one_scenario(self, scenario_cfg, output_config):
338         """run one scenario using context"""
339         runner_cfg = scenario_cfg["runner"]
340         runner_cfg['output_config'] = output_config
341
342         options = scenario_cfg.get('options', {})
343         scenario_cfg['options'] = self._parse_options(options)
344
345         # TODO support get multi hosts/vms info
346         context_cfg = {}
347         server_name = scenario_cfg.get('options', {}).get('server_name', {})
348
349         def config_context_target(cfg):
350             target = cfg['target']
351             if is_ip_addr(target):
352                 context_cfg['target'] = {"ipaddr": target}
353             else:
354                 context_cfg['target'] = Context.get_server(target)
355                 if self._is_same_context(cfg["host"], target):
356                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
357                 else:
358                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
359
360         host_name = server_name.get('host', scenario_cfg.get('host'))
361         if host_name:
362             context_cfg['host'] = Context.get_server(host_name)
363
364         for item in [server_name, scenario_cfg]:
365             try:
366                 config_context_target(item)
367             except KeyError:
368                 LOG.debug("Got a KeyError in config_context_target(%s)", item)
369             else:
370                 break
371
372         if "targets" in scenario_cfg:
373             ip_list = []
374             for target in scenario_cfg["targets"]:
375                 if is_ip_addr(target):
376                     ip_list.append(target)
377                     context_cfg['target'] = {}
378                 else:
379                     context_cfg['target'] = Context.get_server(target)
380                     if self._is_same_context(scenario_cfg["host"],
381                                              target):
382                         ip_list.append(context_cfg["target"]["private_ip"])
383                     else:
384                         ip_list.append(context_cfg["target"]["ip"])
385             context_cfg['target']['ipaddr'] = ','.join(ip_list)
386
387         if "nodes" in scenario_cfg:
388             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
389             context_cfg["networks"] = get_networks_from_nodes(
390                 context_cfg["nodes"])
391
392         runner = base_runner.Runner.get(runner_cfg)
393
394         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
395         runner.run(scenario_cfg, context_cfg)
396
397         return runner
398
399     def _is_same_context(self, host_attr, target_attr):
400         """check if two servers are in the same heat context
401         host_attr: either a name for a server created by yardstick or a dict
402         with attribute name mapping when using external heat templates
403         target_attr: either a name for a server created by yardstick or a dict
404         with attribute name mapping when using external heat templates
405         """
406         for context in self.contexts:
407             if context.__context_type__ not in {"Heat", "Kubernetes"}:
408                 continue
409
410             host = context._get_server(host_attr)
411             if host is None:
412                 continue
413
414             target = context._get_server(target_attr)
415             if target is None:
416                 return False
417
418             # Both host and target is not None, then they are in the
419             # same heat context.
420             return True
421
422         return False
423
424
425 class TaskParser(object):       # pragma: no cover
426     """Parser for task config files in yaml format"""
427
428     def __init__(self, path):
429         self.path = path
430
431     def _meet_constraint(self, task, cur_pod, cur_installer):
432         if "constraint" in task:
433             constraint = task.get('constraint', None)
434             if constraint is not None:
435                 tc_fit_pod = constraint.get('pod', None)
436                 tc_fit_installer = constraint.get('installer', None)
437                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
438                          cur_pod, cur_installer, constraint)
439                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
440                     return False
441                 if (cur_installer is None) or (tc_fit_installer and cur_installer
442                                                not in tc_fit_installer):
443                     return False
444         return True
445
446     def _get_task_para(self, task, cur_pod):
447         task_args = task.get('task_args', None)
448         if task_args is not None:
449             task_args = task_args.get(cur_pod, task_args.get('default'))
450         task_args_fnames = task.get('task_args_fnames', None)
451         if task_args_fnames is not None:
452             task_args_fnames = task_args_fnames.get(cur_pod, None)
453         return task_args, task_args_fnames
454
455     def parse_suite(self):
456         """parse the suite file and return a list of task config file paths
457            and lists of optional parameters if present"""
458         LOG.info("\nParsing suite file:%s", self.path)
459
460         try:
461             with open(self.path) as stream:
462                 cfg = yaml_load(stream)
463         except IOError as ioerror:
464             sys.exit(ioerror)
465
466         self._check_schema(cfg["schema"], "suite")
467         LOG.info("\nStarting scenario:%s", cfg["name"])
468
469         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
470         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
471                                       test_cases_dir)
472         if test_cases_dir[-1] != os.sep:
473             test_cases_dir += os.sep
474
475         cur_pod = os.environ.get('NODE_NAME', None)
476         cur_installer = os.environ.get('INSTALLER_TYPE', None)
477
478         valid_task_files = []
479         valid_task_args = []
480         valid_task_args_fnames = []
481
482         for task in cfg["test_cases"]:
483             # 1.check file_name
484             if "file_name" in task:
485                 task_fname = task.get('file_name', None)
486                 if task_fname is None:
487                     continue
488             else:
489                 continue
490             # 2.check constraint
491             if self._meet_constraint(task, cur_pod, cur_installer):
492                 valid_task_files.append(test_cases_dir + task_fname)
493             else:
494                 continue
495             # 3.fetch task parameters
496             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
497             valid_task_args.append(task_args)
498             valid_task_args_fnames.append(task_args_fnames)
499
500         return valid_task_files, valid_task_args, valid_task_args_fnames
501
502     def _render_task(self, task_args, task_args_file):
503         """Render the input task with the given arguments
504
505         :param task_args: (dict) arguments to render the task
506         :param task_args_file: (str) file containing the arguments to render
507                                the task
508         :return: (str) task file rendered
509         """
510         try:
511             kw = {}
512             if task_args_file:
513                 with open(task_args_file) as f:
514                     kw.update(parse_task_args('task_args_file', f.read()))
515             kw.update(parse_task_args('task_args', task_args))
516         except TypeError:
517             raise y_exc.TaskRenderArgumentError()
518
519         input_task = None
520         try:
521             with open(self.path) as f:
522                 input_task = f.read()
523             rendered_task = task_template.TaskTemplate.render(input_task, **kw)
524             LOG.debug('Input task is:\n%s', rendered_task)
525             parsed_task = yaml_load(rendered_task)
526         except (IOError, OSError):
527             raise y_exc.TaskReadError(task_file=self.path)
528         except Exception:
529             raise y_exc.TaskRenderError(input_task=input_task)
530
531         return parsed_task, rendered_task
532
533     def parse_task(self, task_id, task_args=None, task_args_file=None):
534         """parses the task file and return an context and scenario instances"""
535         LOG.info("Parsing task config: %s", self.path)
536
537         cfg, rendered = self._render_task(task_args, task_args_file)
538         self._check_schema(cfg["schema"], "task")
539         meet_precondition = self._check_precondition(cfg)
540
541         # TODO: support one or many contexts? Many would simpler and precise
542         # TODO: support hybrid context type
543         if "context" in cfg:
544             context_cfgs = [cfg["context"]]
545         elif "contexts" in cfg:
546             context_cfgs = cfg["contexts"]
547         else:
548             context_cfgs = [{"type": "Dummy"}]
549
550         contexts = []
551         for cfg_attrs in context_cfgs:
552
553             cfg_attrs['task_id'] = task_id
554             # default to Heat context because we are testing OpenStack
555             context_type = cfg_attrs.get("type", "Heat")
556             context = Context.get(context_type)
557             context.init(cfg_attrs)
558             # Update the name in case the context has used the name_suffix
559             cfg_attrs['name'] = context.name
560             contexts.append(context)
561
562         run_in_parallel = cfg.get("run_in_parallel", False)
563
564         # add tc and task id for influxdb extended tags
565         for scenario in cfg["scenarios"]:
566             task_name = os.path.splitext(os.path.basename(self.path))[0]
567             scenario["tc"] = task_name
568             scenario["task_id"] = task_id
569             # embed task path into scenario so we can load other files
570             # relative to task path
571             scenario["task_path"] = os.path.dirname(self.path)
572
573             self._change_node_names(scenario, contexts)
574
575         # TODO we need something better here, a class that represent the file
576         return {'scenarios': cfg['scenarios'],
577                 'run_in_parallel': run_in_parallel,
578                 'meet_precondition': meet_precondition,
579                 'contexts': contexts,
580                 'rendered': rendered}
581
582     @staticmethod
583     def _change_node_names(scenario, contexts):
584         """Change the node names in a scenario, depending on the context config
585
586         The nodes (VMs or physical servers) are referred in the context section
587         with the name of the server and the name of the context:
588             <server name>.<context name>
589
590         If the context is going to be undeployed at the end of the test, the
591         task ID is suffixed to the name to avoid interferences with previous
592         deployments. If the context needs to be deployed at the end of the
593         test, the name assigned is kept.
594
595         There are several places where a node name could appear in the scenario
596         configuration:
597         scenario:
598           host: athena.demo
599           target: kratos.demo
600           targets:
601             - athena.demo
602             - kratos.demo
603
604         scenario:
605           options:
606             server_name:  # JIRA: YARDSTICK-810
607               host: athena.demo
608               target: kratos.demo
609
610         scenario:
611           nodes:
612             tg__0: tg_0.yardstick
613             vnf__0: vnf_0.yardstick
614         """
615         def qualified_name(name):
616             node_name, context_name = name.split('.')
617             try:
618                 ctx = next((context for context in contexts
619                        if context.assigned_name == context_name))
620             except StopIteration:
621                 raise y_exc.ScenarioConfigContextNameNotFound(
622                     context_name=context_name)
623
624             return '{}.{}'.format(node_name, ctx.name)
625
626         if 'host' in scenario:
627             scenario['host'] = qualified_name(scenario['host'])
628         if 'target' in scenario:
629             scenario['target'] = qualified_name(scenario['target'])
630         server_name = scenario.get('options', {}).get('server_name', {})
631         if 'host' in server_name:
632             server_name['host'] = qualified_name(server_name['host'])
633         if 'target' in server_name:
634             server_name['target'] = qualified_name(server_name['target'])
635         if 'targets' in scenario:
636             for idx, target in enumerate(scenario['targets']):
637                 scenario['targets'][idx] = qualified_name(target)
638         if 'nodes' in scenario:
639             for scenario_node, target in scenario['nodes'].items():
640                 scenario['nodes'][scenario_node] = qualified_name(target)
641
642     def _check_schema(self, cfg_schema, schema_type):
643         """Check if config file is using the correct schema type"""
644
645         if cfg_schema != "yardstick:" + schema_type + ":0.1":
646             sys.exit("error: file %s has unknown schema %s" % (self.path,
647                                                                cfg_schema))
648
649     def _check_precondition(self, cfg):
650         """Check if the environment meet the precondition"""
651
652         if "precondition" in cfg:
653             precondition = cfg["precondition"]
654             installer_type = precondition.get("installer_type", None)
655             deploy_scenarios = precondition.get("deploy_scenarios", None)
656             tc_fit_pods = precondition.get("pod_name", None)
657             installer_type_env = os.environ.get('INSTALL_TYPE', None)
658             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
659             pod_name_env = os.environ.get('NODE_NAME', None)
660
661             LOG.info("installer_type: %s, installer_type_env: %s",
662                      installer_type, installer_type_env)
663             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
664                      deploy_scenarios, deploy_scenario_env)
665             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
666                      tc_fit_pods, pod_name_env)
667             if installer_type and installer_type_env:
668                 if installer_type_env not in installer_type:
669                     return False
670             if deploy_scenarios and deploy_scenario_env:
671                 deploy_scenarios_list = deploy_scenarios.split(',')
672                 for deploy_scenario in deploy_scenarios_list:
673                     if deploy_scenario_env.startswith(deploy_scenario):
674                         return True
675                 return False
676             if tc_fit_pods and pod_name_env:
677                 if pod_name_env not in tc_fit_pods:
678                     return False
679         return True
680
681
682 def is_ip_addr(addr):
683     """check if string addr is an IP address"""
684     try:
685         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
686     except AttributeError:
687         pass
688
689     try:
690         ipaddress.ip_address(addr.encode('utf-8'))
691     except ValueError:
692         return False
693     else:
694         return True
695
696
697 def _is_background_scenario(scenario):
698     if "run_in_background" in scenario:
699         return scenario["run_in_background"]
700     else:
701         return False
702
703
704 def parse_nodes_with_context(scenario_cfg):
705     """parse the 'nodes' fields in scenario """
706     # ensure consistency in node instantiation order
707     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
708                        for nodename in sorted(scenario_cfg["nodes"]))
709
710
711 def get_networks_from_nodes(nodes):
712     """parse the 'nodes' fields in scenario """
713     networks = {}
714     for node in nodes.values():
715         if not node:
716             continue
717         interfaces = node.get('interfaces', {})
718         for interface in interfaces.values():
719             # vld_id is network_name
720             network_name = interface.get('network_name')
721             if not network_name:
722                 continue
723             network = Context.get_network(network_name)
724             if network:
725                 networks[network['name']] = network
726     return networks
727
728
729 def runner_join(runner, background_runners, outputs, result):
730     """join (wait for) a runner, exit process at runner failure
731     :param background_runners:
732     :type background_runners:
733     :param outputs:
734     :type outputs: dict
735     :param result:
736     :type result: list
737     """
738     while runner.poll() is None:
739         outputs.update(runner.get_output())
740         result.extend(runner.get_result())
741         # drain all the background runner queues
742         for background in background_runners:
743             outputs.update(background.get_output())
744             result.extend(background.get_result())
745     status = runner.join(outputs, result)
746     base_runner.Runner.release(runner)
747     return status
748
749
750 def print_invalid_header(source_name, args):
751     print("Invalid %(source)s passed:\n\n %(args)s\n"
752           % {"source": source_name, "args": args})
753
754
755 def parse_task_args(src_name, args):
756     if isinstance(args, collections.Mapping):
757         return args
758
759     try:
760         kw = args and yaml_load(args)
761         kw = {} if kw is None else kw
762     except yaml.parser.ParserError as e:
763         print_invalid_header(src_name, args)
764         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
765               % {"source": src_name, "err": e})
766         raise TypeError()
767
768     if not isinstance(kw, dict):
769         print_invalid_header(src_name, args)
770         print("%(src)s had to be dict, actually %(src_type)s\n"
771               % {"src": src_name, "src_type": type(kw)})
772         raise TypeError()
773     return kw