Merge "YAML fixes"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28 from jinja2 import Environment
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.runners import base as base_runner
32 from yardstick.common.yaml_loader import yaml_load
33 from yardstick.dispatcher.base import Base as DispatcherBase
34 from yardstick.common.task_template import TaskTemplate
35 from yardstick.common.utils import source_env
36 from yardstick.common import utils
37 from yardstick.common import constants
38 from yardstick.common.html_template import report_template
39
40 output_file_default = "/tmp/yardstick.out"
41 config_file = '/etc/yardstick/yardstick.conf'
42 test_cases_dir_default = "tests/opnfv/test_cases/"
43 LOG = logging.getLogger(__name__)
44 JOIN_TIMEOUT = 60
45
46
47 class Task(object):     # pragma: no cover
48     """Task commands.
49
50        Set of commands to manage benchmark tasks.
51     """
52
53     def __init__(self):
54         self.contexts = []
55         self.outputs = {}
56
57     def _set_dispatchers(self, output_config):
58         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
59                                                            'file')
60         out_types = [s.strip() for s in dispatchers.split(',')]
61         output_config['DEFAULT']['dispatcher'] = out_types
62
63     def start(self, args, **kwargs):
64         """Start a benchmark scenario."""
65
66         atexit.register(self.atexit_handler)
67
68         task_id = getattr(args, 'task_id')
69         self.task_id = task_id if task_id else str(uuid.uuid4())
70
71         self._set_log()
72
73         check_environment()
74
75         try:
76             output_config = utils.parse_ini_file(config_file)
77         except Exception:
78             # all error will be ignore, the default value is {}
79             output_config = {}
80
81         self._init_output_config(output_config)
82         self._set_output_config(output_config, args.output_file)
83         LOG.debug('Output configuration is: %s', output_config)
84
85         self._set_dispatchers(output_config)
86
87         # update dispatcher list
88         if 'file' in output_config['DEFAULT']['dispatcher']:
89             result = {'status': 0, 'result': {}}
90             utils.write_json_to_file(args.output_file, result)
91
92         total_start_time = time.time()
93         parser = TaskParser(args.inputfile[0])
94
95         if args.suite:
96             # 1.parse suite, return suite_params info
97             task_files, task_args, task_args_fnames = \
98                 parser.parse_suite()
99         else:
100             task_files = [parser.path]
101             task_args = [args.task_args]
102             task_args_fnames = [args.task_args_file]
103
104         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
105                  task_files, task_args, task_args_fnames)
106
107         if args.parse_only:
108             sys.exit(0)
109
110         testcases = {}
111         # parse task_files
112         for i in range(0, len(task_files)):
113             one_task_start_time = time.time()
114             parser.path = task_files[i]
115             scenarios, run_in_parallel, meet_precondition, contexts = \
116                 parser.parse_task(self.task_id, task_args[i],
117                                   task_args_fnames[i])
118
119             self.contexts.extend(contexts)
120
121             if not meet_precondition:
122                 LOG.info("meet_precondition is %s, please check envrionment",
123                          meet_precondition)
124                 continue
125
126             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
127             try:
128                 data = self._run(scenarios, run_in_parallel, args.output_file)
129             except KeyboardInterrupt:
130                 raise
131             except Exception:
132                 LOG.exception("Running test case %s failed!", case_name)
133                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
134             else:
135                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
136
137             if args.keep_deploy:
138                 # keep deployment, forget about stack
139                 # (hide it for exit handler)
140                 self.contexts = []
141             else:
142                 for context in self.contexts[::-1]:
143                     context.undeploy()
144                 self.contexts = []
145             one_task_end_time = time.time()
146             LOG.info("task %s finished in %d secs", task_files[i],
147                      one_task_end_time - one_task_start_time)
148
149         result = self._get_format_result(testcases)
150
151         self._do_output(output_config, result)
152         self._generate_reporting(result)
153
154         total_end_time = time.time()
155         LOG.info("total finished in %d secs",
156                  total_end_time - total_start_time)
157
158         scenario = scenarios[0]
159         print("To generate report execute => yardstick report generate ",
160               scenario['task_id'], scenario['tc'])
161
162         print("Done, exiting")
163         return result
164
165     def _generate_reporting(self, result):
166         env = Environment()
167         with open(constants.REPORTING_FILE, 'w') as f:
168             f.write(env.from_string(report_template).render(result))
169
170         LOG.info('yardstick reporting generate in %s', constants.REPORTING_FILE)
171
172     def _set_log(self):
173         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
174         log_formatter = logging.Formatter(log_format)
175
176         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
177         log_handler = logging.FileHandler(log_path)
178         log_handler.setFormatter(log_formatter)
179         log_handler.setLevel(logging.DEBUG)
180
181         logging.root.addHandler(log_handler)
182
183     def _init_output_config(self, output_config):
184         output_config.setdefault('DEFAULT', {})
185         output_config.setdefault('dispatcher_http', {})
186         output_config.setdefault('dispatcher_file', {})
187         output_config.setdefault('dispatcher_influxdb', {})
188         output_config.setdefault('nsb', {})
189
190     def _set_output_config(self, output_config, file_path):
191         try:
192             out_type = os.environ['DISPATCHER']
193         except KeyError:
194             output_config['DEFAULT'].setdefault('dispatcher', 'file')
195         else:
196             output_config['DEFAULT']['dispatcher'] = out_type
197
198         output_config['dispatcher_file']['file_path'] = file_path
199
200         try:
201             target = os.environ['TARGET']
202         except KeyError:
203             pass
204         else:
205             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
206             output_config[k]['target'] = target
207
208     def _get_format_result(self, testcases):
209         criteria = self._get_task_criteria(testcases)
210
211         info = {
212             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
213             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
214             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
215             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
216         }
217
218         result = {
219             'status': 1,
220             'result': {
221                 'criteria': criteria,
222                 'task_id': self.task_id,
223                 'info': info,
224                 'testcases': testcases
225             }
226         }
227
228         return result
229
230     def _get_task_criteria(self, testcases):
231         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
232         if criteria:
233             return 'FAIL'
234         else:
235             return 'PASS'
236
237     def _do_output(self, output_config, result):
238         dispatchers = DispatcherBase.get(output_config)
239
240         for dispatcher in dispatchers:
241             dispatcher.flush_result_data(result)
242
243     def _run(self, scenarios, run_in_parallel, output_file):
244         """Deploys context and calls runners"""
245         for context in self.contexts:
246             context.deploy()
247
248         background_runners = []
249
250         result = []
251         # Start all background scenarios
252         for scenario in filter(_is_background_scenario, scenarios):
253             scenario["runner"] = dict(type="Duration", duration=1000000000)
254             runner = self.run_one_scenario(scenario, output_file)
255             background_runners.append(runner)
256
257         runners = []
258         if run_in_parallel:
259             for scenario in scenarios:
260                 if not _is_background_scenario(scenario):
261                     runner = self.run_one_scenario(scenario, output_file)
262                     runners.append(runner)
263
264             # Wait for runners to finish
265             for runner in runners:
266                 status = runner_join(runner)
267                 if status != 0:
268                     raise RuntimeError
269                 self.outputs.update(runner.get_output())
270                 result.extend(runner.get_result())
271                 print("Runner ended, output in", output_file)
272         else:
273             # run serially
274             for scenario in scenarios:
275                 if not _is_background_scenario(scenario):
276                     runner = self.run_one_scenario(scenario, output_file)
277                     status = runner_join(runner)
278                     if status != 0:
279                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
280                         raise RuntimeError
281                     self.outputs.update(runner.get_output())
282                     result.extend(runner.get_result())
283                     print("Runner ended, output in", output_file)
284
285         # Abort background runners
286         for runner in background_runners:
287             runner.abort()
288
289         # Wait for background runners to finish
290         for runner in background_runners:
291             status = runner.join(JOIN_TIMEOUT)
292             if status is None:
293                 # Nuke if it did not stop nicely
294                 base_runner.Runner.terminate(runner)
295                 runner.join(JOIN_TIMEOUT)
296             base_runner.Runner.release(runner)
297
298             self.outputs.update(runner.get_output())
299             result.extend(runner.get_result())
300             print("Background task ended")
301         return result
302
303     def atexit_handler(self):
304         """handler for process termination"""
305         base_runner.Runner.terminate_all()
306
307         if self.contexts:
308             print("Undeploying all contexts")
309             for context in self.contexts[::-1]:
310                 context.undeploy()
311
312     def _parse_options(self, op):
313         if isinstance(op, dict):
314             return {k: self._parse_options(v) for k, v in op.items()}
315         elif isinstance(op, list):
316             return [self._parse_options(v) for v in op]
317         elif isinstance(op, str):
318             return self.outputs.get(op[1:]) if op.startswith('$') else op
319         else:
320             return op
321
322     def run_one_scenario(self, scenario_cfg, output_file):
323         """run one scenario using context"""
324         runner_cfg = scenario_cfg["runner"]
325         runner_cfg['output_filename'] = output_file
326
327         options = scenario_cfg.get('options', {})
328         scenario_cfg['options'] = self._parse_options(options)
329
330         # TODO support get multi hosts/vms info
331         context_cfg = {}
332         if "host" in scenario_cfg:
333             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
334
335         if "target" in scenario_cfg:
336             if is_ip_addr(scenario_cfg["target"]):
337                 context_cfg['target'] = {}
338                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
339             else:
340                 context_cfg['target'] = Context.get_server(
341                     scenario_cfg["target"])
342                 if self._is_same_heat_context(scenario_cfg["host"],
343                                               scenario_cfg["target"]):
344                     context_cfg["target"]["ipaddr"] = \
345                         context_cfg["target"]["private_ip"]
346                 else:
347                     context_cfg["target"]["ipaddr"] = \
348                         context_cfg["target"]["ip"]
349
350         if "targets" in scenario_cfg:
351             ip_list = []
352             for target in scenario_cfg["targets"]:
353                 if is_ip_addr(target):
354                     ip_list.append(target)
355                     context_cfg['target'] = {}
356                 else:
357                     context_cfg['target'] = Context.get_server(target)
358                     if self._is_same_heat_context(scenario_cfg["host"],
359                                                   target):
360                         ip_list.append(context_cfg["target"]["private_ip"])
361                     else:
362                         ip_list.append(context_cfg["target"]["ip"])
363             context_cfg['target']['ipaddr'] = ','.join(ip_list)
364
365         if "nodes" in scenario_cfg:
366             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
367             context_cfg["networks"] = get_networks_from_nodes(
368                 context_cfg["nodes"])
369         runner = base_runner.Runner.get(runner_cfg)
370
371         print("Starting runner of type '%s'" % runner_cfg["type"])
372         runner.run(scenario_cfg, context_cfg)
373
374         return runner
375
376     def _is_same_heat_context(self, host_attr, target_attr):
377         """check if two servers are in the same heat context
378         host_attr: either a name for a server created by yardstick or a dict
379         with attribute name mapping when using external heat templates
380         target_attr: either a name for a server created by yardstick or a dict
381         with attribute name mapping when using external heat templates
382         """
383         host = None
384         target = None
385         for context in self.contexts:
386             if context.__context_type__ != "Heat":
387                 continue
388
389             host = context._get_server(host_attr)
390             if host is None:
391                 continue
392
393             target = context._get_server(target_attr)
394             if target is None:
395                 return False
396
397             # Both host and target is not None, then they are in the
398             # same heat context.
399             return True
400
401         return False
402
403
404 class TaskParser(object):       # pragma: no cover
405     """Parser for task config files in yaml format"""
406
407     def __init__(self, path):
408         self.path = path
409
410     def _meet_constraint(self, task, cur_pod, cur_installer):
411         if "constraint" in task:
412             constraint = task.get('constraint', None)
413             if constraint is not None:
414                 tc_fit_pod = constraint.get('pod', None)
415                 tc_fit_installer = constraint.get('installer', None)
416                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
417                          cur_pod, cur_installer, constraint)
418                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
419                     return False
420                 if (cur_installer is None) or (tc_fit_installer and cur_installer
421                                                not in tc_fit_installer):
422                     return False
423         return True
424
425     def _get_task_para(self, task, cur_pod):
426         task_args = task.get('task_args', None)
427         if task_args is not None:
428             task_args = task_args.get(cur_pod, task_args.get('default'))
429         task_args_fnames = task.get('task_args_fnames', None)
430         if task_args_fnames is not None:
431             task_args_fnames = task_args_fnames.get(cur_pod, None)
432         return task_args, task_args_fnames
433
434     def parse_suite(self):
435         """parse the suite file and return a list of task config file paths
436            and lists of optional parameters if present"""
437         LOG.info("\nParsing suite file:%s", self.path)
438
439         try:
440             with open(self.path) as stream:
441                 cfg = yaml_load(stream)
442         except IOError as ioerror:
443             sys.exit(ioerror)
444
445         self._check_schema(cfg["schema"], "suite")
446         LOG.info("\nStarting scenario:%s", cfg["name"])
447
448         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
449         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
450                                       test_cases_dir)
451         if test_cases_dir[-1] != os.sep:
452             test_cases_dir += os.sep
453
454         cur_pod = os.environ.get('NODE_NAME', None)
455         cur_installer = os.environ.get('INSTALLER_TYPE', None)
456
457         valid_task_files = []
458         valid_task_args = []
459         valid_task_args_fnames = []
460
461         for task in cfg["test_cases"]:
462             # 1.check file_name
463             if "file_name" in task:
464                 task_fname = task.get('file_name', None)
465                 if task_fname is None:
466                     continue
467             else:
468                 continue
469             # 2.check constraint
470             if self._meet_constraint(task, cur_pod, cur_installer):
471                 valid_task_files.append(test_cases_dir + task_fname)
472             else:
473                 continue
474             # 3.fetch task parameters
475             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
476             valid_task_args.append(task_args)
477             valid_task_args_fnames.append(task_args_fnames)
478
479         return valid_task_files, valid_task_args, valid_task_args_fnames
480
481     def parse_task(self, task_id, task_args=None, task_args_file=None):
482         """parses the task file and return an context and scenario instances"""
483         print("Parsing task config:", self.path)
484
485         try:
486             kw = {}
487             if task_args_file:
488                 with open(task_args_file) as f:
489                     kw.update(parse_task_args("task_args_file", f.read()))
490             kw.update(parse_task_args("task_args", task_args))
491         except TypeError:
492             raise TypeError()
493
494         try:
495             with open(self.path) as f:
496                 try:
497                     input_task = f.read()
498                     rendered_task = TaskTemplate.render(input_task, **kw)
499                 except Exception as e:
500                     print("Failed to render template:\n%(task)s\n%(err)s\n"
501                           % {"task": input_task, "err": e})
502                     raise e
503                 print("Input task is:\n%s\n" % rendered_task)
504
505                 cfg = yaml_load(rendered_task)
506         except IOError as ioerror:
507             sys.exit(ioerror)
508
509         self._check_schema(cfg["schema"], "task")
510         meet_precondition = self._check_precondition(cfg)
511
512         # TODO: support one or many contexts? Many would simpler and precise
513         # TODO: support hybrid context type
514         if "context" in cfg:
515             context_cfgs = [cfg["context"]]
516         elif "contexts" in cfg:
517             context_cfgs = cfg["contexts"]
518         else:
519             context_cfgs = [{"type": "Dummy"}]
520
521         contexts = []
522         name_suffix = '-{}'.format(task_id[:8])
523         for cfg_attrs in context_cfgs:
524             try:
525                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
526                                                   name_suffix)
527             except KeyError:
528                 pass
529             # default to Heat context because we are testing OpenStack
530             context_type = cfg_attrs.get("type", "Heat")
531             context = Context.get(context_type)
532             context.init(cfg_attrs)
533             contexts.append(context)
534
535         run_in_parallel = cfg.get("run_in_parallel", False)
536
537         # add tc and task id for influxdb extended tags
538         for scenario in cfg["scenarios"]:
539             task_name = os.path.splitext(os.path.basename(self.path))[0]
540             scenario["tc"] = task_name
541             scenario["task_id"] = task_id
542             # embed task path into scenario so we can load other files
543             # relative to task path
544             scenario["task_path"] = os.path.dirname(self.path)
545
546             change_server_name(scenario, name_suffix)
547
548             try:
549                 for node in scenario['nodes']:
550                     scenario['nodes'][node] += name_suffix
551             except KeyError:
552                 pass
553
554         # TODO we need something better here, a class that represent the file
555         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
556
557     def _check_schema(self, cfg_schema, schema_type):
558         """Check if config file is using the correct schema type"""
559
560         if cfg_schema != "yardstick:" + schema_type + ":0.1":
561             sys.exit("error: file %s has unknown schema %s" % (self.path,
562                                                                cfg_schema))
563
564     def _check_precondition(self, cfg):
565         """Check if the environment meet the precondition"""
566
567         if "precondition" in cfg:
568             precondition = cfg["precondition"]
569             installer_type = precondition.get("installer_type", None)
570             deploy_scenarios = precondition.get("deploy_scenarios", None)
571             tc_fit_pods = precondition.get("pod_name", None)
572             installer_type_env = os.environ.get('INSTALL_TYPE', None)
573             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
574             pod_name_env = os.environ.get('NODE_NAME', None)
575
576             LOG.info("installer_type: %s, installer_type_env: %s",
577                      installer_type, installer_type_env)
578             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
579                      deploy_scenarios, deploy_scenario_env)
580             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
581                      tc_fit_pods, pod_name_env)
582             if installer_type and installer_type_env:
583                 if installer_type_env not in installer_type:
584                     return False
585             if deploy_scenarios and deploy_scenario_env:
586                 deploy_scenarios_list = deploy_scenarios.split(',')
587                 for deploy_scenario in deploy_scenarios_list:
588                     if deploy_scenario_env.startswith(deploy_scenario):
589                         return True
590                 return False
591             if tc_fit_pods and pod_name_env:
592                 if pod_name_env not in tc_fit_pods:
593                     return False
594         return True
595
596
597 def is_ip_addr(addr):
598     """check if string addr is an IP address"""
599     try:
600         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
601     except AttributeError:
602         pass
603
604     try:
605         ipaddress.ip_address(addr.encode('utf-8'))
606     except ValueError:
607         return False
608     else:
609         return True
610
611
612 def _is_background_scenario(scenario):
613     if "run_in_background" in scenario:
614         return scenario["run_in_background"]
615     else:
616         return False
617
618
619 def parse_nodes_with_context(scenario_cfg):
620     """parse the 'nodes' fields in scenario """
621     # ensure consistency in node instantiation order
622     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
623                        for nodename in sorted(scenario_cfg["nodes"]))
624
625
626 def get_networks_from_nodes(nodes):
627     """parse the 'nodes' fields in scenario """
628     networks = {}
629     for node in nodes.values():
630         if not node:
631             continue
632         interfaces = node.get('interfaces', {})
633         for interface in interfaces.values():
634             vld_id = interface.get('vld_id')
635             # mgmt network doesn't have vld_id
636             if not vld_id:
637                 continue
638             network = Context.get_network({"vld_id": vld_id})
639             if network:
640                 networks[network['name']] = network
641     return networks
642
643
644 def runner_join(runner):
645     """join (wait for) a runner, exit process at runner failure"""
646     status = runner.join()
647     base_runner.Runner.release(runner)
648     return status
649
650
651 def print_invalid_header(source_name, args):
652     print("Invalid %(source)s passed:\n\n %(args)s\n"
653           % {"source": source_name, "args": args})
654
655
656 def parse_task_args(src_name, args):
657     if isinstance(args, collections.Mapping):
658         return args
659
660     try:
661         kw = args and yaml_load(args)
662         kw = {} if kw is None else kw
663     except yaml.parser.ParserError as e:
664         print_invalid_header(src_name, args)
665         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
666               % {"source": src_name, "err": e})
667         raise TypeError()
668
669     if not isinstance(kw, dict):
670         print_invalid_header(src_name, args)
671         print("%(src)s had to be dict, actually %(src_type)s\n"
672               % {"src": src_name, "src_type": type(kw)})
673         raise TypeError()
674     return kw
675
676
677 def check_environment():
678     auth_url = os.environ.get('OS_AUTH_URL', None)
679     if not auth_url:
680         try:
681             source_env(constants.OPENRC)
682         except IOError as e:
683             if e.errno != errno.EEXIST:
684                 raise
685             LOG.debug('OPENRC file not found')
686
687
688 def change_server_name(scenario, suffix):
689     try:
690         host = scenario['host']
691     except KeyError:
692         pass
693     else:
694         try:
695             host['name'] += suffix
696         except TypeError:
697             scenario['host'] += suffix
698
699     try:
700         target = scenario['target']
701     except KeyError:
702         pass
703     else:
704         try:
705             target['name'] += suffix
706         except TypeError:
707             scenario['target'] += suffix
708
709     try:
710         key = 'targets'
711         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
712     except KeyError:
713         pass