Update TaskParser to deal with qualified name in Context
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10
11 import sys
12 import os
13 from collections import OrderedDict
14
15 import yaml
16 import atexit
17 import ipaddress
18 import time
19 import logging
20 import uuid
21 import collections
22
23 from six.moves import filter
24 from jinja2 import Environment
25
26 from yardstick.benchmark.contexts.base import Context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common.task_template import TaskTemplate
32 from yardstick.common import utils
33 from yardstick.common import constants
34 from yardstick.common import exceptions
35 from yardstick.common.html_template import report_template
36
37 output_file_default = "/tmp/yardstick.out"
38 test_cases_dir_default = "tests/opnfv/test_cases/"
39 LOG = logging.getLogger(__name__)
40
41
42 class Task(object):     # pragma: no cover
43     """Task commands.
44
45        Set of commands to manage benchmark tasks.
46     """
47
48     def __init__(self):
49         self.contexts = []
50         self.outputs = {}
51
52     def _set_dispatchers(self, output_config):
53         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
54                                                            'file')
55         out_types = [s.strip() for s in dispatchers.split(',')]
56         output_config['DEFAULT']['dispatcher'] = out_types
57
58     def start(self, args):
59         """Start a benchmark scenario."""
60
61         atexit.register(self.atexit_handler)
62
63         task_id = getattr(args, 'task_id')
64         self.task_id = task_id if task_id else str(uuid.uuid4())
65
66         self._set_log()
67
68         try:
69             output_config = utils.parse_ini_file(CONF_FILE)
70         except Exception:  # pylint: disable=broad-except
71             # all error will be ignore, the default value is {}
72             output_config = {}
73
74         self._init_output_config(output_config)
75         self._set_output_config(output_config, args.output_file)
76         LOG.debug('Output configuration is: %s', output_config)
77
78         self._set_dispatchers(output_config)
79
80         # update dispatcher list
81         if 'file' in output_config['DEFAULT']['dispatcher']:
82             result = {'status': 0, 'result': {}}
83             utils.write_json_to_file(args.output_file, result)
84
85         total_start_time = time.time()
86         parser = TaskParser(args.inputfile[0])
87
88         if args.suite:
89             # 1.parse suite, return suite_params info
90             task_files, task_args, task_args_fnames = \
91                 parser.parse_suite()
92         else:
93             task_files = [parser.path]
94             task_args = [args.task_args]
95             task_args_fnames = [args.task_args_file]
96
97         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98                   task_files, task_args, task_args_fnames)
99
100         if args.parse_only:
101             sys.exit(0)
102
103         testcases = {}
104         # parse task_files
105         for i in range(0, len(task_files)):
106             one_task_start_time = time.time()
107             parser.path = task_files[i]
108             scenarios, run_in_parallel, meet_precondition, contexts = \
109                 parser.parse_task(self.task_id, task_args[i],
110                                   task_args_fnames[i])
111
112             self.contexts.extend(contexts)
113
114             if not meet_precondition:
115                 LOG.info("meet_precondition is %s, please check envrionment",
116                          meet_precondition)
117                 continue
118
119             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
120             try:
121                 data = self._run(scenarios, run_in_parallel, args.output_file)
122             except KeyboardInterrupt:
123                 raise
124             except Exception:  # pylint: disable=broad-except
125                 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
126                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
127             else:
128                 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
129                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
130
131             if args.keep_deploy:
132                 # keep deployment, forget about stack
133                 # (hide it for exit handler)
134                 self.contexts = []
135             else:
136                 for context in self.contexts[::-1]:
137                     context.undeploy()
138                 self.contexts = []
139             one_task_end_time = time.time()
140             LOG.info("Task %s finished in %d secs", task_files[i],
141                      one_task_end_time - one_task_start_time)
142
143         result = self._get_format_result(testcases)
144
145         self._do_output(output_config, result)
146         self._generate_reporting(result)
147
148         total_end_time = time.time()
149         LOG.info("Total finished in %d secs",
150                  total_end_time - total_start_time)
151
152         scenario = scenarios[0]
153         LOG.info("To generate report, execute command "
154                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
155         LOG.info("Task ALL DONE, exiting")
156         return result
157
158     def _generate_reporting(self, result):
159         env = Environment()
160         with open(constants.REPORTING_FILE, 'w') as f:
161             f.write(env.from_string(report_template).render(result))
162
163         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
164
165     def _set_log(self):
166         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
167         log_formatter = logging.Formatter(log_format)
168
169         utils.makedirs(constants.TASK_LOG_DIR)
170         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
171         log_handler = logging.FileHandler(log_path)
172         log_handler.setFormatter(log_formatter)
173         log_handler.setLevel(logging.DEBUG)
174
175         logging.root.addHandler(log_handler)
176
177     def _init_output_config(self, output_config):
178         output_config.setdefault('DEFAULT', {})
179         output_config.setdefault('dispatcher_http', {})
180         output_config.setdefault('dispatcher_file', {})
181         output_config.setdefault('dispatcher_influxdb', {})
182         output_config.setdefault('nsb', {})
183
184     def _set_output_config(self, output_config, file_path):
185         try:
186             out_type = os.environ['DISPATCHER']
187         except KeyError:
188             output_config['DEFAULT'].setdefault('dispatcher', 'file')
189         else:
190             output_config['DEFAULT']['dispatcher'] = out_type
191
192         output_config['dispatcher_file']['file_path'] = file_path
193
194         try:
195             target = os.environ['TARGET']
196         except KeyError:
197             pass
198         else:
199             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
200             output_config[k]['target'] = target
201
202     def _get_format_result(self, testcases):
203         criteria = self._get_task_criteria(testcases)
204
205         info = {
206             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
207             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
208             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
209             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
210         }
211
212         result = {
213             'status': 1,
214             'result': {
215                 'criteria': criteria,
216                 'task_id': self.task_id,
217                 'info': info,
218                 'testcases': testcases
219             }
220         }
221
222         return result
223
224     def _get_task_criteria(self, testcases):
225         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
226         if criteria:
227             return 'FAIL'
228         else:
229             return 'PASS'
230
231     def _do_output(self, output_config, result):
232         dispatchers = DispatcherBase.get(output_config)
233
234         for dispatcher in dispatchers:
235             dispatcher.flush_result_data(result)
236
237     def _run(self, scenarios, run_in_parallel, output_file):
238         """Deploys context and calls runners"""
239         for context in self.contexts:
240             context.deploy()
241
242         background_runners = []
243
244         result = []
245         # Start all background scenarios
246         for scenario in filter(_is_background_scenario, scenarios):
247             scenario["runner"] = dict(type="Duration", duration=1000000000)
248             runner = self.run_one_scenario(scenario, output_file)
249             background_runners.append(runner)
250
251         runners = []
252         if run_in_parallel:
253             for scenario in scenarios:
254                 if not _is_background_scenario(scenario):
255                     runner = self.run_one_scenario(scenario, output_file)
256                     runners.append(runner)
257
258             # Wait for runners to finish
259             for runner in runners:
260                 status = runner_join(runner, background_runners, self.outputs, result)
261                 if status != 0:
262                     raise RuntimeError(
263                         "{0} runner status {1}".format(runner.__execution_type__, status))
264                 LOG.info("Runner ended, output in %s", output_file)
265         else:
266             # run serially
267             for scenario in scenarios:
268                 if not _is_background_scenario(scenario):
269                     runner = self.run_one_scenario(scenario, output_file)
270                     status = runner_join(runner, background_runners, self.outputs, result)
271                     if status != 0:
272                         LOG.error('Scenario NO.%s: "%s" ERROR!',
273                                   scenarios.index(scenario) + 1,
274                                   scenario.get('type'))
275                         raise RuntimeError(
276                             "{0} runner status {1}".format(runner.__execution_type__, status))
277                     LOG.info("Runner ended, output in %s", output_file)
278
279         # Abort background runners
280         for runner in background_runners:
281             runner.abort()
282
283         # Wait for background runners to finish
284         for runner in background_runners:
285             status = runner.join(self.outputs, result)
286             if status is None:
287                 # Nuke if it did not stop nicely
288                 base_runner.Runner.terminate(runner)
289                 runner.join(self.outputs, result)
290             base_runner.Runner.release(runner)
291
292             print("Background task ended")
293         return result
294
295     def atexit_handler(self):
296         """handler for process termination"""
297         base_runner.Runner.terminate_all()
298
299         if self.contexts:
300             LOG.info("Undeploying all contexts")
301             for context in self.contexts[::-1]:
302                 context.undeploy()
303
304     def _parse_options(self, op):
305         if isinstance(op, dict):
306             return {k: self._parse_options(v) for k, v in op.items()}
307         elif isinstance(op, list):
308             return [self._parse_options(v) for v in op]
309         elif isinstance(op, str):
310             return self.outputs.get(op[1:]) if op.startswith('$') else op
311         else:
312             return op
313
314     def run_one_scenario(self, scenario_cfg, output_file):
315         """run one scenario using context"""
316         runner_cfg = scenario_cfg["runner"]
317         runner_cfg['output_filename'] = output_file
318
319         options = scenario_cfg.get('options', {})
320         scenario_cfg['options'] = self._parse_options(options)
321
322         # TODO support get multi hosts/vms info
323         context_cfg = {}
324         server_name = scenario_cfg.get('options', {}).get('server_name', {})
325
326         def config_context_target(cfg):
327             target = cfg['target']
328             if is_ip_addr(target):
329                 context_cfg['target'] = {"ipaddr": target}
330             else:
331                 context_cfg['target'] = Context.get_server(target)
332                 if self._is_same_context(cfg["host"], target):
333                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
334                 else:
335                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
336
337         host_name = server_name.get('host', scenario_cfg.get('host'))
338         if host_name:
339             context_cfg['host'] = Context.get_server(host_name)
340
341         for item in [server_name, scenario_cfg]:
342             try:
343                 config_context_target(item)
344             except KeyError:
345                 pass
346             else:
347                 break
348
349         if "targets" in scenario_cfg:
350             ip_list = []
351             for target in scenario_cfg["targets"]:
352                 if is_ip_addr(target):
353                     ip_list.append(target)
354                     context_cfg['target'] = {}
355                 else:
356                     context_cfg['target'] = Context.get_server(target)
357                     if self._is_same_context(scenario_cfg["host"],
358                                              target):
359                         ip_list.append(context_cfg["target"]["private_ip"])
360                     else:
361                         ip_list.append(context_cfg["target"]["ip"])
362             context_cfg['target']['ipaddr'] = ','.join(ip_list)
363
364         if "nodes" in scenario_cfg:
365             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
366             context_cfg["networks"] = get_networks_from_nodes(
367                 context_cfg["nodes"])
368
369         runner = base_runner.Runner.get(runner_cfg)
370
371         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
372         runner.run(scenario_cfg, context_cfg)
373
374         return runner
375
376     def _is_same_context(self, host_attr, target_attr):
377         """check if two servers are in the same heat context
378         host_attr: either a name for a server created by yardstick or a dict
379         with attribute name mapping when using external heat templates
380         target_attr: either a name for a server created by yardstick or a dict
381         with attribute name mapping when using external heat templates
382         """
383         for context in self.contexts:
384             if context.__context_type__ not in {"Heat", "Kubernetes"}:
385                 continue
386
387             host = context._get_server(host_attr)
388             if host is None:
389                 continue
390
391             target = context._get_server(target_attr)
392             if target is None:
393                 return False
394
395             # Both host and target is not None, then they are in the
396             # same heat context.
397             return True
398
399         return False
400
401
402 class TaskParser(object):       # pragma: no cover
403     """Parser for task config files in yaml format"""
404
405     def __init__(self, path):
406         self.path = path
407
408     def _meet_constraint(self, task, cur_pod, cur_installer):
409         if "constraint" in task:
410             constraint = task.get('constraint', None)
411             if constraint is not None:
412                 tc_fit_pod = constraint.get('pod', None)
413                 tc_fit_installer = constraint.get('installer', None)
414                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
415                          cur_pod, cur_installer, constraint)
416                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
417                     return False
418                 if (cur_installer is None) or (tc_fit_installer and cur_installer
419                                                not in tc_fit_installer):
420                     return False
421         return True
422
423     def _get_task_para(self, task, cur_pod):
424         task_args = task.get('task_args', None)
425         if task_args is not None:
426             task_args = task_args.get(cur_pod, task_args.get('default'))
427         task_args_fnames = task.get('task_args_fnames', None)
428         if task_args_fnames is not None:
429             task_args_fnames = task_args_fnames.get(cur_pod, None)
430         return task_args, task_args_fnames
431
432     def parse_suite(self):
433         """parse the suite file and return a list of task config file paths
434            and lists of optional parameters if present"""
435         LOG.info("\nParsing suite file:%s", self.path)
436
437         try:
438             with open(self.path) as stream:
439                 cfg = yaml_load(stream)
440         except IOError as ioerror:
441             sys.exit(ioerror)
442
443         self._check_schema(cfg["schema"], "suite")
444         LOG.info("\nStarting scenario:%s", cfg["name"])
445
446         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
447         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
448                                       test_cases_dir)
449         if test_cases_dir[-1] != os.sep:
450             test_cases_dir += os.sep
451
452         cur_pod = os.environ.get('NODE_NAME', None)
453         cur_installer = os.environ.get('INSTALLER_TYPE', None)
454
455         valid_task_files = []
456         valid_task_args = []
457         valid_task_args_fnames = []
458
459         for task in cfg["test_cases"]:
460             # 1.check file_name
461             if "file_name" in task:
462                 task_fname = task.get('file_name', None)
463                 if task_fname is None:
464                     continue
465             else:
466                 continue
467             # 2.check constraint
468             if self._meet_constraint(task, cur_pod, cur_installer):
469                 valid_task_files.append(test_cases_dir + task_fname)
470             else:
471                 continue
472             # 3.fetch task parameters
473             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
474             valid_task_args.append(task_args)
475             valid_task_args_fnames.append(task_args_fnames)
476
477         return valid_task_files, valid_task_args, valid_task_args_fnames
478
479     def parse_task(self, task_id, task_args=None, task_args_file=None):
480         """parses the task file and return an context and scenario instances"""
481         LOG.info("Parsing task config: %s", self.path)
482
483         try:
484             kw = {}
485             if task_args_file:
486                 with open(task_args_file) as f:
487                     kw.update(parse_task_args("task_args_file", f.read()))
488             kw.update(parse_task_args("task_args", task_args))
489         except TypeError:
490             raise TypeError()
491
492         try:
493             with open(self.path) as f:
494                 try:
495                     input_task = f.read()
496                     rendered_task = TaskTemplate.render(input_task, **kw)
497                 except Exception as e:
498                     LOG.exception('Failed to render template:\n%s\n', input_task)
499                     raise e
500                 LOG.debug("Input task is:\n%s\n", rendered_task)
501
502                 cfg = yaml_load(rendered_task)
503         except IOError as ioerror:
504             sys.exit(ioerror)
505
506         self._check_schema(cfg["schema"], "task")
507         meet_precondition = self._check_precondition(cfg)
508
509         # TODO: support one or many contexts? Many would simpler and precise
510         # TODO: support hybrid context type
511         if "context" in cfg:
512             context_cfgs = [cfg["context"]]
513         elif "contexts" in cfg:
514             context_cfgs = cfg["contexts"]
515         else:
516             context_cfgs = [{"type": "Dummy"}]
517
518         contexts = []
519         for cfg_attrs in context_cfgs:
520
521             cfg_attrs['task_id'] = task_id
522             # default to Heat context because we are testing OpenStack
523             context_type = cfg_attrs.get("type", "Heat")
524             context = Context.get(context_type)
525             context.init(cfg_attrs)
526             contexts.append(context)
527
528         run_in_parallel = cfg.get("run_in_parallel", False)
529
530         # add tc and task id for influxdb extended tags
531         for scenario in cfg["scenarios"]:
532             task_name = os.path.splitext(os.path.basename(self.path))[0]
533             scenario["tc"] = task_name
534             scenario["task_id"] = task_id
535             # embed task path into scenario so we can load other files
536             # relative to task path
537             scenario["task_path"] = os.path.dirname(self.path)
538
539             self._change_node_names(scenario, contexts)
540
541         # TODO we need something better here, a class that represent the file
542         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
543
544     @staticmethod
545     def _change_node_names(scenario, contexts):
546         """Change the node names in a scenario, depending on the context config
547
548         The nodes (VMs or physical servers) are referred in the context section
549         with the name of the server and the name of the context:
550             <server name>.<context name>
551
552         If the context is going to be undeployed at the end of the test, the
553         task ID is suffixed to the name to avoid interferences with previous
554         deployments. If the context needs to be deployed at the end of the
555         test, the name assigned is kept.
556
557         There are several places where a node name could appear in the scenario
558         configuration:
559         scenario:
560           host: athena.demo
561           target: kratos.demo
562           targets:
563             - athena.demo
564             - kratos.demo
565
566         scenario:
567           options:
568             server_name:  # JIRA: YARDSTICK-810
569               host: athena.demo
570               target: kratos.demo
571
572         scenario:
573           nodes:
574             tg__0: tg_0.yardstick
575             vnf__0: vnf_0.yardstick
576         """
577         def qualified_name(name):
578             node_name, context_name = name.split('.')
579             try:
580                 ctx = next((context for context in contexts
581                        if context.assigned_name == context_name))
582             except StopIteration:
583                 raise exceptions.ScenarioConfigContextNameNotFound(
584                     context_name=context_name)
585
586             return '{}.{}'.format(node_name, ctx.name)
587
588         if 'host' in scenario:
589             scenario['host'] = qualified_name(scenario['host'])
590         if 'target' in scenario:
591             scenario['target'] = qualified_name(scenario['target'])
592         server_name = scenario.get('options', {}).get('server_name', {})
593         if 'host' in server_name:
594             server_name['host'] = qualified_name(server_name['host'])
595         if 'target' in server_name:
596             server_name['target'] = qualified_name(server_name['target'])
597         if 'targets' in scenario:
598             for idx, target in enumerate(scenario['targets']):
599                 scenario['targets'][idx] = qualified_name(target)
600         if 'nodes' in scenario:
601             for scenario_node, target in scenario['nodes'].items():
602                 scenario['nodes'][scenario_node] = qualified_name(target)
603
604     def _check_schema(self, cfg_schema, schema_type):
605         """Check if config file is using the correct schema type"""
606
607         if cfg_schema != "yardstick:" + schema_type + ":0.1":
608             sys.exit("error: file %s has unknown schema %s" % (self.path,
609                                                                cfg_schema))
610
611     def _check_precondition(self, cfg):
612         """Check if the environment meet the precondition"""
613
614         if "precondition" in cfg:
615             precondition = cfg["precondition"]
616             installer_type = precondition.get("installer_type", None)
617             deploy_scenarios = precondition.get("deploy_scenarios", None)
618             tc_fit_pods = precondition.get("pod_name", None)
619             installer_type_env = os.environ.get('INSTALL_TYPE', None)
620             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
621             pod_name_env = os.environ.get('NODE_NAME', None)
622
623             LOG.info("installer_type: %s, installer_type_env: %s",
624                      installer_type, installer_type_env)
625             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
626                      deploy_scenarios, deploy_scenario_env)
627             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
628                      tc_fit_pods, pod_name_env)
629             if installer_type and installer_type_env:
630                 if installer_type_env not in installer_type:
631                     return False
632             if deploy_scenarios and deploy_scenario_env:
633                 deploy_scenarios_list = deploy_scenarios.split(',')
634                 for deploy_scenario in deploy_scenarios_list:
635                     if deploy_scenario_env.startswith(deploy_scenario):
636                         return True
637                 return False
638             if tc_fit_pods and pod_name_env:
639                 if pod_name_env not in tc_fit_pods:
640                     return False
641         return True
642
643
644 def is_ip_addr(addr):
645     """check if string addr is an IP address"""
646     try:
647         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
648     except AttributeError:
649         pass
650
651     try:
652         ipaddress.ip_address(addr.encode('utf-8'))
653     except ValueError:
654         return False
655     else:
656         return True
657
658
659 def _is_background_scenario(scenario):
660     if "run_in_background" in scenario:
661         return scenario["run_in_background"]
662     else:
663         return False
664
665
666 def parse_nodes_with_context(scenario_cfg):
667     """parse the 'nodes' fields in scenario """
668     # ensure consistency in node instantiation order
669     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
670                        for nodename in sorted(scenario_cfg["nodes"]))
671
672
673 def get_networks_from_nodes(nodes):
674     """parse the 'nodes' fields in scenario """
675     networks = {}
676     for node in nodes.values():
677         if not node:
678             continue
679         interfaces = node.get('interfaces', {})
680         for interface in interfaces.values():
681             # vld_id is network_name
682             network_name = interface.get('network_name')
683             if not network_name:
684                 continue
685             network = Context.get_network(network_name)
686             if network:
687                 networks[network['name']] = network
688     return networks
689
690
691 def runner_join(runner, background_runners, outputs, result):
692     """join (wait for) a runner, exit process at runner failure
693     :param background_runners:
694     :type background_runners:
695     :param outputs:
696     :type outputs: dict
697     :param result:
698     :type result: list
699     """
700     while runner.poll() is None:
701         outputs.update(runner.get_output())
702         result.extend(runner.get_result())
703         # drain all the background runner queues
704         for background in background_runners:
705             outputs.update(background.get_output())
706             result.extend(background.get_result())
707     status = runner.join(outputs, result)
708     base_runner.Runner.release(runner)
709     return status
710
711
712 def print_invalid_header(source_name, args):
713     print("Invalid %(source)s passed:\n\n %(args)s\n"
714           % {"source": source_name, "args": args})
715
716
717 def parse_task_args(src_name, args):
718     if isinstance(args, collections.Mapping):
719         return args
720
721     try:
722         kw = args and yaml_load(args)
723         kw = {} if kw is None else kw
724     except yaml.parser.ParserError as e:
725         print_invalid_header(src_name, args)
726         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
727               % {"source": src_name, "err": e})
728         raise TypeError()
729
730     if not isinstance(kw, dict):
731         print_invalid_header(src_name, args)
732         print("%(src)s had to be dict, actually %(src_type)s\n"
733               % {"src": src_name, "src_type": type(kw)})
734         raise TypeError()
735     return kw