Merge "Install RabitMQ for RPC messaging between processes"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import sys
11 import os
12 from collections import OrderedDict
13
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 import collections
21
22 from six.moves import filter
23 from jinja2 import Environment
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.constants import CONF_FILE
28 from yardstick.common.yaml_loader import yaml_load
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common import constants
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import task_template
33 from yardstick.common import utils
34 from yardstick.common.html_template import report_template
35
36 output_file_default = "/tmp/yardstick.out"
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def _set_dispatchers(self, output_config):
52         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
53                                                            'file')
54         out_types = [s.strip() for s in dispatchers.split(',')]
55         output_config['DEFAULT']['dispatcher'] = out_types
56
57     def start(self, args, **kwargs):  # pylint: disable=unused-argument
58         """Start a benchmark scenario."""
59
60         atexit.register(self.atexit_handler)
61
62         task_id = getattr(args, 'task_id')
63         self.task_id = task_id if task_id else str(uuid.uuid4())
64
65         self._set_log()
66
67         try:
68             output_config = utils.parse_ini_file(CONF_FILE)
69         except Exception:  # pylint: disable=broad-except
70             # all error will be ignore, the default value is {}
71             output_config = {}
72
73         self._init_output_config(output_config)
74         self._set_output_config(output_config, args.output_file)
75         LOG.debug('Output configuration is: %s', output_config)
76
77         self._set_dispatchers(output_config)
78
79         # update dispatcher list
80         if 'file' in output_config['DEFAULT']['dispatcher']:
81             result = {'status': 0, 'result': {}}
82             utils.write_json_to_file(args.output_file, result)
83
84         total_start_time = time.time()
85         parser = TaskParser(args.inputfile[0])
86
87         if args.suite:
88             # 1.parse suite, return suite_params info
89             task_files, task_args, task_args_fnames = parser.parse_suite()
90         else:
91             task_files = [parser.path]
92             task_args = [args.task_args]
93             task_args_fnames = [args.task_args_file]
94
95         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
96                   task_files, task_args, task_args_fnames)
97
98         if args.parse_only:
99             sys.exit(0)
100
101         testcases = {}
102         tasks = self._parse_tasks(parser, task_files, args, task_args,
103                                   task_args_fnames)
104
105         # Execute task files.
106         for i, _ in enumerate(task_files):
107             one_task_start_time = time.time()
108             self.contexts.extend(tasks[i]['contexts'])
109             if not tasks[i]['meet_precondition']:
110                 LOG.info('"meet_precondition" is %s, please check environment',
111                          tasks[i]['meet_precondition'])
112                 continue
113
114             try:
115                 data = self._run(tasks[i]['scenarios'],
116                                  tasks[i]['run_in_parallel'],
117                                  output_config)
118             except KeyboardInterrupt:
119                 raise
120             except Exception:  # pylint: disable=broad-except
121                 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
122                           exc_info=True)
123                 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
124                                                     'tc_data': []}
125             else:
126                 LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
127                 testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
128                                                     'tc_data': data}
129
130             if args.keep_deploy:
131                 # keep deployment, forget about stack
132                 # (hide it for exit handler)
133                 self.contexts = []
134             else:
135                 for context in self.contexts[::-1]:
136                     context.undeploy()
137                 self.contexts = []
138             one_task_end_time = time.time()
139             LOG.info("Task %s finished in %d secs", task_files[i],
140                      one_task_end_time - one_task_start_time)
141
142         result = self._get_format_result(testcases)
143
144         self._do_output(output_config, result)
145         self._generate_reporting(result)
146
147         total_end_time = time.time()
148         LOG.info("Total finished in %d secs",
149                  total_end_time - total_start_time)
150
151         LOG.info('To generate report, execute command "yardstick report '
152                  'generate %s <YAML_NAME>"', self.task_id)
153         LOG.info("Task ALL DONE, exiting")
154         return result
155
156     def _generate_reporting(self, result):
157         env = Environment()
158         with open(constants.REPORTING_FILE, 'w') as f:
159             f.write(env.from_string(report_template).render(result))
160
161         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
162
163     def _set_log(self):
164         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
165         log_formatter = logging.Formatter(log_format)
166
167         utils.makedirs(constants.TASK_LOG_DIR)
168         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
169         log_handler = logging.FileHandler(log_path)
170         log_handler.setFormatter(log_formatter)
171         log_handler.setLevel(logging.DEBUG)
172
173         logging.root.addHandler(log_handler)
174
175     def _init_output_config(self, output_config):
176         output_config.setdefault('DEFAULT', {})
177         output_config.setdefault('dispatcher_http', {})
178         output_config.setdefault('dispatcher_file', {})
179         output_config.setdefault('dispatcher_influxdb', {})
180         output_config.setdefault('nsb', {})
181
182     def _set_output_config(self, output_config, file_path):
183         try:
184             out_type = os.environ['DISPATCHER']
185         except KeyError:
186             output_config['DEFAULT'].setdefault('dispatcher', 'file')
187         else:
188             output_config['DEFAULT']['dispatcher'] = out_type
189
190         output_config['dispatcher_file']['file_path'] = file_path
191
192         try:
193             target = os.environ['TARGET']
194         except KeyError:
195             pass
196         else:
197             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
198             output_config[k]['target'] = target
199
200     def _get_format_result(self, testcases):
201         criteria = self._get_task_criteria(testcases)
202
203         info = {
204             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
205             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
206             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
207             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
208         }
209
210         result = {
211             'status': 1,
212             'result': {
213                 'criteria': criteria,
214                 'task_id': self.task_id,
215                 'info': info,
216                 'testcases': testcases
217             }
218         }
219
220         return result
221
222     def _get_task_criteria(self, testcases):
223         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
224         if criteria:
225             return 'FAIL'
226         else:
227             return 'PASS'
228
229     def _do_output(self, output_config, result):
230         dispatchers = DispatcherBase.get(output_config)
231         dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
232
233         for dispatcher in dispatchers:
234             dispatcher.flush_result_data(result)
235
236     def _run(self, scenarios, run_in_parallel, output_config):
237         """Deploys context and calls runners"""
238         for context in self.contexts:
239             context.deploy()
240
241         background_runners = []
242
243         result = []
244         # Start all background scenarios
245         for scenario in filter(_is_background_scenario, scenarios):
246             scenario["runner"] = dict(type="Duration", duration=1000000000)
247             runner = self.run_one_scenario(scenario, output_config)
248             background_runners.append(runner)
249
250         runners = []
251         if run_in_parallel:
252             for scenario in scenarios:
253                 if not _is_background_scenario(scenario):
254                     runner = self.run_one_scenario(scenario, output_config)
255                     runners.append(runner)
256
257             # Wait for runners to finish
258             for runner in runners:
259                 status = runner_join(runner, background_runners, self.outputs, result)
260                 if status != 0:
261                     raise RuntimeError(
262                         "{0} runner status {1}".format(runner.__execution_type__, status))
263                 LOG.info("Runner ended")
264         else:
265             # run serially
266             for scenario in scenarios:
267                 if not _is_background_scenario(scenario):
268                     runner = self.run_one_scenario(scenario, output_config)
269                     status = runner_join(runner, background_runners, self.outputs, result)
270                     if status != 0:
271                         LOG.error('Scenario NO.%s: "%s" ERROR!',
272                                   scenarios.index(scenario) + 1,
273                                   scenario.get('type'))
274                         raise RuntimeError(
275                             "{0} runner status {1}".format(runner.__execution_type__, status))
276                     LOG.info("Runner ended")
277
278         # Abort background runners
279         for runner in background_runners:
280             runner.abort()
281
282         # Wait for background runners to finish
283         for runner in background_runners:
284             status = runner.join(self.outputs, result)
285             if status is None:
286                 # Nuke if it did not stop nicely
287                 base_runner.Runner.terminate(runner)
288                 runner.join(self.outputs, result)
289             base_runner.Runner.release(runner)
290
291             print("Background task ended")
292         return result
293
294     def atexit_handler(self):
295         """handler for process termination"""
296         base_runner.Runner.terminate_all()
297
298         if self.contexts:
299             LOG.info("Undeploying all contexts")
300             for context in self.contexts[::-1]:
301                 context.undeploy()
302
303     def _parse_options(self, op):
304         if isinstance(op, dict):
305             return {k: self._parse_options(v) for k, v in op.items()}
306         elif isinstance(op, list):
307             return [self._parse_options(v) for v in op]
308         elif isinstance(op, str):
309             return self.outputs.get(op[1:]) if op.startswith('$') else op
310         else:
311             return op
312
313     def _parse_tasks(self, parser, task_files, args, task_args,
314                      task_args_fnames):
315         tasks = []
316
317         # Parse task_files.
318         for i, _ in enumerate(task_files):
319             parser.path = task_files[i]
320             tasks.append(parser.parse_task(self.task_id, task_args[i],
321                                            task_args_fnames[i]))
322             tasks[i]['case_name'] = os.path.splitext(
323                 os.path.basename(task_files[i]))[0]
324
325         if args.render_only:
326             utils.makedirs(args.render_only)
327             for idx, task in enumerate(tasks):
328                 output_file_name = os.path.abspath(os.path.join(
329                     args.render_only,
330                     '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
331                 utils.write_file(output_file_name, task['rendered'])
332
333             sys.exit(0)
334
335         return tasks
336
337     def run_one_scenario(self, scenario_cfg, output_config):
338         """run one scenario using context"""
339         runner_cfg = scenario_cfg["runner"]
340         runner_cfg['output_config'] = output_config
341
342         options = scenario_cfg.get('options', {})
343         scenario_cfg['options'] = self._parse_options(options)
344
345         # TODO support get multi hosts/vms info
346         context_cfg = {}
347         options = scenario_cfg.get('options') or {}
348         server_name = options.get('server_name') or {}
349
350         def config_context_target(cfg):
351             target = cfg['target']
352             if is_ip_addr(target):
353                 context_cfg['target'] = {"ipaddr": target}
354             else:
355                 context_cfg['target'] = Context.get_server(target)
356                 if self._is_same_context(cfg["host"], target):
357                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
358                 else:
359                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
360
361         host_name = server_name.get('host', scenario_cfg.get('host'))
362         if host_name:
363             context_cfg['host'] = Context.get_server(host_name)
364
365         for item in [server_name, scenario_cfg]:
366             try:
367                 config_context_target(item)
368             except KeyError:
369                 LOG.debug("Got a KeyError in config_context_target(%s)", item)
370             else:
371                 break
372
373         if "targets" in scenario_cfg:
374             ip_list = []
375             for target in scenario_cfg["targets"]:
376                 if is_ip_addr(target):
377                     ip_list.append(target)
378                     context_cfg['target'] = {}
379                 else:
380                     context_cfg['target'] = Context.get_server(target)
381                     if self._is_same_context(scenario_cfg["host"],
382                                              target):
383                         ip_list.append(context_cfg["target"]["private_ip"])
384                     else:
385                         ip_list.append(context_cfg["target"]["ip"])
386             context_cfg['target']['ipaddr'] = ','.join(ip_list)
387
388         if "nodes" in scenario_cfg:
389             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
390             context_cfg["networks"] = get_networks_from_nodes(
391                 context_cfg["nodes"])
392
393         runner = base_runner.Runner.get(runner_cfg)
394
395         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
396         runner.run(scenario_cfg, context_cfg)
397
398         return runner
399
400     def _is_same_context(self, host_attr, target_attr):
401         """check if two servers are in the same heat context
402         host_attr: either a name for a server created by yardstick or a dict
403         with attribute name mapping when using external heat templates
404         target_attr: either a name for a server created by yardstick or a dict
405         with attribute name mapping when using external heat templates
406         """
407         for context in self.contexts:
408             if context.__context_type__ not in {"Heat", "Kubernetes"}:
409                 continue
410
411             host = context._get_server(host_attr)
412             if host is None:
413                 continue
414
415             target = context._get_server(target_attr)
416             if target is None:
417                 return False
418
419             # Both host and target is not None, then they are in the
420             # same heat context.
421             return True
422
423         return False
424
425
426 class TaskParser(object):       # pragma: no cover
427     """Parser for task config files in yaml format"""
428
429     def __init__(self, path):
430         self.path = path
431
432     def _meet_constraint(self, task, cur_pod, cur_installer):
433         if "constraint" in task:
434             constraint = task.get('constraint', None)
435             if constraint is not None:
436                 tc_fit_pod = constraint.get('pod', None)
437                 tc_fit_installer = constraint.get('installer', None)
438                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
439                          cur_pod, cur_installer, constraint)
440                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
441                     return False
442                 if (cur_installer is None) or (tc_fit_installer and cur_installer
443                                                not in tc_fit_installer):
444                     return False
445         return True
446
447     def _get_task_para(self, task, cur_pod):
448         task_args = task.get('task_args', None)
449         if task_args is not None:
450             task_args = task_args.get(cur_pod, task_args.get('default'))
451         task_args_fnames = task.get('task_args_fnames', None)
452         if task_args_fnames is not None:
453             task_args_fnames = task_args_fnames.get(cur_pod, None)
454         return task_args, task_args_fnames
455
456     def parse_suite(self):
457         """parse the suite file and return a list of task config file paths
458            and lists of optional parameters if present"""
459         LOG.info("\nParsing suite file:%s", self.path)
460
461         try:
462             with open(self.path) as stream:
463                 cfg = yaml_load(stream)
464         except IOError as ioerror:
465             sys.exit(ioerror)
466
467         self._check_schema(cfg["schema"], "suite")
468         LOG.info("\nStarting scenario:%s", cfg["name"])
469
470         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
471         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
472                                       test_cases_dir)
473         if test_cases_dir[-1] != os.sep:
474             test_cases_dir += os.sep
475
476         cur_pod = os.environ.get('NODE_NAME', None)
477         cur_installer = os.environ.get('INSTALLER_TYPE', None)
478
479         valid_task_files = []
480         valid_task_args = []
481         valid_task_args_fnames = []
482
483         for task in cfg["test_cases"]:
484             # 1.check file_name
485             if "file_name" in task:
486                 task_fname = task.get('file_name', None)
487                 if task_fname is None:
488                     continue
489             else:
490                 continue
491             # 2.check constraint
492             if self._meet_constraint(task, cur_pod, cur_installer):
493                 valid_task_files.append(test_cases_dir + task_fname)
494             else:
495                 continue
496             # 3.fetch task parameters
497             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
498             valid_task_args.append(task_args)
499             valid_task_args_fnames.append(task_args_fnames)
500
501         return valid_task_files, valid_task_args, valid_task_args_fnames
502
503     def _render_task(self, task_args, task_args_file):
504         """Render the input task with the given arguments
505
506         :param task_args: (dict) arguments to render the task
507         :param task_args_file: (str) file containing the arguments to render
508                                the task
509         :return: (str) task file rendered
510         """
511         try:
512             kw = {}
513             if task_args_file:
514                 with open(task_args_file) as f:
515                     kw.update(parse_task_args('task_args_file', f.read()))
516             kw.update(parse_task_args('task_args', task_args))
517         except TypeError:
518             raise y_exc.TaskRenderArgumentError()
519
520         input_task = None
521         try:
522             with open(self.path) as f:
523                 input_task = f.read()
524             rendered_task = task_template.TaskTemplate.render(input_task, **kw)
525             LOG.debug('Input task is:\n%s', rendered_task)
526             parsed_task = yaml_load(rendered_task)
527         except (IOError, OSError):
528             raise y_exc.TaskReadError(task_file=self.path)
529         except Exception:
530             raise y_exc.TaskRenderError(input_task=input_task)
531
532         return parsed_task, rendered_task
533
534     def parse_task(self, task_id, task_args=None, task_args_file=None):
535         """parses the task file and return an context and scenario instances"""
536         LOG.info("Parsing task config: %s", self.path)
537
538         cfg, rendered = self._render_task(task_args, task_args_file)
539         self._check_schema(cfg["schema"], "task")
540         meet_precondition = self._check_precondition(cfg)
541
542         # TODO: support one or many contexts? Many would simpler and precise
543         # TODO: support hybrid context type
544         if "context" in cfg:
545             context_cfgs = [cfg["context"]]
546         elif "contexts" in cfg:
547             context_cfgs = cfg["contexts"]
548         else:
549             context_cfgs = [{"type": "Dummy"}]
550
551         contexts = []
552         for cfg_attrs in context_cfgs:
553
554             cfg_attrs['task_id'] = task_id
555             # default to Heat context because we are testing OpenStack
556             context_type = cfg_attrs.get("type", "Heat")
557             context = Context.get(context_type)
558             context.init(cfg_attrs)
559             # Update the name in case the context has used the name_suffix
560             cfg_attrs['name'] = context.name
561             contexts.append(context)
562
563         run_in_parallel = cfg.get("run_in_parallel", False)
564
565         # add tc and task id for influxdb extended tags
566         for scenario in cfg["scenarios"]:
567             task_name = os.path.splitext(os.path.basename(self.path))[0]
568             scenario["tc"] = task_name
569             scenario["task_id"] = task_id
570             # embed task path into scenario so we can load other files
571             # relative to task path
572             scenario["task_path"] = os.path.dirname(self.path)
573
574             self._change_node_names(scenario, contexts)
575
576         # TODO we need something better here, a class that represent the file
577         return {'scenarios': cfg['scenarios'],
578                 'run_in_parallel': run_in_parallel,
579                 'meet_precondition': meet_precondition,
580                 'contexts': contexts,
581                 'rendered': rendered}
582
583     @staticmethod
584     def _change_node_names(scenario, contexts):
585         """Change the node names in a scenario, depending on the context config
586
587         The nodes (VMs or physical servers) are referred in the context section
588         with the name of the server and the name of the context:
589             <server name>.<context name>
590
591         If the context is going to be undeployed at the end of the test, the
592         task ID is suffixed to the name to avoid interferences with previous
593         deployments. If the context needs to be deployed at the end of the
594         test, the name assigned is kept.
595
596         There are several places where a node name could appear in the scenario
597         configuration:
598         scenario:
599           host: athena.demo
600           target: kratos.demo
601           targets:
602             - athena.demo
603             - kratos.demo
604
605         scenario:
606           options:
607             server_name:  # JIRA: YARDSTICK-810
608               host: athena.demo
609               target: kratos.demo
610
611         scenario:
612           nodes:
613             tg__0: tg_0.yardstick
614             vnf__0: vnf_0.yardstick
615         """
616         def qualified_name(name):
617             try:
618                 # for openstack
619                 node_name, context_name = name.split('.')
620                 sep = '.'
621             except ValueError:
622                 # for kubernetes, some kubernetes resources don't support
623                 # name format like 'xxx.xxx', so we use '-' instead
624                 # need unified later
625                 node_name, context_name = name.split('-')
626                 sep = '-'
627
628             try:
629                 ctx = next((context for context in contexts
630                             if context.assigned_name == context_name))
631             except StopIteration:
632                 raise y_exc.ScenarioConfigContextNameNotFound(
633                     context_name=context_name)
634
635             return '{}{}{}'.format(node_name, sep, ctx.name)
636
637         if 'host' in scenario:
638             scenario['host'] = qualified_name(scenario['host'])
639         if 'target' in scenario:
640             scenario['target'] = qualified_name(scenario['target'])
641         options = scenario.get('options') or {}
642         server_name = options.get('server_name') or {}
643         if 'host' in server_name:
644             server_name['host'] = qualified_name(server_name['host'])
645         if 'target' in server_name:
646             server_name['target'] = qualified_name(server_name['target'])
647         if 'targets' in scenario:
648             for idx, target in enumerate(scenario['targets']):
649                 scenario['targets'][idx] = qualified_name(target)
650         if 'nodes' in scenario:
651             for scenario_node, target in scenario['nodes'].items():
652                 scenario['nodes'][scenario_node] = qualified_name(target)
653
654     def _check_schema(self, cfg_schema, schema_type):
655         """Check if config file is using the correct schema type"""
656
657         if cfg_schema != "yardstick:" + schema_type + ":0.1":
658             sys.exit("error: file %s has unknown schema %s" % (self.path,
659                                                                cfg_schema))
660
661     def _check_precondition(self, cfg):
662         """Check if the environment meet the precondition"""
663
664         if "precondition" in cfg:
665             precondition = cfg["precondition"]
666             installer_type = precondition.get("installer_type", None)
667             deploy_scenarios = precondition.get("deploy_scenarios", None)
668             tc_fit_pods = precondition.get("pod_name", None)
669             installer_type_env = os.environ.get('INSTALL_TYPE', None)
670             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
671             pod_name_env = os.environ.get('NODE_NAME', None)
672
673             LOG.info("installer_type: %s, installer_type_env: %s",
674                      installer_type, installer_type_env)
675             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
676                      deploy_scenarios, deploy_scenario_env)
677             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
678                      tc_fit_pods, pod_name_env)
679             if installer_type and installer_type_env:
680                 if installer_type_env not in installer_type:
681                     return False
682             if deploy_scenarios and deploy_scenario_env:
683                 deploy_scenarios_list = deploy_scenarios.split(',')
684                 for deploy_scenario in deploy_scenarios_list:
685                     if deploy_scenario_env.startswith(deploy_scenario):
686                         return True
687                 return False
688             if tc_fit_pods and pod_name_env:
689                 if pod_name_env not in tc_fit_pods:
690                     return False
691         return True
692
693
694 def is_ip_addr(addr):
695     """check if string addr is an IP address"""
696     try:
697         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
698     except AttributeError:
699         pass
700
701     try:
702         ipaddress.ip_address(addr.encode('utf-8'))
703     except ValueError:
704         return False
705     else:
706         return True
707
708
709 def _is_background_scenario(scenario):
710     if "run_in_background" in scenario:
711         return scenario["run_in_background"]
712     else:
713         return False
714
715
716 def parse_nodes_with_context(scenario_cfg):
717     """parse the 'nodes' fields in scenario """
718     # ensure consistency in node instantiation order
719     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
720                        for nodename in sorted(scenario_cfg["nodes"]))
721
722
723 def get_networks_from_nodes(nodes):
724     """parse the 'nodes' fields in scenario """
725     networks = {}
726     for node in nodes.values():
727         if not node:
728             continue
729         interfaces = node.get('interfaces', {})
730         for interface in interfaces.values():
731             # vld_id is network_name
732             network_name = interface.get('network_name')
733             if not network_name:
734                 continue
735             network = Context.get_network(network_name)
736             if network:
737                 networks[network['name']] = network
738     return networks
739
740
741 def runner_join(runner, background_runners, outputs, result):
742     """join (wait for) a runner, exit process at runner failure
743     :param background_runners:
744     :type background_runners:
745     :param outputs:
746     :type outputs: dict
747     :param result:
748     :type result: list
749     """
750     while runner.poll() is None:
751         outputs.update(runner.get_output())
752         result.extend(runner.get_result())
753         # drain all the background runner queues
754         for background in background_runners:
755             outputs.update(background.get_output())
756             result.extend(background.get_result())
757     status = runner.join(outputs, result)
758     base_runner.Runner.release(runner)
759     return status
760
761
762 def print_invalid_header(source_name, args):
763     print("Invalid %(source)s passed:\n\n %(args)s\n"
764           % {"source": source_name, "args": args})
765
766
767 def parse_task_args(src_name, args):
768     if isinstance(args, collections.Mapping):
769         return args
770
771     try:
772         kw = args and yaml_load(args)
773         kw = {} if kw is None else kw
774     except yaml.parser.ParserError as e:
775         print_invalid_header(src_name, args)
776         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
777               % {"source": src_name, "err": e})
778         raise TypeError()
779
780     if not isinstance(kw, dict):
781         print_invalid_header(src_name, args)
782         print("%(src)s had to be dict, actually %(src_type)s\n"
783               % {"src": src_name, "src_type": type(kw)})
784         raise TypeError()
785     return kw