Log each test case status in a task
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import collections
25
26 from six.moves import filter
27 from jinja2 import Environment
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.common.yaml_loader import yaml_load
32 from yardstick.dispatcher.base import Base as DispatcherBase
33 from yardstick.common.task_template import TaskTemplate
34 from yardstick.common import utils
35 from yardstick.common import constants
36 from yardstick.common.html_template import report_template
37
38 output_file_default = "/tmp/yardstick.out"
39 config_file = '/etc/yardstick/yardstick.conf'
40 test_cases_dir_default = "tests/opnfv/test_cases/"
41 LOG = logging.getLogger(__name__)
42 JOIN_TIMEOUT = 60
43
44
45 class Task(object):     # pragma: no cover
46     """Task commands.
47
48        Set of commands to manage benchmark tasks.
49     """
50
51     def __init__(self):
52         self.contexts = []
53         self.outputs = {}
54
55     def _set_dispatchers(self, output_config):
56         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
57                                                            'file')
58         out_types = [s.strip() for s in dispatchers.split(',')]
59         output_config['DEFAULT']['dispatcher'] = out_types
60
61     def start(self, args, **kwargs):
62         """Start a benchmark scenario."""
63
64         atexit.register(self.atexit_handler)
65
66         task_id = getattr(args, 'task_id')
67         self.task_id = task_id if task_id else str(uuid.uuid4())
68
69         self._set_log()
70
71         try:
72             output_config = utils.parse_ini_file(config_file)
73         except Exception:
74             # all error will be ignore, the default value is {}
75             output_config = {}
76
77         self._init_output_config(output_config)
78         self._set_output_config(output_config, args.output_file)
79         LOG.debug('Output configuration is: %s', output_config)
80
81         self._set_dispatchers(output_config)
82
83         # update dispatcher list
84         if 'file' in output_config['DEFAULT']['dispatcher']:
85             result = {'status': 0, 'result': {}}
86             utils.write_json_to_file(args.output_file, result)
87
88         total_start_time = time.time()
89         parser = TaskParser(args.inputfile[0])
90
91         if args.suite:
92             # 1.parse suite, return suite_params info
93             task_files, task_args, task_args_fnames = \
94                 parser.parse_suite()
95         else:
96             task_files = [parser.path]
97             task_args = [args.task_args]
98             task_args_fnames = [args.task_args_file]
99
100         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
101                   task_files, task_args, task_args_fnames)
102
103         if args.parse_only:
104             sys.exit(0)
105
106         testcases = {}
107         # parse task_files
108         for i in range(0, len(task_files)):
109             one_task_start_time = time.time()
110             parser.path = task_files[i]
111             scenarios, run_in_parallel, meet_precondition, contexts = \
112                 parser.parse_task(self.task_id, task_args[i],
113                                   task_args_fnames[i])
114
115             self.contexts.extend(contexts)
116
117             if not meet_precondition:
118                 LOG.info("meet_precondition is %s, please check envrionment",
119                          meet_precondition)
120                 continue
121
122             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
123             try:
124                 data = self._run(scenarios, run_in_parallel, args.output_file)
125             except KeyboardInterrupt:
126                 raise
127             except Exception:
128                 LOG.error('Testcase: "%s" FAILED!!!', case_name, exe_info=True)
129                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
130             else:
131                 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
132                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
133
134             if args.keep_deploy:
135                 # keep deployment, forget about stack
136                 # (hide it for exit handler)
137                 self.contexts = []
138             else:
139                 for context in self.contexts[::-1]:
140                     context.undeploy()
141                 self.contexts = []
142             one_task_end_time = time.time()
143             LOG.info("Task %s finished in %d secs", task_files[i],
144                      one_task_end_time - one_task_start_time)
145
146         result = self._get_format_result(testcases)
147
148         self._do_output(output_config, result)
149         self._generate_reporting(result)
150
151         total_end_time = time.time()
152         LOG.info("Total finished in %d secs",
153                  total_end_time - total_start_time)
154
155         scenario = scenarios[0]
156         LOG.info("To generate report, execute command "
157                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
158         LOG.info("Task ALL DONE, exiting")
159         return result
160
161     def _generate_reporting(self, result):
162         env = Environment()
163         with open(constants.REPORTING_FILE, 'w') as f:
164             f.write(env.from_string(report_template).render(result))
165
166         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
167
168     def _set_log(self):
169         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
170         log_formatter = logging.Formatter(log_format)
171
172         utils.makedirs(constants.TASK_LOG_DIR)
173         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
174         log_handler = logging.FileHandler(log_path)
175         log_handler.setFormatter(log_formatter)
176         log_handler.setLevel(logging.DEBUG)
177
178         logging.root.addHandler(log_handler)
179
180     def _init_output_config(self, output_config):
181         output_config.setdefault('DEFAULT', {})
182         output_config.setdefault('dispatcher_http', {})
183         output_config.setdefault('dispatcher_file', {})
184         output_config.setdefault('dispatcher_influxdb', {})
185         output_config.setdefault('nsb', {})
186
187     def _set_output_config(self, output_config, file_path):
188         try:
189             out_type = os.environ['DISPATCHER']
190         except KeyError:
191             output_config['DEFAULT'].setdefault('dispatcher', 'file')
192         else:
193             output_config['DEFAULT']['dispatcher'] = out_type
194
195         output_config['dispatcher_file']['file_path'] = file_path
196
197         try:
198             target = os.environ['TARGET']
199         except KeyError:
200             pass
201         else:
202             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
203             output_config[k]['target'] = target
204
205     def _get_format_result(self, testcases):
206         criteria = self._get_task_criteria(testcases)
207
208         info = {
209             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
210             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
211             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
212             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
213         }
214
215         result = {
216             'status': 1,
217             'result': {
218                 'criteria': criteria,
219                 'task_id': self.task_id,
220                 'info': info,
221                 'testcases': testcases
222             }
223         }
224
225         return result
226
227     def _get_task_criteria(self, testcases):
228         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
229         if criteria:
230             return 'FAIL'
231         else:
232             return 'PASS'
233
234     def _do_output(self, output_config, result):
235         dispatchers = DispatcherBase.get(output_config)
236
237         for dispatcher in dispatchers:
238             dispatcher.flush_result_data(result)
239
240     def _run(self, scenarios, run_in_parallel, output_file):
241         """Deploys context and calls runners"""
242         for context in self.contexts:
243             context.deploy()
244
245         background_runners = []
246
247         result = []
248         # Start all background scenarios
249         for scenario in filter(_is_background_scenario, scenarios):
250             scenario["runner"] = dict(type="Duration", duration=1000000000)
251             runner = self.run_one_scenario(scenario, output_file)
252             background_runners.append(runner)
253
254         runners = []
255         if run_in_parallel:
256             for scenario in scenarios:
257                 if not _is_background_scenario(scenario):
258                     runner = self.run_one_scenario(scenario, output_file)
259                     runners.append(runner)
260
261             # Wait for runners to finish
262             for runner in runners:
263                 status = runner_join(runner)
264                 if status != 0:
265                     raise RuntimeError
266                 self.outputs.update(runner.get_output())
267                 result.extend(runner.get_result())
268                 LOG.info("Runner ended, output in %s", output_file)
269         else:
270             # run serially
271             for scenario in scenarios:
272                 if not _is_background_scenario(scenario):
273                     runner = self.run_one_scenario(scenario, output_file)
274                     status = runner_join(runner)
275                     if status != 0:
276                         LOG.error('Scenario NO.%s: "%s" ERROR!',
277                                   scenarios.index(scenario) + 1,
278                                   scenario.get('type'))
279                         raise RuntimeError
280                     self.outputs.update(runner.get_output())
281                     result.extend(runner.get_result())
282                     LOG.info("Runner ended, output in %s", output_file)
283
284         # Abort background runners
285         for runner in background_runners:
286             runner.abort()
287
288         # Wait for background runners to finish
289         for runner in background_runners:
290             status = runner.join(JOIN_TIMEOUT)
291             if status is None:
292                 # Nuke if it did not stop nicely
293                 base_runner.Runner.terminate(runner)
294                 runner.join(JOIN_TIMEOUT)
295             base_runner.Runner.release(runner)
296
297             self.outputs.update(runner.get_output())
298             result.extend(runner.get_result())
299             print("Background task ended")
300         return result
301
302     def atexit_handler(self):
303         """handler for process termination"""
304         base_runner.Runner.terminate_all()
305
306         if self.contexts:
307             LOG.info("Undeploying all contexts")
308             for context in self.contexts[::-1]:
309                 context.undeploy()
310
311     def _parse_options(self, op):
312         if isinstance(op, dict):
313             return {k: self._parse_options(v) for k, v in op.items()}
314         elif isinstance(op, list):
315             return [self._parse_options(v) for v in op]
316         elif isinstance(op, str):
317             return self.outputs.get(op[1:]) if op.startswith('$') else op
318         else:
319             return op
320
321     def run_one_scenario(self, scenario_cfg, output_file):
322         """run one scenario using context"""
323         runner_cfg = scenario_cfg["runner"]
324         runner_cfg['output_filename'] = output_file
325
326         options = scenario_cfg.get('options', {})
327         scenario_cfg['options'] = self._parse_options(options)
328
329         # TODO support get multi hosts/vms info
330         context_cfg = {}
331         if "host" in scenario_cfg:
332             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
333
334         if "target" in scenario_cfg:
335             if is_ip_addr(scenario_cfg["target"]):
336                 context_cfg['target'] = {}
337                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
338             else:
339                 context_cfg['target'] = Context.get_server(
340                     scenario_cfg["target"])
341                 if self._is_same_heat_context(scenario_cfg["host"],
342                                               scenario_cfg["target"]):
343                     context_cfg["target"]["ipaddr"] = \
344                         context_cfg["target"]["private_ip"]
345                 else:
346                     context_cfg["target"]["ipaddr"] = \
347                         context_cfg["target"]["ip"]
348
349         if "targets" in scenario_cfg:
350             ip_list = []
351             for target in scenario_cfg["targets"]:
352                 if is_ip_addr(target):
353                     ip_list.append(target)
354                     context_cfg['target'] = {}
355                 else:
356                     context_cfg['target'] = Context.get_server(target)
357                     if self._is_same_heat_context(scenario_cfg["host"],
358                                                   target):
359                         ip_list.append(context_cfg["target"]["private_ip"])
360                     else:
361                         ip_list.append(context_cfg["target"]["ip"])
362             context_cfg['target']['ipaddr'] = ','.join(ip_list)
363
364         if "nodes" in scenario_cfg:
365             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
366             context_cfg["networks"] = get_networks_from_nodes(
367                 context_cfg["nodes"])
368         runner = base_runner.Runner.get(runner_cfg)
369
370         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
371         runner.run(scenario_cfg, context_cfg)
372
373         return runner
374
375     def _is_same_heat_context(self, host_attr, target_attr):
376         """check if two servers are in the same heat context
377         host_attr: either a name for a server created by yardstick or a dict
378         with attribute name mapping when using external heat templates
379         target_attr: either a name for a server created by yardstick or a dict
380         with attribute name mapping when using external heat templates
381         """
382         for context in self.contexts:
383             if context.__context_type__ != "Heat":
384                 continue
385
386             host = context._get_server(host_attr)
387             if host is None:
388                 continue
389
390             target = context._get_server(target_attr)
391             if target is None:
392                 return False
393
394             # Both host and target is not None, then they are in the
395             # same heat context.
396             return True
397
398         return False
399
400
401 class TaskParser(object):       # pragma: no cover
402     """Parser for task config files in yaml format"""
403
404     def __init__(self, path):
405         self.path = path
406
407     def _meet_constraint(self, task, cur_pod, cur_installer):
408         if "constraint" in task:
409             constraint = task.get('constraint', None)
410             if constraint is not None:
411                 tc_fit_pod = constraint.get('pod', None)
412                 tc_fit_installer = constraint.get('installer', None)
413                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
414                          cur_pod, cur_installer, constraint)
415                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
416                     return False
417                 if (cur_installer is None) or (tc_fit_installer and cur_installer
418                                                not in tc_fit_installer):
419                     return False
420         return True
421
422     def _get_task_para(self, task, cur_pod):
423         task_args = task.get('task_args', None)
424         if task_args is not None:
425             task_args = task_args.get(cur_pod, task_args.get('default'))
426         task_args_fnames = task.get('task_args_fnames', None)
427         if task_args_fnames is not None:
428             task_args_fnames = task_args_fnames.get(cur_pod, None)
429         return task_args, task_args_fnames
430
431     def parse_suite(self):
432         """parse the suite file and return a list of task config file paths
433            and lists of optional parameters if present"""
434         LOG.info("\nParsing suite file:%s", self.path)
435
436         try:
437             with open(self.path) as stream:
438                 cfg = yaml_load(stream)
439         except IOError as ioerror:
440             sys.exit(ioerror)
441
442         self._check_schema(cfg["schema"], "suite")
443         LOG.info("\nStarting scenario:%s", cfg["name"])
444
445         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
446         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
447                                       test_cases_dir)
448         if test_cases_dir[-1] != os.sep:
449             test_cases_dir += os.sep
450
451         cur_pod = os.environ.get('NODE_NAME', None)
452         cur_installer = os.environ.get('INSTALLER_TYPE', None)
453
454         valid_task_files = []
455         valid_task_args = []
456         valid_task_args_fnames = []
457
458         for task in cfg["test_cases"]:
459             # 1.check file_name
460             if "file_name" in task:
461                 task_fname = task.get('file_name', None)
462                 if task_fname is None:
463                     continue
464             else:
465                 continue
466             # 2.check constraint
467             if self._meet_constraint(task, cur_pod, cur_installer):
468                 valid_task_files.append(test_cases_dir + task_fname)
469             else:
470                 continue
471             # 3.fetch task parameters
472             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
473             valid_task_args.append(task_args)
474             valid_task_args_fnames.append(task_args_fnames)
475
476         return valid_task_files, valid_task_args, valid_task_args_fnames
477
478     def parse_task(self, task_id, task_args=None, task_args_file=None):
479         """parses the task file and return an context and scenario instances"""
480         LOG.info("Parsing task config: %s", self.path)
481
482         try:
483             kw = {}
484             if task_args_file:
485                 with open(task_args_file) as f:
486                     kw.update(parse_task_args("task_args_file", f.read()))
487             kw.update(parse_task_args("task_args", task_args))
488         except TypeError:
489             raise TypeError()
490
491         try:
492             with open(self.path) as f:
493                 try:
494                     input_task = f.read()
495                     rendered_task = TaskTemplate.render(input_task, **kw)
496                 except Exception as e:
497                     LOG.exception('Failed to render template:\n%s\n', input_task)
498                     raise e
499                 LOG.debug("Input task is:\n%s\n", rendered_task)
500
501                 cfg = yaml_load(rendered_task)
502         except IOError as ioerror:
503             sys.exit(ioerror)
504
505         self._check_schema(cfg["schema"], "task")
506         meet_precondition = self._check_precondition(cfg)
507
508         # TODO: support one or many contexts? Many would simpler and precise
509         # TODO: support hybrid context type
510         if "context" in cfg:
511             context_cfgs = [cfg["context"]]
512         elif "contexts" in cfg:
513             context_cfgs = cfg["contexts"]
514         else:
515             context_cfgs = [{"type": "Dummy"}]
516
517         contexts = []
518         name_suffix = '-{}'.format(task_id[:8])
519         for cfg_attrs in context_cfgs:
520             try:
521                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
522                                                   name_suffix)
523             except KeyError:
524                 pass
525             # default to Heat context because we are testing OpenStack
526             context_type = cfg_attrs.get("type", "Heat")
527             context = Context.get(context_type)
528             context.init(cfg_attrs)
529             contexts.append(context)
530
531         run_in_parallel = cfg.get("run_in_parallel", False)
532
533         # add tc and task id for influxdb extended tags
534         for scenario in cfg["scenarios"]:
535             task_name = os.path.splitext(os.path.basename(self.path))[0]
536             scenario["tc"] = task_name
537             scenario["task_id"] = task_id
538             # embed task path into scenario so we can load other files
539             # relative to task path
540             scenario["task_path"] = os.path.dirname(self.path)
541
542             change_server_name(scenario, name_suffix)
543
544             try:
545                 for node in scenario['nodes']:
546                     scenario['nodes'][node] += name_suffix
547             except KeyError:
548                 pass
549
550         # TODO we need something better here, a class that represent the file
551         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
552
553     def _check_schema(self, cfg_schema, schema_type):
554         """Check if config file is using the correct schema type"""
555
556         if cfg_schema != "yardstick:" + schema_type + ":0.1":
557             sys.exit("error: file %s has unknown schema %s" % (self.path,
558                                                                cfg_schema))
559
560     def _check_precondition(self, cfg):
561         """Check if the environment meet the precondition"""
562
563         if "precondition" in cfg:
564             precondition = cfg["precondition"]
565             installer_type = precondition.get("installer_type", None)
566             deploy_scenarios = precondition.get("deploy_scenarios", None)
567             tc_fit_pods = precondition.get("pod_name", None)
568             installer_type_env = os.environ.get('INSTALL_TYPE', None)
569             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
570             pod_name_env = os.environ.get('NODE_NAME', None)
571
572             LOG.info("installer_type: %s, installer_type_env: %s",
573                      installer_type, installer_type_env)
574             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
575                      deploy_scenarios, deploy_scenario_env)
576             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
577                      tc_fit_pods, pod_name_env)
578             if installer_type and installer_type_env:
579                 if installer_type_env not in installer_type:
580                     return False
581             if deploy_scenarios and deploy_scenario_env:
582                 deploy_scenarios_list = deploy_scenarios.split(',')
583                 for deploy_scenario in deploy_scenarios_list:
584                     if deploy_scenario_env.startswith(deploy_scenario):
585                         return True
586                 return False
587             if tc_fit_pods and pod_name_env:
588                 if pod_name_env not in tc_fit_pods:
589                     return False
590         return True
591
592
593 def is_ip_addr(addr):
594     """check if string addr is an IP address"""
595     try:
596         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
597     except AttributeError:
598         pass
599
600     try:
601         ipaddress.ip_address(addr.encode('utf-8'))
602     except ValueError:
603         return False
604     else:
605         return True
606
607
608 def _is_background_scenario(scenario):
609     if "run_in_background" in scenario:
610         return scenario["run_in_background"]
611     else:
612         return False
613
614
615 def parse_nodes_with_context(scenario_cfg):
616     """parse the 'nodes' fields in scenario """
617     # ensure consistency in node instantiation order
618     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
619                        for nodename in sorted(scenario_cfg["nodes"]))
620
621
622 def get_networks_from_nodes(nodes):
623     """parse the 'nodes' fields in scenario """
624     networks = {}
625     for node in nodes.values():
626         if not node:
627             continue
628         interfaces = node.get('interfaces', {})
629         for interface in interfaces.values():
630             # vld_id is network_name
631             network_name = interface.get('network_name')
632             if not network_name:
633                 continue
634             network = Context.get_network(network_name)
635             if network:
636                 networks[network['name']] = network
637     return networks
638
639
640 def runner_join(runner):
641     """join (wait for) a runner, exit process at runner failure"""
642     status = runner.join()
643     base_runner.Runner.release(runner)
644     return status
645
646
647 def print_invalid_header(source_name, args):
648     print("Invalid %(source)s passed:\n\n %(args)s\n"
649           % {"source": source_name, "args": args})
650
651
652 def parse_task_args(src_name, args):
653     if isinstance(args, collections.Mapping):
654         return args
655
656     try:
657         kw = args and yaml_load(args)
658         kw = {} if kw is None else kw
659     except yaml.parser.ParserError as e:
660         print_invalid_header(src_name, args)
661         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
662               % {"source": src_name, "err": e})
663         raise TypeError()
664
665     if not isinstance(kw, dict):
666         print_invalid_header(src_name, args)
667         print("%(src)s had to be dict, actually %(src_type)s\n"
668               % {"src": src_name, "src_type": type(kw)})
669         raise TypeError()
670     return kw
671
672
673 def change_server_name(scenario, suffix):
674     try:
675         host = scenario['host']
676     except KeyError:
677         pass
678     else:
679         try:
680             host['name'] += suffix
681         except TypeError:
682             scenario['host'] += suffix
683
684     try:
685         target = scenario['target']
686     except KeyError:
687         pass
688     else:
689         try:
690             target['name'] += suffix
691         except TypeError:
692             scenario['target'] += suffix
693
694     try:
695         key = 'targets'
696         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
697     except KeyError:
698         pass