2c3edfe1338ebce4e94eb94d63f3de29d697a6ff
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10
11 import sys
12 import os
13 from collections import OrderedDict
14
15 import yaml
16 import atexit
17 import ipaddress
18 import time
19 import logging
20 import uuid
21 import collections
22
23 from six.moves import filter
24 from jinja2 import Environment
25
26 from yardstick.benchmark.contexts.base import Context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common.task_template import TaskTemplate
32 from yardstick.common import utils
33 from yardstick.common import constants
34 from yardstick.common import exceptions
35 from yardstick.common.html_template import report_template
36
37 output_file_default = "/tmp/yardstick.out"
38 test_cases_dir_default = "tests/opnfv/test_cases/"
39 LOG = logging.getLogger(__name__)
40
41
42 class Task(object):     # pragma: no cover
43     """Task commands.
44
45        Set of commands to manage benchmark tasks.
46     """
47
48     def __init__(self):
49         self.contexts = []
50         self.outputs = {}
51
52     def _set_dispatchers(self, output_config):
53         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
54                                                            'file')
55         out_types = [s.strip() for s in dispatchers.split(',')]
56         output_config['DEFAULT']['dispatcher'] = out_types
57
58     def start(self, args):
59         """Start a benchmark scenario."""
60
61         atexit.register(self.atexit_handler)
62
63         task_id = getattr(args, 'task_id')
64         self.task_id = task_id if task_id else str(uuid.uuid4())
65
66         self._set_log()
67
68         try:
69             output_config = utils.parse_ini_file(CONF_FILE)
70         except Exception:  # pylint: disable=broad-except
71             # all error will be ignore, the default value is {}
72             output_config = {}
73
74         self._init_output_config(output_config)
75         self._set_output_config(output_config, args.output_file)
76         LOG.debug('Output configuration is: %s', output_config)
77
78         self._set_dispatchers(output_config)
79
80         # update dispatcher list
81         if 'file' in output_config['DEFAULT']['dispatcher']:
82             result = {'status': 0, 'result': {}}
83             utils.write_json_to_file(args.output_file, result)
84
85         total_start_time = time.time()
86         parser = TaskParser(args.inputfile[0])
87
88         if args.suite:
89             # 1.parse suite, return suite_params info
90             task_files, task_args, task_args_fnames = \
91                 parser.parse_suite()
92         else:
93             task_files = [parser.path]
94             task_args = [args.task_args]
95             task_args_fnames = [args.task_args_file]
96
97         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98                   task_files, task_args, task_args_fnames)
99
100         if args.parse_only:
101             sys.exit(0)
102
103         testcases = {}
104         # parse task_files
105         for i in range(0, len(task_files)):
106             one_task_start_time = time.time()
107             parser.path = task_files[i]
108             scenarios, run_in_parallel, meet_precondition, contexts = \
109                 parser.parse_task(self.task_id, task_args[i],
110                                   task_args_fnames[i])
111
112             self.contexts.extend(contexts)
113
114             if not meet_precondition:
115                 LOG.info("meet_precondition is %s, please check envrionment",
116                          meet_precondition)
117                 continue
118
119             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
120             try:
121                 data = self._run(scenarios, run_in_parallel, args.output_file)
122             except KeyboardInterrupt:
123                 raise
124             except Exception:  # pylint: disable=broad-except
125                 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
126                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
127             else:
128                 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
129                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
130
131             if args.keep_deploy:
132                 # keep deployment, forget about stack
133                 # (hide it for exit handler)
134                 self.contexts = []
135             else:
136                 for context in self.contexts[::-1]:
137                     context.undeploy()
138                 self.contexts = []
139             one_task_end_time = time.time()
140             LOG.info("Task %s finished in %d secs", task_files[i],
141                      one_task_end_time - one_task_start_time)
142
143         result = self._get_format_result(testcases)
144
145         self._do_output(output_config, result)
146         self._generate_reporting(result)
147
148         total_end_time = time.time()
149         LOG.info("Total finished in %d secs",
150                  total_end_time - total_start_time)
151
152         scenario = scenarios[0]
153         LOG.info("To generate report, execute command "
154                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
155         LOG.info("Task ALL DONE, exiting")
156         return result
157
158     def _generate_reporting(self, result):
159         env = Environment()
160         with open(constants.REPORTING_FILE, 'w') as f:
161             f.write(env.from_string(report_template).render(result))
162
163         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
164
165     def _set_log(self):
166         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
167         log_formatter = logging.Formatter(log_format)
168
169         utils.makedirs(constants.TASK_LOG_DIR)
170         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
171         log_handler = logging.FileHandler(log_path)
172         log_handler.setFormatter(log_formatter)
173         log_handler.setLevel(logging.DEBUG)
174
175         logging.root.addHandler(log_handler)
176
177     def _init_output_config(self, output_config):
178         output_config.setdefault('DEFAULT', {})
179         output_config.setdefault('dispatcher_http', {})
180         output_config.setdefault('dispatcher_file', {})
181         output_config.setdefault('dispatcher_influxdb', {})
182         output_config.setdefault('nsb', {})
183
184     def _set_output_config(self, output_config, file_path):
185         try:
186             out_type = os.environ['DISPATCHER']
187         except KeyError:
188             output_config['DEFAULT'].setdefault('dispatcher', 'file')
189         else:
190             output_config['DEFAULT']['dispatcher'] = out_type
191
192         output_config['dispatcher_file']['file_path'] = file_path
193
194         try:
195             target = os.environ['TARGET']
196         except KeyError:
197             pass
198         else:
199             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
200             output_config[k]['target'] = target
201
202     def _get_format_result(self, testcases):
203         criteria = self._get_task_criteria(testcases)
204
205         info = {
206             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
207             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
208             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
209             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
210         }
211
212         result = {
213             'status': 1,
214             'result': {
215                 'criteria': criteria,
216                 'task_id': self.task_id,
217                 'info': info,
218                 'testcases': testcases
219             }
220         }
221
222         return result
223
224     def _get_task_criteria(self, testcases):
225         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
226         if criteria:
227             return 'FAIL'
228         else:
229             return 'PASS'
230
231     def _do_output(self, output_config, result):
232         dispatchers = DispatcherBase.get(output_config)
233
234         for dispatcher in dispatchers:
235             dispatcher.flush_result_data(result)
236
237     def _run(self, scenarios, run_in_parallel, output_file):
238         """Deploys context and calls runners"""
239         for context in self.contexts:
240             context.deploy()
241
242         background_runners = []
243
244         result = []
245         # Start all background scenarios
246         for scenario in filter(_is_background_scenario, scenarios):
247             scenario["runner"] = dict(type="Duration", duration=1000000000)
248             runner = self.run_one_scenario(scenario, output_file)
249             background_runners.append(runner)
250
251         runners = []
252         if run_in_parallel:
253             for scenario in scenarios:
254                 if not _is_background_scenario(scenario):
255                     runner = self.run_one_scenario(scenario, output_file)
256                     runners.append(runner)
257
258             # Wait for runners to finish
259             for runner in runners:
260                 status = runner_join(runner, background_runners, self.outputs, result)
261                 if status != 0:
262                     raise RuntimeError(
263                         "{0} runner status {1}".format(runner.__execution_type__, status))
264                 LOG.info("Runner ended, output in %s", output_file)
265         else:
266             # run serially
267             for scenario in scenarios:
268                 if not _is_background_scenario(scenario):
269                     runner = self.run_one_scenario(scenario, output_file)
270                     status = runner_join(runner, background_runners, self.outputs, result)
271                     if status != 0:
272                         LOG.error('Scenario NO.%s: "%s" ERROR!',
273                                   scenarios.index(scenario) + 1,
274                                   scenario.get('type'))
275                         raise RuntimeError(
276                             "{0} runner status {1}".format(runner.__execution_type__, status))
277                     LOG.info("Runner ended, output in %s", output_file)
278
279         # Abort background runners
280         for runner in background_runners:
281             runner.abort()
282
283         # Wait for background runners to finish
284         for runner in background_runners:
285             status = runner.join(self.outputs, result)
286             if status is None:
287                 # Nuke if it did not stop nicely
288                 base_runner.Runner.terminate(runner)
289                 runner.join(self.outputs, result)
290             base_runner.Runner.release(runner)
291
292             print("Background task ended")
293         return result
294
295     def atexit_handler(self):
296         """handler for process termination"""
297         base_runner.Runner.terminate_all()
298
299         if self.contexts:
300             LOG.info("Undeploying all contexts")
301             for context in self.contexts[::-1]:
302                 context.undeploy()
303
304     def _parse_options(self, op):
305         if isinstance(op, dict):
306             return {k: self._parse_options(v) for k, v in op.items()}
307         elif isinstance(op, list):
308             return [self._parse_options(v) for v in op]
309         elif isinstance(op, str):
310             return self.outputs.get(op[1:]) if op.startswith('$') else op
311         else:
312             return op
313
314     def run_one_scenario(self, scenario_cfg, output_file):
315         """run one scenario using context"""
316         runner_cfg = scenario_cfg["runner"]
317         runner_cfg['output_filename'] = output_file
318
319         options = scenario_cfg.get('options', {})
320         scenario_cfg['options'] = self._parse_options(options)
321
322         # TODO support get multi hosts/vms info
323         context_cfg = {}
324         server_name = scenario_cfg.get('options', {}).get('server_name', {})
325
326         def config_context_target(cfg):
327             target = cfg['target']
328             if is_ip_addr(target):
329                 context_cfg['target'] = {"ipaddr": target}
330             else:
331                 context_cfg['target'] = Context.get_server(target)
332                 if self._is_same_context(cfg["host"], target):
333                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
334                 else:
335                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
336
337         host_name = server_name.get('host', scenario_cfg.get('host'))
338         if host_name:
339             context_cfg['host'] = Context.get_server(host_name)
340
341         for item in [server_name, scenario_cfg]:
342             try:
343                 config_context_target(item)
344             except KeyError:
345                 LOG.debug("Got a KeyError in config_context_target(%s)", item)
346             else:
347                 break
348
349         if "targets" in scenario_cfg:
350             ip_list = []
351             for target in scenario_cfg["targets"]:
352                 if is_ip_addr(target):
353                     ip_list.append(target)
354                     context_cfg['target'] = {}
355                 else:
356                     context_cfg['target'] = Context.get_server(target)
357                     if self._is_same_context(scenario_cfg["host"],
358                                              target):
359                         ip_list.append(context_cfg["target"]["private_ip"])
360                     else:
361                         ip_list.append(context_cfg["target"]["ip"])
362             context_cfg['target']['ipaddr'] = ','.join(ip_list)
363
364         if "nodes" in scenario_cfg:
365             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
366             context_cfg["networks"] = get_networks_from_nodes(
367                 context_cfg["nodes"])
368
369         runner = base_runner.Runner.get(runner_cfg)
370
371         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
372         runner.run(scenario_cfg, context_cfg)
373
374         return runner
375
376     def _is_same_context(self, host_attr, target_attr):
377         """check if two servers are in the same heat context
378         host_attr: either a name for a server created by yardstick or a dict
379         with attribute name mapping when using external heat templates
380         target_attr: either a name for a server created by yardstick or a dict
381         with attribute name mapping when using external heat templates
382         """
383         for context in self.contexts:
384             if context.__context_type__ not in {"Heat", "Kubernetes"}:
385                 continue
386
387             host = context._get_server(host_attr)
388             if host is None:
389                 continue
390
391             target = context._get_server(target_attr)
392             if target is None:
393                 return False
394
395             # Both host and target is not None, then they are in the
396             # same heat context.
397             return True
398
399         return False
400
401
402 class TaskParser(object):       # pragma: no cover
403     """Parser for task config files in yaml format"""
404
405     def __init__(self, path):
406         self.path = path
407
408     def _meet_constraint(self, task, cur_pod, cur_installer):
409         if "constraint" in task:
410             constraint = task.get('constraint', None)
411             if constraint is not None:
412                 tc_fit_pod = constraint.get('pod', None)
413                 tc_fit_installer = constraint.get('installer', None)
414                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
415                          cur_pod, cur_installer, constraint)
416                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
417                     return False
418                 if (cur_installer is None) or (tc_fit_installer and cur_installer
419                                                not in tc_fit_installer):
420                     return False
421         return True
422
423     def _get_task_para(self, task, cur_pod):
424         task_args = task.get('task_args', None)
425         if task_args is not None:
426             task_args = task_args.get(cur_pod, task_args.get('default'))
427         task_args_fnames = task.get('task_args_fnames', None)
428         if task_args_fnames is not None:
429             task_args_fnames = task_args_fnames.get(cur_pod, None)
430         return task_args, task_args_fnames
431
432     def parse_suite(self):
433         """parse the suite file and return a list of task config file paths
434            and lists of optional parameters if present"""
435         LOG.info("\nParsing suite file:%s", self.path)
436
437         try:
438             with open(self.path) as stream:
439                 cfg = yaml_load(stream)
440         except IOError as ioerror:
441             sys.exit(ioerror)
442
443         self._check_schema(cfg["schema"], "suite")
444         LOG.info("\nStarting scenario:%s", cfg["name"])
445
446         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
447         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
448                                       test_cases_dir)
449         if test_cases_dir[-1] != os.sep:
450             test_cases_dir += os.sep
451
452         cur_pod = os.environ.get('NODE_NAME', None)
453         cur_installer = os.environ.get('INSTALLER_TYPE', None)
454
455         valid_task_files = []
456         valid_task_args = []
457         valid_task_args_fnames = []
458
459         for task in cfg["test_cases"]:
460             # 1.check file_name
461             if "file_name" in task:
462                 task_fname = task.get('file_name', None)
463                 if task_fname is None:
464                     continue
465             else:
466                 continue
467             # 2.check constraint
468             if self._meet_constraint(task, cur_pod, cur_installer):
469                 valid_task_files.append(test_cases_dir + task_fname)
470             else:
471                 continue
472             # 3.fetch task parameters
473             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
474             valid_task_args.append(task_args)
475             valid_task_args_fnames.append(task_args_fnames)
476
477         return valid_task_files, valid_task_args, valid_task_args_fnames
478
479     def parse_task(self, task_id, task_args=None, task_args_file=None):
480         """parses the task file and return an context and scenario instances"""
481         LOG.info("Parsing task config: %s", self.path)
482
483         try:
484             kw = {}
485             if task_args_file:
486                 with open(task_args_file) as f:
487                     kw.update(parse_task_args("task_args_file", f.read()))
488             kw.update(parse_task_args("task_args", task_args))
489         except TypeError:
490             raise TypeError()
491
492         try:
493             with open(self.path) as f:
494                 try:
495                     input_task = f.read()
496                     rendered_task = TaskTemplate.render(input_task, **kw)
497                 except Exception as e:
498                     LOG.exception('Failed to render template:\n%s\n', input_task)
499                     raise e
500                 LOG.debug("Input task is:\n%s\n", rendered_task)
501
502                 cfg = yaml_load(rendered_task)
503         except IOError as ioerror:
504             sys.exit(ioerror)
505
506         self._check_schema(cfg["schema"], "task")
507         meet_precondition = self._check_precondition(cfg)
508
509         # TODO: support one or many contexts? Many would simpler and precise
510         # TODO: support hybrid context type
511         if "context" in cfg:
512             context_cfgs = [cfg["context"]]
513         elif "contexts" in cfg:
514             context_cfgs = cfg["contexts"]
515         else:
516             context_cfgs = [{"type": "Dummy"}]
517
518         contexts = []
519         for cfg_attrs in context_cfgs:
520
521             cfg_attrs['task_id'] = task_id
522             # default to Heat context because we are testing OpenStack
523             context_type = cfg_attrs.get("type", "Heat")
524             context = Context.get(context_type)
525             context.init(cfg_attrs)
526             # Update the name in case the context has used the name_suffix
527             cfg_attrs['name'] = context.name
528             contexts.append(context)
529
530         run_in_parallel = cfg.get("run_in_parallel", False)
531
532         # add tc and task id for influxdb extended tags
533         for scenario in cfg["scenarios"]:
534             task_name = os.path.splitext(os.path.basename(self.path))[0]
535             scenario["tc"] = task_name
536             scenario["task_id"] = task_id
537             # embed task path into scenario so we can load other files
538             # relative to task path
539             scenario["task_path"] = os.path.dirname(self.path)
540
541             self._change_node_names(scenario, contexts)
542
543         # TODO we need something better here, a class that represent the file
544         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
545
546     @staticmethod
547     def _change_node_names(scenario, contexts):
548         """Change the node names in a scenario, depending on the context config
549
550         The nodes (VMs or physical servers) are referred in the context section
551         with the name of the server and the name of the context:
552             <server name>.<context name>
553
554         If the context is going to be undeployed at the end of the test, the
555         task ID is suffixed to the name to avoid interferences with previous
556         deployments. If the context needs to be deployed at the end of the
557         test, the name assigned is kept.
558
559         There are several places where a node name could appear in the scenario
560         configuration:
561         scenario:
562           host: athena.demo
563           target: kratos.demo
564           targets:
565             - athena.demo
566             - kratos.demo
567
568         scenario:
569           options:
570             server_name:  # JIRA: YARDSTICK-810
571               host: athena.demo
572               target: kratos.demo
573
574         scenario:
575           nodes:
576             tg__0: tg_0.yardstick
577             vnf__0: vnf_0.yardstick
578         """
579         def qualified_name(name):
580             node_name, context_name = name.split('.')
581             try:
582                 ctx = next((context for context in contexts
583                        if context.assigned_name == context_name))
584             except StopIteration:
585                 raise exceptions.ScenarioConfigContextNameNotFound(
586                     context_name=context_name)
587
588             return '{}.{}'.format(node_name, ctx.name)
589
590         if 'host' in scenario:
591             scenario['host'] = qualified_name(scenario['host'])
592         if 'target' in scenario:
593             scenario['target'] = qualified_name(scenario['target'])
594         server_name = scenario.get('options', {}).get('server_name', {})
595         if 'host' in server_name:
596             server_name['host'] = qualified_name(server_name['host'])
597         if 'target' in server_name:
598             server_name['target'] = qualified_name(server_name['target'])
599         if 'targets' in scenario:
600             for idx, target in enumerate(scenario['targets']):
601                 scenario['targets'][idx] = qualified_name(target)
602         if 'nodes' in scenario:
603             for scenario_node, target in scenario['nodes'].items():
604                 scenario['nodes'][scenario_node] = qualified_name(target)
605
606     def _check_schema(self, cfg_schema, schema_type):
607         """Check if config file is using the correct schema type"""
608
609         if cfg_schema != "yardstick:" + schema_type + ":0.1":
610             sys.exit("error: file %s has unknown schema %s" % (self.path,
611                                                                cfg_schema))
612
613     def _check_precondition(self, cfg):
614         """Check if the environment meet the precondition"""
615
616         if "precondition" in cfg:
617             precondition = cfg["precondition"]
618             installer_type = precondition.get("installer_type", None)
619             deploy_scenarios = precondition.get("deploy_scenarios", None)
620             tc_fit_pods = precondition.get("pod_name", None)
621             installer_type_env = os.environ.get('INSTALL_TYPE', None)
622             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
623             pod_name_env = os.environ.get('NODE_NAME', None)
624
625             LOG.info("installer_type: %s, installer_type_env: %s",
626                      installer_type, installer_type_env)
627             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
628                      deploy_scenarios, deploy_scenario_env)
629             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
630                      tc_fit_pods, pod_name_env)
631             if installer_type and installer_type_env:
632                 if installer_type_env not in installer_type:
633                     return False
634             if deploy_scenarios and deploy_scenario_env:
635                 deploy_scenarios_list = deploy_scenarios.split(',')
636                 for deploy_scenario in deploy_scenarios_list:
637                     if deploy_scenario_env.startswith(deploy_scenario):
638                         return True
639                 return False
640             if tc_fit_pods and pod_name_env:
641                 if pod_name_env not in tc_fit_pods:
642                     return False
643         return True
644
645
646 def is_ip_addr(addr):
647     """check if string addr is an IP address"""
648     try:
649         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
650     except AttributeError:
651         pass
652
653     try:
654         ipaddress.ip_address(addr.encode('utf-8'))
655     except ValueError:
656         return False
657     else:
658         return True
659
660
661 def _is_background_scenario(scenario):
662     if "run_in_background" in scenario:
663         return scenario["run_in_background"]
664     else:
665         return False
666
667
668 def parse_nodes_with_context(scenario_cfg):
669     """parse the 'nodes' fields in scenario """
670     # ensure consistency in node instantiation order
671     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
672                        for nodename in sorted(scenario_cfg["nodes"]))
673
674
675 def get_networks_from_nodes(nodes):
676     """parse the 'nodes' fields in scenario """
677     networks = {}
678     for node in nodes.values():
679         if not node:
680             continue
681         interfaces = node.get('interfaces', {})
682         for interface in interfaces.values():
683             # vld_id is network_name
684             network_name = interface.get('network_name')
685             if not network_name:
686                 continue
687             network = Context.get_network(network_name)
688             if network:
689                 networks[network['name']] = network
690     return networks
691
692
693 def runner_join(runner, background_runners, outputs, result):
694     """join (wait for) a runner, exit process at runner failure
695     :param background_runners:
696     :type background_runners:
697     :param outputs:
698     :type outputs: dict
699     :param result:
700     :type result: list
701     """
702     while runner.poll() is None:
703         outputs.update(runner.get_output())
704         result.extend(runner.get_result())
705         # drain all the background runner queues
706         for background in background_runners:
707             outputs.update(background.get_output())
708             result.extend(background.get_result())
709     status = runner.join(outputs, result)
710     base_runner.Runner.release(runner)
711     return status
712
713
714 def print_invalid_header(source_name, args):
715     print("Invalid %(source)s passed:\n\n %(args)s\n"
716           % {"source": source_name, "args": args})
717
718
719 def parse_task_args(src_name, args):
720     if isinstance(args, collections.Mapping):
721         return args
722
723     try:
724         kw = args and yaml_load(args)
725         kw = {} if kw is None else kw
726     except yaml.parser.ParserError as e:
727         print_invalid_header(src_name, args)
728         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
729               % {"source": src_name, "err": e})
730         raise TypeError()
731
732     if not isinstance(kw, dict):
733         print_invalid_header(src_name, args)
734         print("%(src)s had to be dict, actually %(src_type)s\n"
735               % {"src": src_name, "src_type": type(kw)})
736         raise TypeError()
737     return kw