Merge "[workaround] Disable gui for aarch64"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import sys
11 import os
12 from collections import OrderedDict
13
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 import collections
21
22 from six.moves import filter
23 from jinja2 import Environment
24
25 from yardstick.benchmark import contexts
26 from yardstick.benchmark.contexts import base as base_context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common import constants
32 from yardstick.common import exceptions as y_exc
33 from yardstick.common import task_template
34 from yardstick.common import utils
35 from yardstick.common.html_template import report_template
36
37 output_file_default = "/tmp/yardstick.out"
38 test_cases_dir_default = "tests/opnfv/test_cases/"
39 LOG = logging.getLogger(__name__)
40
41
42 class Task(object):     # pragma: no cover
43     """Task commands.
44
45        Set of commands to manage benchmark tasks.
46     """
47
48     def __init__(self):
49         self.contexts = []
50         self.outputs = {}
51
52     def _set_dispatchers(self, output_config):
53         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
54                                                            'file')
55         out_types = [s.strip() for s in dispatchers.split(',')]
56         output_config['DEFAULT']['dispatcher'] = out_types
57
58     def start(self, args, **kwargs):  # pylint: disable=unused-argument
59         """Start a benchmark scenario."""
60
61         atexit.register(self.atexit_handler)
62
63         task_id = getattr(args, 'task_id')
64         self.task_id = task_id if task_id else str(uuid.uuid4())
65
66         self._set_log()
67
68         try:
69             output_config = utils.parse_ini_file(CONF_FILE)
70         except Exception:  # pylint: disable=broad-except
71             # all error will be ignore, the default value is {}
72             output_config = {}
73
74         self._init_output_config(output_config)
75         self._set_output_config(output_config, args.output_file)
76         LOG.debug('Output configuration is: %s', output_config)
77
78         self._set_dispatchers(output_config)
79
80         # update dispatcher list
81         if 'file' in output_config['DEFAULT']['dispatcher']:
82             result = {'status': 0, 'result': {}}
83             utils.write_json_to_file(args.output_file, result)
84
85         total_start_time = time.time()
86         parser = TaskParser(args.inputfile[0])
87
88         if args.suite:
89             # 1.parse suite, return suite_params info
90             task_files, task_args, task_args_fnames = parser.parse_suite()
91         else:
92             task_files = [parser.path]
93             task_args = [args.task_args]
94             task_args_fnames = [args.task_args_file]
95
96         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
97                   task_files, task_args, task_args_fnames)
98
99         if args.parse_only:
100             sys.exit(0)
101
102         testcases = {}
103         tasks = self._parse_tasks(parser, task_files, args, task_args,
104                                   task_args_fnames)
105
106         # Execute task files.
107         for i, _ in enumerate(task_files):
108             one_task_start_time = time.time()
109             self.contexts.extend(tasks[i]['contexts'])
110             if not tasks[i]['meet_precondition']:
111                 LOG.info('"meet_precondition" is %s, please check environment',
112                          tasks[i]['meet_precondition'])
113                 continue
114
115             try:
116                 success, data = self._run(tasks[i]['scenarios'],
117                                           tasks[i]['run_in_parallel'],
118                                           output_config)
119             except KeyboardInterrupt:
120                 raise
121             except Exception:  # pylint: disable=broad-except
122                 LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
123                           exc_info=True)
124                 testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
125                                                     'tc_data': []}
126             else:
127                 if success:
128                     LOG.info('Testcase: "%s" SUCCESS!!!', tasks[i]['case_name'])
129                     testcases[tasks[i]['case_name']] = {'criteria': 'PASS',
130                                                         'tc_data': data}
131                 else:
132                     LOG.error('Testcase: "%s" FAILED!!!', tasks[i]['case_name'],
133                               exc_info=True)
134                     testcases[tasks[i]['case_name']] = {'criteria': 'FAIL',
135                                                         'tc_data': data}
136
137             if args.keep_deploy:
138                 # keep deployment, forget about stack
139                 # (hide it for exit handler)
140                 self.contexts = []
141             else:
142                 for context in self.contexts[::-1]:
143                     context.undeploy()
144                 self.contexts = []
145             one_task_end_time = time.time()
146             LOG.info("Task %s finished in %d secs", task_files[i],
147                      one_task_end_time - one_task_start_time)
148
149         result = self._get_format_result(testcases)
150
151         self._do_output(output_config, result)
152         self._generate_reporting(result)
153
154         total_end_time = time.time()
155         LOG.info("Total finished in %d secs",
156                  total_end_time - total_start_time)
157
158         LOG.info('To generate report, execute command "yardstick report '
159                  'generate %s <YAML_NAME>"', self.task_id)
160         LOG.info("Task ALL DONE, exiting")
161         return result
162
163     def _generate_reporting(self, result):
164         env = Environment()
165         with open(constants.REPORTING_FILE, 'w') as f:
166             f.write(env.from_string(report_template).render(result))
167
168         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
169
170     def _set_log(self):
171         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
172         log_formatter = logging.Formatter(log_format)
173
174         utils.makedirs(constants.TASK_LOG_DIR)
175         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
176         log_handler = logging.FileHandler(log_path)
177         log_handler.setFormatter(log_formatter)
178         log_handler.setLevel(logging.DEBUG)
179
180         logging.root.addHandler(log_handler)
181
182     def _init_output_config(self, output_config):
183         output_config.setdefault('DEFAULT', {})
184         output_config.setdefault('dispatcher_http', {})
185         output_config.setdefault('dispatcher_file', {})
186         output_config.setdefault('dispatcher_influxdb', {})
187         output_config.setdefault('nsb', {})
188
189     def _set_output_config(self, output_config, file_path):
190         try:
191             out_type = os.environ['DISPATCHER']
192         except KeyError:
193             output_config['DEFAULT'].setdefault('dispatcher', 'file')
194         else:
195             output_config['DEFAULT']['dispatcher'] = out_type
196
197         output_config['dispatcher_file']['file_path'] = file_path
198
199         try:
200             target = os.environ['TARGET']
201         except KeyError:
202             pass
203         else:
204             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
205             output_config[k]['target'] = target
206
207     def _get_format_result(self, testcases):
208         criteria = self._get_task_criteria(testcases)
209
210         info = {
211             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
212             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
213             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
214             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
215         }
216
217         result = {
218             'status': 1,
219             'result': {
220                 'criteria': criteria,
221                 'task_id': self.task_id,
222                 'info': info,
223                 'testcases': testcases
224             }
225         }
226
227         return result
228
229     def _get_task_criteria(self, testcases):
230         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
231         if criteria:
232             return 'FAIL'
233         else:
234             return 'PASS'
235
236     def _do_output(self, output_config, result):
237         dispatchers = DispatcherBase.get(output_config)
238         dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
239
240         for dispatcher in dispatchers:
241             dispatcher.flush_result_data(result)
242
243     def _run(self, scenarios, run_in_parallel, output_config):
244         """Deploys context and calls runners"""
245         for context in self.contexts:
246             context.deploy()
247
248         background_runners = []
249
250         task_success = True
251         result = []
252         # Start all background scenarios
253         for scenario in filter(_is_background_scenario, scenarios):
254             scenario["runner"] = dict(type="Duration", duration=1000000000)
255             runner = self.run_one_scenario(scenario, output_config)
256             background_runners.append(runner)
257
258         runners = []
259         if run_in_parallel:
260             for scenario in scenarios:
261                 if not _is_background_scenario(scenario):
262                     runner = self.run_one_scenario(scenario, output_config)
263                     runners.append(runner)
264
265             # Wait for runners to finish
266             for runner in runners:
267                 status = runner_join(runner, background_runners, self.outputs, result)
268                 if status != 0:
269                     LOG.error("%s runner status %s", runner.__execution_type__, status)
270                     task_success = False
271                 LOG.info("Runner ended")
272         else:
273             # run serially
274             for scenario in scenarios:
275                 if not _is_background_scenario(scenario):
276                     runner = self.run_one_scenario(scenario, output_config)
277                     status = runner_join(runner, background_runners, self.outputs, result)
278                     if status != 0:
279                         LOG.error('Scenario NO.%s: "%s" ERROR!',
280                                   scenarios.index(scenario) + 1,
281                                   scenario.get('type'))
282                         LOG.error("%s runner status %s", runner.__execution_type__, status)
283                         task_success = False
284                     LOG.info("Runner ended")
285
286         # Abort background runners
287         for runner in background_runners:
288             runner.abort()
289
290         # Wait for background runners to finish
291         for runner in background_runners:
292             status = runner.join(self.outputs, result)
293             if status is None:
294                 # Nuke if it did not stop nicely
295                 base_runner.Runner.terminate(runner)
296                 runner.join(self.outputs, result)
297             base_runner.Runner.release(runner)
298
299             print("Background task ended")
300         return task_success, result
301
302     def atexit_handler(self):
303         """handler for process termination"""
304         base_runner.Runner.terminate_all()
305
306         if self.contexts:
307             LOG.info("Undeploying all contexts")
308             for context in self.contexts[::-1]:
309                 context.undeploy()
310
311     def _parse_options(self, op):
312         if isinstance(op, dict):
313             return {k: self._parse_options(v) for k, v in op.items()}
314         elif isinstance(op, list):
315             return [self._parse_options(v) for v in op]
316         elif isinstance(op, str):
317             return self.outputs.get(op[1:]) if op.startswith('$') else op
318         else:
319             return op
320
321     def _parse_tasks(self, parser, task_files, args, task_args,
322                      task_args_fnames):
323         tasks = []
324
325         # Parse task_files.
326         for i, _ in enumerate(task_files):
327             parser.path = task_files[i]
328             tasks.append(parser.parse_task(self.task_id, task_args[i],
329                                            task_args_fnames[i]))
330             tasks[i]['case_name'] = os.path.splitext(
331                 os.path.basename(task_files[i]))[0]
332
333         if args.render_only:
334             utils.makedirs(args.render_only)
335             for idx, task in enumerate(tasks):
336                 output_file_name = os.path.abspath(os.path.join(
337                     args.render_only,
338                     '{0:03d}-{1}.yml'.format(idx, task['case_name'])))
339                 utils.write_file(output_file_name, task['rendered'])
340
341             sys.exit(0)
342
343         return tasks
344
345     def run_one_scenario(self, scenario_cfg, output_config):
346         """run one scenario using context"""
347         runner_cfg = scenario_cfg["runner"]
348         runner_cfg['output_config'] = output_config
349
350         options = scenario_cfg.get('options', {})
351         scenario_cfg['options'] = self._parse_options(options)
352
353         # TODO support get multi hosts/vms info
354         context_cfg = {}
355         options = scenario_cfg.get('options') or {}
356         server_name = options.get('server_name') or {}
357
358         def config_context_target(cfg):
359             target = cfg['target']
360             if is_ip_addr(target):
361                 context_cfg['target'] = {"ipaddr": target}
362             else:
363                 context_cfg['target'] = base_context.Context.get_server(target)
364                 if self._is_same_context(cfg["host"], target):
365                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
366                 else:
367                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
368
369         host_name = server_name.get('host', scenario_cfg.get('host'))
370         if host_name:
371             context_cfg['host'] = base_context.Context.get_server(host_name)
372
373         for item in [server_name, scenario_cfg]:
374             try:
375                 config_context_target(item)
376             except KeyError:
377                 LOG.debug("Got a KeyError in config_context_target(%s)", item)
378             else:
379                 break
380
381         if "targets" in scenario_cfg:
382             ip_list = []
383             for target in scenario_cfg["targets"]:
384                 if is_ip_addr(target):
385                     ip_list.append(target)
386                     context_cfg['target'] = {}
387                 else:
388                     context_cfg['target'] = (
389                         base_context.Context.get_server(target))
390                     if self._is_same_context(scenario_cfg["host"],
391                                              target):
392                         ip_list.append(context_cfg["target"]["private_ip"])
393                     else:
394                         ip_list.append(context_cfg["target"]["ip"])
395             context_cfg['target']['ipaddr'] = ','.join(ip_list)
396
397         if "nodes" in scenario_cfg:
398             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
399             context_cfg["networks"] = get_networks_from_nodes(
400                 context_cfg["nodes"])
401
402         runner = base_runner.Runner.get(runner_cfg)
403
404         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
405         runner.run(scenario_cfg, context_cfg)
406
407         return runner
408
409     def _is_same_context(self, host_attr, target_attr):
410         """check if two servers are in the same heat context
411         host_attr: either a name for a server created by yardstick or a dict
412         with attribute name mapping when using external heat templates
413         target_attr: either a name for a server created by yardstick or a dict
414         with attribute name mapping when using external heat templates
415         """
416         for context in self.contexts:
417             if context.__context_type__ not in {contexts.CONTEXT_HEAT,
418                                                 contexts.CONTEXT_KUBERNETES}:
419                 continue
420
421             host = context._get_server(host_attr)
422             if host is None:
423                 continue
424
425             target = context._get_server(target_attr)
426             if target is None:
427                 return False
428
429             # Both host and target is not None, then they are in the
430             # same heat context.
431             return True
432
433         return False
434
435
436 class TaskParser(object):       # pragma: no cover
437     """Parser for task config files in yaml format"""
438
439     def __init__(self, path):
440         self.path = path
441
442     def _meet_constraint(self, task, cur_pod, cur_installer):
443         if "constraint" in task:
444             constraint = task.get('constraint', None)
445             if constraint is not None:
446                 tc_fit_pod = constraint.get('pod', None)
447                 tc_fit_installer = constraint.get('installer', None)
448                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
449                          cur_pod, cur_installer, constraint)
450                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
451                     return False
452                 if (cur_installer is None) or (tc_fit_installer and cur_installer
453                                                not in tc_fit_installer):
454                     return False
455         return True
456
457     def _get_task_para(self, task, cur_pod):
458         task_args = task.get('task_args', None)
459         if task_args is not None:
460             task_args = task_args.get(cur_pod, task_args.get('default'))
461         task_args_fnames = task.get('task_args_fnames', None)
462         if task_args_fnames is not None:
463             task_args_fnames = task_args_fnames.get(cur_pod, None)
464         return task_args, task_args_fnames
465
466     def parse_suite(self):
467         """parse the suite file and return a list of task config file paths
468            and lists of optional parameters if present"""
469         LOG.info("\nParsing suite file:%s", self.path)
470
471         try:
472             with open(self.path) as stream:
473                 cfg = yaml_load(stream)
474         except IOError as ioerror:
475             sys.exit(ioerror)
476
477         self._check_schema(cfg["schema"], "suite")
478         LOG.info("\nStarting scenario:%s", cfg["name"])
479
480         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
481         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
482                                       test_cases_dir)
483         if test_cases_dir[-1] != os.sep:
484             test_cases_dir += os.sep
485
486         cur_pod = os.environ.get('NODE_NAME', None)
487         cur_installer = os.environ.get('INSTALLER_TYPE', None)
488
489         valid_task_files = []
490         valid_task_args = []
491         valid_task_args_fnames = []
492
493         for task in cfg["test_cases"]:
494             # 1.check file_name
495             if "file_name" in task:
496                 task_fname = task.get('file_name', None)
497                 if task_fname is None:
498                     continue
499             else:
500                 continue
501             # 2.check constraint
502             if self._meet_constraint(task, cur_pod, cur_installer):
503                 valid_task_files.append(test_cases_dir + task_fname)
504             else:
505                 continue
506             # 3.fetch task parameters
507             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
508             valid_task_args.append(task_args)
509             valid_task_args_fnames.append(task_args_fnames)
510
511         return valid_task_files, valid_task_args, valid_task_args_fnames
512
513     def _render_task(self, task_args, task_args_file):
514         """Render the input task with the given arguments
515
516         :param task_args: (dict) arguments to render the task
517         :param task_args_file: (str) file containing the arguments to render
518                                the task
519         :return: (str) task file rendered
520         """
521         try:
522             kw = {}
523             if task_args_file:
524                 with open(task_args_file) as f:
525                     kw.update(parse_task_args('task_args_file', f.read()))
526             kw.update(parse_task_args('task_args', task_args))
527         except TypeError:
528             raise y_exc.TaskRenderArgumentError()
529
530         input_task = None
531         try:
532             with open(self.path) as f:
533                 input_task = f.read()
534             rendered_task = task_template.TaskTemplate.render(input_task, **kw)
535             LOG.debug('Input task is:\n%s', rendered_task)
536             parsed_task = yaml_load(rendered_task)
537         except (IOError, OSError):
538             raise y_exc.TaskReadError(task_file=self.path)
539         except Exception:
540             raise y_exc.TaskRenderError(input_task=input_task)
541
542         return parsed_task, rendered_task
543
544     def parse_task(self, task_id, task_args=None, task_args_file=None):
545         """parses the task file and return an context and scenario instances"""
546         LOG.info("Parsing task config: %s", self.path)
547
548         cfg, rendered = self._render_task(task_args, task_args_file)
549         self._check_schema(cfg["schema"], "task")
550         meet_precondition = self._check_precondition(cfg)
551
552         # TODO: support one or many contexts? Many would simpler and precise
553         # TODO: support hybrid context type
554         if "context" in cfg:
555             context_cfgs = [cfg["context"]]
556         elif "contexts" in cfg:
557             context_cfgs = cfg["contexts"]
558         else:
559             context_cfgs = [{"type": contexts.CONTEXT_DUMMY}]
560
561         _contexts = []
562         for cfg_attrs in context_cfgs:
563
564             cfg_attrs['task_id'] = task_id
565             # default to Heat context because we are testing OpenStack
566             context_type = cfg_attrs.get("type", contexts.CONTEXT_HEAT)
567             context = base_context.Context.get(context_type)
568             context.init(cfg_attrs)
569             # Update the name in case the context has used the name_suffix
570             cfg_attrs['name'] = context.name
571             _contexts.append(context)
572
573         run_in_parallel = cfg.get("run_in_parallel", False)
574
575         # add tc and task id for influxdb extended tags
576         for scenario in cfg["scenarios"]:
577             task_name = os.path.splitext(os.path.basename(self.path))[0]
578             scenario["tc"] = task_name
579             scenario["task_id"] = task_id
580             # embed task path into scenario so we can load other files
581             # relative to task path
582             scenario["task_path"] = os.path.dirname(self.path)
583
584             self._change_node_names(scenario, _contexts)
585
586         # TODO we need something better here, a class that represent the file
587         return {'scenarios': cfg['scenarios'],
588                 'run_in_parallel': run_in_parallel,
589                 'meet_precondition': meet_precondition,
590                 'contexts': _contexts,
591                 'rendered': rendered}
592
593     @staticmethod
594     def _change_node_names(scenario, _contexts):
595         """Change the node names in a scenario, depending on the context config
596
597         The nodes (VMs or physical servers) are referred in the context section
598         with the name of the server and the name of the context:
599             <server name>.<context name>
600
601         If the context is going to be undeployed at the end of the test, the
602         task ID is suffixed to the name to avoid interferences with previous
603         deployments. If the context needs to be deployed at the end of the
604         test, the name assigned is kept.
605
606         There are several places where a node name could appear in the scenario
607         configuration:
608         scenario:
609           host: athena.demo
610           target: kratos.demo
611           targets:
612             - athena.demo
613             - kratos.demo
614
615         scenario:
616           options:
617             server_name:  # JIRA: YARDSTICK-810
618               host: athena.demo
619               target: kratos.demo
620
621         scenario:
622           nodes:
623             tg__0: tg_0.yardstick
624             vnf__0: vnf_0.yardstick
625
626         NOTE: in Kubernetes context, the separator character between the server
627         name and the context name is "-":
628         scenario:
629           host: host-k8s
630           target: target-k8s
631         """
632         def qualified_name(name):
633             for context in _contexts:
634                 host_name, ctx_name = context.split_host_name(name)
635                 if context.assigned_name == ctx_name:
636                     return '{}{}{}'.format(host_name,
637                                            context.host_name_separator,
638                                            context.name)
639
640             raise y_exc.ScenarioConfigContextNameNotFound(host_name=name)
641
642         if 'host' in scenario:
643             scenario['host'] = qualified_name(scenario['host'])
644         if 'target' in scenario:
645             scenario['target'] = qualified_name(scenario['target'])
646         options = scenario.get('options') or {}
647         server_name = options.get('server_name') or {}
648         if 'host' in server_name:
649             server_name['host'] = qualified_name(server_name['host'])
650         if 'target' in server_name:
651             server_name['target'] = qualified_name(server_name['target'])
652         if 'targets' in scenario:
653             for idx, target in enumerate(scenario['targets']):
654                 scenario['targets'][idx] = qualified_name(target)
655         if 'nodes' in scenario:
656             for scenario_node, target in scenario['nodes'].items():
657                 scenario['nodes'][scenario_node] = qualified_name(target)
658
659     def _check_schema(self, cfg_schema, schema_type):
660         """Check if config file is using the correct schema type"""
661
662         if cfg_schema != "yardstick:" + schema_type + ":0.1":
663             sys.exit("error: file %s has unknown schema %s" % (self.path,
664                                                                cfg_schema))
665
666     def _check_precondition(self, cfg):
667         """Check if the environment meet the precondition"""
668
669         if "precondition" in cfg:
670             precondition = cfg["precondition"]
671             installer_type = precondition.get("installer_type", None)
672             deploy_scenarios = precondition.get("deploy_scenarios", None)
673             tc_fit_pods = precondition.get("pod_name", None)
674             installer_type_env = os.environ.get('INSTALL_TYPE', None)
675             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
676             pod_name_env = os.environ.get('NODE_NAME', None)
677
678             LOG.info("installer_type: %s, installer_type_env: %s",
679                      installer_type, installer_type_env)
680             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
681                      deploy_scenarios, deploy_scenario_env)
682             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
683                      tc_fit_pods, pod_name_env)
684             if installer_type and installer_type_env:
685                 if installer_type_env not in installer_type:
686                     return False
687             if deploy_scenarios and deploy_scenario_env:
688                 deploy_scenarios_list = deploy_scenarios.split(',')
689                 for deploy_scenario in deploy_scenarios_list:
690                     if deploy_scenario_env.startswith(deploy_scenario):
691                         return True
692                 return False
693             if tc_fit_pods and pod_name_env:
694                 if pod_name_env not in tc_fit_pods:
695                     return False
696         return True
697
698
699 def is_ip_addr(addr):
700     """check if string addr is an IP address"""
701     try:
702         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
703     except AttributeError:
704         pass
705
706     try:
707         ipaddress.ip_address(addr.encode('utf-8'))
708     except ValueError:
709         return False
710     else:
711         return True
712
713
714 def _is_background_scenario(scenario):
715     if "run_in_background" in scenario:
716         return scenario["run_in_background"]
717     else:
718         return False
719
720
721 def parse_nodes_with_context(scenario_cfg):
722     """parse the 'nodes' fields in scenario """
723     # ensure consistency in node instantiation order
724     return OrderedDict((nodename, base_context.Context.get_server(
725                         scenario_cfg["nodes"][nodename]))
726                        for nodename in sorted(scenario_cfg["nodes"]))
727
728
729 def get_networks_from_nodes(nodes):
730     """parse the 'nodes' fields in scenario """
731     networks = {}
732     for node in nodes.values():
733         if not node:
734             continue
735         interfaces = node.get('interfaces', {})
736         for interface in interfaces.values():
737             # vld_id is network_name
738             network_name = interface.get('network_name')
739             if not network_name:
740                 continue
741             network = base_context.Context.get_network(network_name)
742             if network:
743                 networks[network['name']] = network
744     return networks
745
746
747 def runner_join(runner, background_runners, outputs, result):
748     """join (wait for) a runner, exit process at runner failure
749     :param background_runners:
750     :type background_runners:
751     :param outputs:
752     :type outputs: dict
753     :param result:
754     :type result: list
755     """
756     while runner.poll() is None:
757         outputs.update(runner.get_output())
758         result.extend(runner.get_result())
759         # drain all the background runner queues
760         for background in background_runners:
761             outputs.update(background.get_output())
762             result.extend(background.get_result())
763     status = runner.join(outputs, result)
764     base_runner.Runner.release(runner)
765     return status
766
767
768 def print_invalid_header(source_name, args):
769     print("Invalid %(source)s passed:\n\n %(args)s\n"
770           % {"source": source_name, "args": args})
771
772
773 def parse_task_args(src_name, args):
774     if isinstance(args, collections.Mapping):
775         return args
776
777     try:
778         kw = args and yaml_load(args)
779         kw = {} if kw is None else kw
780     except yaml.parser.ParserError as e:
781         print_invalid_header(src_name, args)
782         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
783               % {"source": src_name, "err": e})
784         raise TypeError()
785
786     if not isinstance(kw, dict):
787         print_invalid_header(src_name, args)
788         print("%(src)s had to be dict, actually %(src_type)s\n"
789               % {"src": src_name, "src_type": type(kw)})
790         raise TypeError()
791     return kw