Merge "Add methods to get an existing stack"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10
11 import sys
12 import os
13 from collections import OrderedDict
14
15 import yaml
16 import atexit
17 import ipaddress
18 import time
19 import logging
20 import uuid
21 import collections
22
23 from six.moves import filter
24 from jinja2 import Environment
25
26 from yardstick.benchmark.contexts.base import Context
27 from yardstick.benchmark.runners import base as base_runner
28 from yardstick.common.constants import CONF_FILE
29 from yardstick.common.yaml_loader import yaml_load
30 from yardstick.dispatcher.base import Base as DispatcherBase
31 from yardstick.common.task_template import TaskTemplate
32 from yardstick.common import utils
33 from yardstick.common import constants
34 from yardstick.common import exceptions
35 from yardstick.common.html_template import report_template
36
37 output_file_default = "/tmp/yardstick.out"
38 test_cases_dir_default = "tests/opnfv/test_cases/"
39 LOG = logging.getLogger(__name__)
40
41
42 class Task(object):     # pragma: no cover
43     """Task commands.
44
45        Set of commands to manage benchmark tasks.
46     """
47
48     def __init__(self):
49         self.contexts = []
50         self.outputs = {}
51
52     def _set_dispatchers(self, output_config):
53         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
54                                                            'file')
55         out_types = [s.strip() for s in dispatchers.split(',')]
56         output_config['DEFAULT']['dispatcher'] = out_types
57
58     def start(self, args):
59         """Start a benchmark scenario."""
60
61         atexit.register(self.atexit_handler)
62
63         task_id = getattr(args, 'task_id')
64         self.task_id = task_id if task_id else str(uuid.uuid4())
65
66         self._set_log()
67
68         try:
69             output_config = utils.parse_ini_file(CONF_FILE)
70         except Exception:  # pylint: disable=broad-except
71             # all error will be ignore, the default value is {}
72             output_config = {}
73
74         self._init_output_config(output_config)
75         self._set_output_config(output_config, args.output_file)
76         LOG.debug('Output configuration is: %s', output_config)
77
78         self._set_dispatchers(output_config)
79
80         # update dispatcher list
81         if 'file' in output_config['DEFAULT']['dispatcher']:
82             result = {'status': 0, 'result': {}}
83             utils.write_json_to_file(args.output_file, result)
84
85         total_start_time = time.time()
86         parser = TaskParser(args.inputfile[0])
87
88         if args.suite:
89             # 1.parse suite, return suite_params info
90             task_files, task_args, task_args_fnames = \
91                 parser.parse_suite()
92         else:
93             task_files = [parser.path]
94             task_args = [args.task_args]
95             task_args_fnames = [args.task_args_file]
96
97         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
98                   task_files, task_args, task_args_fnames)
99
100         if args.parse_only:
101             sys.exit(0)
102
103         testcases = {}
104         # parse task_files
105         for i in range(0, len(task_files)):
106             one_task_start_time = time.time()
107             parser.path = task_files[i]
108             scenarios, run_in_parallel, meet_precondition, contexts = \
109                 parser.parse_task(self.task_id, task_args[i],
110                                   task_args_fnames[i])
111
112             self.contexts.extend(contexts)
113
114             if not meet_precondition:
115                 LOG.info("meet_precondition is %s, please check envrionment",
116                          meet_precondition)
117                 continue
118
119             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
120             try:
121                 data = self._run(scenarios, run_in_parallel, output_config)
122             except KeyboardInterrupt:
123                 raise
124             except Exception:  # pylint: disable=broad-except
125                 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
126                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
127             else:
128                 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
129                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
130
131             if args.keep_deploy:
132                 # keep deployment, forget about stack
133                 # (hide it for exit handler)
134                 self.contexts = []
135             else:
136                 for context in self.contexts[::-1]:
137                     context.undeploy()
138                 self.contexts = []
139             one_task_end_time = time.time()
140             LOG.info("Task %s finished in %d secs", task_files[i],
141                      one_task_end_time - one_task_start_time)
142
143         result = self._get_format_result(testcases)
144
145         self._do_output(output_config, result)
146         self._generate_reporting(result)
147
148         total_end_time = time.time()
149         LOG.info("Total finished in %d secs",
150                  total_end_time - total_start_time)
151
152         scenario = scenarios[0]
153         LOG.info("To generate report, execute command "
154                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
155         LOG.info("Task ALL DONE, exiting")
156         return result
157
158     def _generate_reporting(self, result):
159         env = Environment()
160         with open(constants.REPORTING_FILE, 'w') as f:
161             f.write(env.from_string(report_template).render(result))
162
163         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
164
165     def _set_log(self):
166         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
167         log_formatter = logging.Formatter(log_format)
168
169         utils.makedirs(constants.TASK_LOG_DIR)
170         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
171         log_handler = logging.FileHandler(log_path)
172         log_handler.setFormatter(log_formatter)
173         log_handler.setLevel(logging.DEBUG)
174
175         logging.root.addHandler(log_handler)
176
177     def _init_output_config(self, output_config):
178         output_config.setdefault('DEFAULT', {})
179         output_config.setdefault('dispatcher_http', {})
180         output_config.setdefault('dispatcher_file', {})
181         output_config.setdefault('dispatcher_influxdb', {})
182         output_config.setdefault('nsb', {})
183
184     def _set_output_config(self, output_config, file_path):
185         try:
186             out_type = os.environ['DISPATCHER']
187         except KeyError:
188             output_config['DEFAULT'].setdefault('dispatcher', 'file')
189         else:
190             output_config['DEFAULT']['dispatcher'] = out_type
191
192         output_config['dispatcher_file']['file_path'] = file_path
193
194         try:
195             target = os.environ['TARGET']
196         except KeyError:
197             pass
198         else:
199             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
200             output_config[k]['target'] = target
201
202     def _get_format_result(self, testcases):
203         criteria = self._get_task_criteria(testcases)
204
205         info = {
206             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
207             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
208             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
209             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
210         }
211
212         result = {
213             'status': 1,
214             'result': {
215                 'criteria': criteria,
216                 'task_id': self.task_id,
217                 'info': info,
218                 'testcases': testcases
219             }
220         }
221
222         return result
223
224     def _get_task_criteria(self, testcases):
225         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
226         if criteria:
227             return 'FAIL'
228         else:
229             return 'PASS'
230
231     def _do_output(self, output_config, result):
232         dispatchers = DispatcherBase.get(output_config)
233         dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
234
235         for dispatcher in dispatchers:
236             dispatcher.flush_result_data(result)
237
238     def _run(self, scenarios, run_in_parallel, output_config):
239         """Deploys context and calls runners"""
240         for context in self.contexts:
241             context.deploy()
242
243         background_runners = []
244
245         result = []
246         # Start all background scenarios
247         for scenario in filter(_is_background_scenario, scenarios):
248             scenario["runner"] = dict(type="Duration", duration=1000000000)
249             runner = self.run_one_scenario(scenario, output_config)
250             background_runners.append(runner)
251
252         runners = []
253         if run_in_parallel:
254             for scenario in scenarios:
255                 if not _is_background_scenario(scenario):
256                     runner = self.run_one_scenario(scenario, output_config)
257                     runners.append(runner)
258
259             # Wait for runners to finish
260             for runner in runners:
261                 status = runner_join(runner, background_runners, self.outputs, result)
262                 if status != 0:
263                     raise RuntimeError(
264                         "{0} runner status {1}".format(runner.__execution_type__, status))
265                 LOG.info("Runner ended")
266         else:
267             # run serially
268             for scenario in scenarios:
269                 if not _is_background_scenario(scenario):
270                     runner = self.run_one_scenario(scenario, output_config)
271                     status = runner_join(runner, background_runners, self.outputs, result)
272                     if status != 0:
273                         LOG.error('Scenario NO.%s: "%s" ERROR!',
274                                   scenarios.index(scenario) + 1,
275                                   scenario.get('type'))
276                         raise RuntimeError(
277                             "{0} runner status {1}".format(runner.__execution_type__, status))
278                     LOG.info("Runner ended")
279
280         # Abort background runners
281         for runner in background_runners:
282             runner.abort()
283
284         # Wait for background runners to finish
285         for runner in background_runners:
286             status = runner.join(self.outputs, result)
287             if status is None:
288                 # Nuke if it did not stop nicely
289                 base_runner.Runner.terminate(runner)
290                 runner.join(self.outputs, result)
291             base_runner.Runner.release(runner)
292
293             print("Background task ended")
294         return result
295
296     def atexit_handler(self):
297         """handler for process termination"""
298         base_runner.Runner.terminate_all()
299
300         if self.contexts:
301             LOG.info("Undeploying all contexts")
302             for context in self.contexts[::-1]:
303                 context.undeploy()
304
305     def _parse_options(self, op):
306         if isinstance(op, dict):
307             return {k: self._parse_options(v) for k, v in op.items()}
308         elif isinstance(op, list):
309             return [self._parse_options(v) for v in op]
310         elif isinstance(op, str):
311             return self.outputs.get(op[1:]) if op.startswith('$') else op
312         else:
313             return op
314
315     def run_one_scenario(self, scenario_cfg, output_config):
316         """run one scenario using context"""
317         runner_cfg = scenario_cfg["runner"]
318         runner_cfg['output_config'] = output_config
319
320         options = scenario_cfg.get('options', {})
321         scenario_cfg['options'] = self._parse_options(options)
322
323         # TODO support get multi hosts/vms info
324         context_cfg = {}
325         server_name = scenario_cfg.get('options', {}).get('server_name', {})
326
327         def config_context_target(cfg):
328             target = cfg['target']
329             if is_ip_addr(target):
330                 context_cfg['target'] = {"ipaddr": target}
331             else:
332                 context_cfg['target'] = Context.get_server(target)
333                 if self._is_same_context(cfg["host"], target):
334                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
335                 else:
336                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
337
338         host_name = server_name.get('host', scenario_cfg.get('host'))
339         if host_name:
340             context_cfg['host'] = Context.get_server(host_name)
341
342         for item in [server_name, scenario_cfg]:
343             try:
344                 config_context_target(item)
345             except KeyError:
346                 pass
347             else:
348                 break
349
350         if "targets" in scenario_cfg:
351             ip_list = []
352             for target in scenario_cfg["targets"]:
353                 if is_ip_addr(target):
354                     ip_list.append(target)
355                     context_cfg['target'] = {}
356                 else:
357                     context_cfg['target'] = Context.get_server(target)
358                     if self._is_same_context(scenario_cfg["host"],
359                                              target):
360                         ip_list.append(context_cfg["target"]["private_ip"])
361                     else:
362                         ip_list.append(context_cfg["target"]["ip"])
363             context_cfg['target']['ipaddr'] = ','.join(ip_list)
364
365         if "nodes" in scenario_cfg:
366             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
367             context_cfg["networks"] = get_networks_from_nodes(
368                 context_cfg["nodes"])
369
370         runner = base_runner.Runner.get(runner_cfg)
371
372         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
373         runner.run(scenario_cfg, context_cfg)
374
375         return runner
376
377     def _is_same_context(self, host_attr, target_attr):
378         """check if two servers are in the same heat context
379         host_attr: either a name for a server created by yardstick or a dict
380         with attribute name mapping when using external heat templates
381         target_attr: either a name for a server created by yardstick or a dict
382         with attribute name mapping when using external heat templates
383         """
384         for context in self.contexts:
385             if context.__context_type__ not in {"Heat", "Kubernetes"}:
386                 continue
387
388             host = context._get_server(host_attr)
389             if host is None:
390                 continue
391
392             target = context._get_server(target_attr)
393             if target is None:
394                 return False
395
396             # Both host and target is not None, then they are in the
397             # same heat context.
398             return True
399
400         return False
401
402
403 class TaskParser(object):       # pragma: no cover
404     """Parser for task config files in yaml format"""
405
406     def __init__(self, path):
407         self.path = path
408
409     def _meet_constraint(self, task, cur_pod, cur_installer):
410         if "constraint" in task:
411             constraint = task.get('constraint', None)
412             if constraint is not None:
413                 tc_fit_pod = constraint.get('pod', None)
414                 tc_fit_installer = constraint.get('installer', None)
415                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
416                          cur_pod, cur_installer, constraint)
417                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
418                     return False
419                 if (cur_installer is None) or (tc_fit_installer and cur_installer
420                                                not in tc_fit_installer):
421                     return False
422         return True
423
424     def _get_task_para(self, task, cur_pod):
425         task_args = task.get('task_args', None)
426         if task_args is not None:
427             task_args = task_args.get(cur_pod, task_args.get('default'))
428         task_args_fnames = task.get('task_args_fnames', None)
429         if task_args_fnames is not None:
430             task_args_fnames = task_args_fnames.get(cur_pod, None)
431         return task_args, task_args_fnames
432
433     def parse_suite(self):
434         """parse the suite file and return a list of task config file paths
435            and lists of optional parameters if present"""
436         LOG.info("\nParsing suite file:%s", self.path)
437
438         try:
439             with open(self.path) as stream:
440                 cfg = yaml_load(stream)
441         except IOError as ioerror:
442             sys.exit(ioerror)
443
444         self._check_schema(cfg["schema"], "suite")
445         LOG.info("\nStarting scenario:%s", cfg["name"])
446
447         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
448         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
449                                       test_cases_dir)
450         if test_cases_dir[-1] != os.sep:
451             test_cases_dir += os.sep
452
453         cur_pod = os.environ.get('NODE_NAME', None)
454         cur_installer = os.environ.get('INSTALLER_TYPE', None)
455
456         valid_task_files = []
457         valid_task_args = []
458         valid_task_args_fnames = []
459
460         for task in cfg["test_cases"]:
461             # 1.check file_name
462             if "file_name" in task:
463                 task_fname = task.get('file_name', None)
464                 if task_fname is None:
465                     continue
466             else:
467                 continue
468             # 2.check constraint
469             if self._meet_constraint(task, cur_pod, cur_installer):
470                 valid_task_files.append(test_cases_dir + task_fname)
471             else:
472                 continue
473             # 3.fetch task parameters
474             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
475             valid_task_args.append(task_args)
476             valid_task_args_fnames.append(task_args_fnames)
477
478         return valid_task_files, valid_task_args, valid_task_args_fnames
479
480     def parse_task(self, task_id, task_args=None, task_args_file=None):
481         """parses the task file and return an context and scenario instances"""
482         LOG.info("Parsing task config: %s", self.path)
483
484         try:
485             kw = {}
486             if task_args_file:
487                 with open(task_args_file) as f:
488                     kw.update(parse_task_args("task_args_file", f.read()))
489             kw.update(parse_task_args("task_args", task_args))
490         except TypeError:
491             raise TypeError()
492
493         try:
494             with open(self.path) as f:
495                 try:
496                     input_task = f.read()
497                     rendered_task = TaskTemplate.render(input_task, **kw)
498                 except Exception as e:
499                     LOG.exception('Failed to render template:\n%s\n', input_task)
500                     raise e
501                 LOG.debug("Input task is:\n%s\n", rendered_task)
502
503                 cfg = yaml_load(rendered_task)
504         except IOError as ioerror:
505             sys.exit(ioerror)
506
507         self._check_schema(cfg["schema"], "task")
508         meet_precondition = self._check_precondition(cfg)
509
510         # TODO: support one or many contexts? Many would simpler and precise
511         # TODO: support hybrid context type
512         if "context" in cfg:
513             context_cfgs = [cfg["context"]]
514         elif "contexts" in cfg:
515             context_cfgs = cfg["contexts"]
516         else:
517             context_cfgs = [{"type": "Dummy"}]
518
519         contexts = []
520         for cfg_attrs in context_cfgs:
521
522             cfg_attrs['task_id'] = task_id
523             # default to Heat context because we are testing OpenStack
524             context_type = cfg_attrs.get("type", "Heat")
525             context = Context.get(context_type)
526             context.init(cfg_attrs)
527             contexts.append(context)
528
529         run_in_parallel = cfg.get("run_in_parallel", False)
530
531         # add tc and task id for influxdb extended tags
532         for scenario in cfg["scenarios"]:
533             task_name = os.path.splitext(os.path.basename(self.path))[0]
534             scenario["tc"] = task_name
535             scenario["task_id"] = task_id
536             # embed task path into scenario so we can load other files
537             # relative to task path
538             scenario["task_path"] = os.path.dirname(self.path)
539
540             self._change_node_names(scenario, contexts)
541
542         # TODO we need something better here, a class that represent the file
543         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
544
545     @staticmethod
546     def _change_node_names(scenario, contexts):
547         """Change the node names in a scenario, depending on the context config
548
549         The nodes (VMs or physical servers) are referred in the context section
550         with the name of the server and the name of the context:
551             <server name>.<context name>
552
553         If the context is going to be undeployed at the end of the test, the
554         task ID is suffixed to the name to avoid interferences with previous
555         deployments. If the context needs to be deployed at the end of the
556         test, the name assigned is kept.
557
558         There are several places where a node name could appear in the scenario
559         configuration:
560         scenario:
561           host: athena.demo
562           target: kratos.demo
563           targets:
564             - athena.demo
565             - kratos.demo
566
567         scenario:
568           options:
569             server_name:  # JIRA: YARDSTICK-810
570               host: athena.demo
571               target: kratos.demo
572
573         scenario:
574           nodes:
575             tg__0: tg_0.yardstick
576             vnf__0: vnf_0.yardstick
577         """
578         def qualified_name(name):
579             node_name, context_name = name.split('.')
580             try:
581                 ctx = next((context for context in contexts
582                        if context.assigned_name == context_name))
583             except StopIteration:
584                 raise exceptions.ScenarioConfigContextNameNotFound(
585                     context_name=context_name)
586
587             return '{}.{}'.format(node_name, ctx.name)
588
589         if 'host' in scenario:
590             scenario['host'] = qualified_name(scenario['host'])
591         if 'target' in scenario:
592             scenario['target'] = qualified_name(scenario['target'])
593         server_name = scenario.get('options', {}).get('server_name', {})
594         if 'host' in server_name:
595             server_name['host'] = qualified_name(server_name['host'])
596         if 'target' in server_name:
597             server_name['target'] = qualified_name(server_name['target'])
598         if 'targets' in scenario:
599             for idx, target in enumerate(scenario['targets']):
600                 scenario['targets'][idx] = qualified_name(target)
601         if 'nodes' in scenario:
602             for scenario_node, target in scenario['nodes'].items():
603                 scenario['nodes'][scenario_node] = qualified_name(target)
604
605     def _check_schema(self, cfg_schema, schema_type):
606         """Check if config file is using the correct schema type"""
607
608         if cfg_schema != "yardstick:" + schema_type + ":0.1":
609             sys.exit("error: file %s has unknown schema %s" % (self.path,
610                                                                cfg_schema))
611
612     def _check_precondition(self, cfg):
613         """Check if the environment meet the precondition"""
614
615         if "precondition" in cfg:
616             precondition = cfg["precondition"]
617             installer_type = precondition.get("installer_type", None)
618             deploy_scenarios = precondition.get("deploy_scenarios", None)
619             tc_fit_pods = precondition.get("pod_name", None)
620             installer_type_env = os.environ.get('INSTALL_TYPE', None)
621             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
622             pod_name_env = os.environ.get('NODE_NAME', None)
623
624             LOG.info("installer_type: %s, installer_type_env: %s",
625                      installer_type, installer_type_env)
626             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
627                      deploy_scenarios, deploy_scenario_env)
628             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
629                      tc_fit_pods, pod_name_env)
630             if installer_type and installer_type_env:
631                 if installer_type_env not in installer_type:
632                     return False
633             if deploy_scenarios and deploy_scenario_env:
634                 deploy_scenarios_list = deploy_scenarios.split(',')
635                 for deploy_scenario in deploy_scenarios_list:
636                     if deploy_scenario_env.startswith(deploy_scenario):
637                         return True
638                 return False
639             if tc_fit_pods and pod_name_env:
640                 if pod_name_env not in tc_fit_pods:
641                     return False
642         return True
643
644
645 def is_ip_addr(addr):
646     """check if string addr is an IP address"""
647     try:
648         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
649     except AttributeError:
650         pass
651
652     try:
653         ipaddress.ip_address(addr.encode('utf-8'))
654     except ValueError:
655         return False
656     else:
657         return True
658
659
660 def _is_background_scenario(scenario):
661     if "run_in_background" in scenario:
662         return scenario["run_in_background"]
663     else:
664         return False
665
666
667 def parse_nodes_with_context(scenario_cfg):
668     """parse the 'nodes' fields in scenario """
669     # ensure consistency in node instantiation order
670     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
671                        for nodename in sorted(scenario_cfg["nodes"]))
672
673
674 def get_networks_from_nodes(nodes):
675     """parse the 'nodes' fields in scenario """
676     networks = {}
677     for node in nodes.values():
678         if not node:
679             continue
680         interfaces = node.get('interfaces', {})
681         for interface in interfaces.values():
682             # vld_id is network_name
683             network_name = interface.get('network_name')
684             if not network_name:
685                 continue
686             network = Context.get_network(network_name)
687             if network:
688                 networks[network['name']] = network
689     return networks
690
691
692 def runner_join(runner, background_runners, outputs, result):
693     """join (wait for) a runner, exit process at runner failure
694     :param background_runners:
695     :type background_runners:
696     :param outputs:
697     :type outputs: dict
698     :param result:
699     :type result: list
700     """
701     while runner.poll() is None:
702         outputs.update(runner.get_output())
703         result.extend(runner.get_result())
704         # drain all the background runner queues
705         for background in background_runners:
706             outputs.update(background.get_output())
707             result.extend(background.get_result())
708     status = runner.join(outputs, result)
709     base_runner.Runner.release(runner)
710     return status
711
712
713 def print_invalid_header(source_name, args):
714     print("Invalid %(source)s passed:\n\n %(args)s\n"
715           % {"source": source_name, "args": args})
716
717
718 def parse_task_args(src_name, args):
719     if isinstance(args, collections.Mapping):
720         return args
721
722     try:
723         kw = args and yaml_load(args)
724         kw = {} if kw is None else kw
725     except yaml.parser.ParserError as e:
726         print_invalid_header(src_name, args)
727         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
728               % {"source": src_name, "err": e})
729         raise TypeError()
730
731     if not isinstance(kw, dict):
732         print_invalid_header(src_name, args)
733         print("%(src)s had to be dict, actually %(src_type)s\n"
734               % {"src": src_name, "src_type": type(kw)})
735         raise TypeError()
736     return kw