Merge "Create opnfv_k8-ovn-lb-noha_daily.yaml test suite file"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import collections
25
26 from six.moves import filter
27 from jinja2 import Environment
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.common.yaml_loader import yaml_load
32 from yardstick.dispatcher.base import Base as DispatcherBase
33 from yardstick.common.task_template import TaskTemplate
34 from yardstick.common import utils
35 from yardstick.common import constants
36 from yardstick.common.html_template import report_template
37
# Default location used by the 'file' dispatcher when none is configured.
output_file_default = "/tmp/yardstick.out"
# INI file supplying dispatcher/output configuration (see Task._set_output_config).
config_file = '/etc/yardstick/yardstick.conf'
# Default directory of suite test case files, relative to YARDSTICK_ROOT_PATH.
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
# Seconds to wait for a background runner to stop before force-terminating it.
JOIN_TIMEOUT = 60
43
44
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # Contexts deployed for the currently running task file; undeployed
        # (or deliberately forgotten with --keep-deploy) after each task.
        self.contexts = []
        # Outputs collected from finished runners; referenced by later
        # scenarios through '$name' option values (see _parse_options).
        self.outputs = {}

    def _set_dispatchers(self, output_config):
        """Normalize DEFAULT.dispatcher to a list of dispatcher type names.

        The setting may be a single name or a comma-separated list
        (e.g. "file,influxdb").
        """
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
                                                           'file')
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types

    def start(self, args, **kwargs):
        """Start a benchmark scenario."""

        # Make sure runners are terminated and contexts undeployed even on
        # abnormal process termination.
        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        self.task_id = task_id if task_id else str(uuid.uuid4())

        self._set_log()

        try:
            output_config = utils.parse_ini_file(config_file)
        except Exception:
            # All errors reading the INI file are ignored; fall back to an
            # empty configuration and rely on the defaults set below.
            output_config = {}

        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # When the 'file' dispatcher is active, seed the output file with an
        # empty result so consumers always find a valid JSON document.
        if 'file' in output_config['DEFAULT']['dispatcher']:
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1. parse suite, return per-test-case files and parameters
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            # Single task file given directly on the command line.
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        testcases = {}
        # Run each task file in sequence, recording PASS/FAIL per test case.
        for i in range(0, len(task_files)):
            one_task_start_time = time.time()
            parser.path = task_files[i]
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, task_args[i],
                                  task_args_fnames[i])

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check envrionment",
                         meet_precondition)
                continue

            case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
            try:
                data = self._run(scenarios, run_in_parallel, args.output_file)
            except KeyboardInterrupt:
                raise
            except Exception:
                # One failing test case must not abort the rest of the suite.
                LOG.exception("Running test case %s failed!", case_name)
                testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
            else:
                testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # Undeploy in reverse order of deployment.
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)
        self._generate_reporting(result)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        # NOTE(review): 'scenarios' is only bound inside the loop above; with
        # an empty suite (no task files) this line raises NameError — confirm
        # callers always supply at least one task file.
        scenario = scenarios[0]
        print("To generate report execute => yardstick report generate ",
              scenario['task_id'], scenario['tc'])

        print("Done, exiting")
        return result

    def _generate_reporting(self, result):
        """Render the HTML report template with the aggregated result."""
        env = Environment()
        with open(constants.REPORTING_FILE, 'w') as f:
            f.write(env.from_string(report_template).render(result))

        LOG.info('yardstick reporting generate in %s', constants.REPORTING_FILE)

    def _set_log(self):
        """Attach a per-task DEBUG-level file handler to the root logger."""
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        utils.makedirs(constants.TASK_LOG_DIR)
        # One log file per task, named after the task id.
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)

    def _init_output_config(self, output_config):
        """Ensure all configuration sections used later exist."""
        output_config.setdefault('DEFAULT', {})
        output_config.setdefault('dispatcher_http', {})
        output_config.setdefault('dispatcher_file', {})
        output_config.setdefault('dispatcher_influxdb', {})
        output_config.setdefault('nsb', {})

    def _set_output_config(self, output_config, file_path):
        """Apply DISPATCHER/TARGET environment overrides and the output path."""
        try:
            out_type = os.environ['DISPATCHER']
        except KeyError:
            output_config['DEFAULT'].setdefault('dispatcher', 'file')
        else:
            # The environment variable wins over the INI file setting.
            output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

        try:
            target = os.environ['TARGET']
        except KeyError:
            pass
        else:
            # At this point 'dispatcher' is still a single name; the split
            # into a list happens later in _set_dispatchers.
            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
            output_config[k]['target'] = target

    def _get_format_result(self, testcases):
        """Build the result document pushed to the dispatchers."""
        criteria = self._get_task_criteria(testcases)

        # Deployment metadata comes from the CI environment when available.
        info = {
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
        }

        result = {
            'status': 1,
            'result': {
                'criteria': criteria,
                'task_id': self.task_id,
                'info': info,
                'testcases': testcases
            }
        }

        return result

    def _get_task_criteria(self, testcases):
        """Return 'FAIL' if any test case did not pass, else 'PASS'."""
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
        if criteria:
            return 'FAIL'
        else:
            return 'PASS'

    def _do_output(self, output_config, result):
        """Flush the result through every configured dispatcher."""
        dispatchers = DispatcherBase.get(output_config)

        for dispatcher in dispatchers:
            dispatcher.flush_result_data(result)

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        result = []
        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # Background scenarios run "forever" (huge Duration) until they
            # are aborted once the foreground scenarios complete.
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # Start every foreground scenario first, then wait for them all.
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                status = runner_join(runner)
                if status != 0:
                    raise RuntimeError
                self.outputs.update(runner.get_output())
                result.extend(runner.get_result())
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    status = runner_join(runner)
                    if status != 0:
                        LOG.error('Scenario: %s ERROR', scenario.get('type'))
                        raise RuntimeError
                    self.outputs.update(runner.get_output())
                    result.extend(runner.get_result())
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(JOIN_TIMEOUT)
            if status is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner.join(JOIN_TIMEOUT)
            base_runner.Runner.release(runner)

            self.outputs.update(runner.get_output())
            result.extend(runner.get_result())
            print("Background task ended")
        return result

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            # Undeploy in reverse order of deployment.
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
        """Recursively resolve '$name' option values from earlier outputs."""
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            # '$foo' refers to the output named 'foo' of a previous runner.
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context"""
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # Target given directly as an IP address: no server lookup.
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                # Use the private address when host and target share a Heat
                # context, the public one otherwise.
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            # Multiple targets: collect all addresses into one
            # comma-separated 'ipaddr' string.  NOTE(review): when a plain
            # IP appears in the list context_cfg['target'] is reset to {},
            # and otherwise it holds only the last looked-up server.
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])
        runner = base_runner.Runner.get(runner_cfg)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target is not None, then they are in the
            # same heat context.
            return True

        return False
397
398
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # Path to the task or suite file; reassigned by Task.start() while
        # iterating over a suite's test cases.
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Check a suite entry's pod/installer constraint against this run.

        Returns True when the test case applies to the current pod and
        installer, or has no constraint at all.
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                # A constrained test case only runs when the current pod /
                # installer is known and listed in the constraint.
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                    return False
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return (task_args, task_args_fnames) for the current pod.

        Pod-specific values win; 'default' is the fallback for task_args.
        """
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, task_args.get('default'))
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                cfg = yaml_load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        # Ensure a trailing separator so file names can be appended directly.
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1. check file_name; entries without one are skipped
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
                    continue
            else:
                continue
            # 2. check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3. fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        print("Parsing task config:", self.path)

        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args("task_args_file", f.read()))
            # Inline task_args override values from the file.
            kw.update(parse_task_args("task_args", task_args))
        except TypeError:
            # NOTE(review): re-raising a fresh TypeError discards the
            # message; the diagnostics were already printed by
            # parse_task_args.
            raise TypeError()

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    # Render the task template with the parsed arguments.
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise e
                print("Input task is:\n%s\n" % rendered_task)

                cfg = yaml_load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            # No context section at all: fall back to a Dummy context.
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # Make context/server names unique per run by appending the first
        # 8 characters of the task id.
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            change_server_name(scenario, name_suffix)

            try:
                # Node names must carry the same per-run suffix as contexts.
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meet the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            # NOTE(review): reads 'INSTALL_TYPE' while the rest of the file
            # uses 'INSTALLER_TYPE' — looks like a typo; confirm before
            # changing since CI may set both variables.
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                # Any matching prefix in the comma-separated list passes.
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
590
591
def is_ip_addr(addr):
    """Check whether *addr* is a valid IP address.

    :param addr: a plain address string, or a dict with attribute-name
                 mapping (external heat templates) whose 'public_ip_attr' /
                 'private_ip_attr' entry is checked instead.
    :returns: True for a valid IPv4/IPv6 address, False otherwise.
    """
    try:
        # dict form: prefer the public attr, fall back to the private one
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    # ipaddress.ip_address() requires a text (unicode) argument: a bytes
    # argument is interpreted as a *packed* binary address, so the previous
    # addr.encode('utf-8') made every dotted-quad string fail on Python 3.
    if isinstance(addr, bytes):
        addr = addr.decode('utf-8')

    try:
        ipaddress.ip_address(addr)
    except ValueError:
        return False
    else:
        return True
605
606
607 def _is_background_scenario(scenario):
608     if "run_in_background" in scenario:
609         return scenario["run_in_background"]
610     else:
611         return False
612
613
def parse_nodes_with_context(scenario_cfg):
    """Resolve the scenario's 'nodes' entries into server info dicts.

    Node names are processed in sorted order so that node instantiation
    order is deterministic.
    """
    nodes = scenario_cfg["nodes"]
    return OrderedDict(
        (node_name, Context.get_server(nodes[node_name]))
        for node_name in sorted(nodes)
    )
619
620
def get_networks_from_nodes(nodes):
    """Collect the networks referenced by the given nodes' interfaces.

    Returns a dict mapping network name to the network description looked
    up from the deployed contexts; nodes without data or interfaces are
    skipped.
    """
    networks = {}
    for node_data in nodes.values():
        if not node_data:
            continue
        for interface in node_data.get('interfaces', {}).values():
            # vld_id is network_name
            name = interface.get('network_name')
            if not name:
                continue
            network = Context.get_network(name)
            if network:
                networks[network['name']] = network
    return networks
637
638
def runner_join(runner):
    """Wait for *runner* to finish, release it, and return its exit status."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    return exit_status
644
645
def print_invalid_header(source_name, args):
    """Print a diagnostic header identifying invalid task arguments."""
    message = "Invalid %(source)s passed:\n\n %(args)s\n" % {
        "source": source_name,
        "args": args,
    }
    print(message)
649
650
def parse_task_args(src_name, args):
    """Parse task arguments given as YAML text (or an already-built mapping).

    :param src_name: label used in error messages ("task_args", ...)
    :param args: a mapping (returned unchanged), a YAML string, or
                 None/empty (returns {})
    :returns: a dict of task arguments
    :raises TypeError: when *args* is not valid YAML or does not parse to
                       a mapping
    """
    # collections.Mapping was removed in Python 3.10; the ABC has lived in
    # collections.abc since 3.3.  Import locally so Python 2 still works.
    try:
        import collections.abc as collections_abc
    except ImportError:      # Python 2
        collections_abc = collections
    if isinstance(args, collections_abc.Mapping):
        return args

    try:
        # 'args and ...' short-circuits: None/empty input never reaches YAML.
        kw = args and yaml_load(args)
        kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
670
671
def change_server_name(scenario, suffix):
    """Append *suffix* to the scenario's host/target(s) server names.

    'host' and 'target' may be either a plain name string or a dict with a
    'name' key; 'targets' is a list of name strings.  Missing keys are
    simply skipped.
    """
    def _rename(key):
        # Shared handling for the 'host' and 'target' entries.
        try:
            server = scenario[key]
        except KeyError:
            return
        try:
            server['name'] += suffix
        except TypeError:
            # Plain string form: suffix the scenario entry itself.
            scenario[key] += suffix

    _rename('host')
    _rename('target')

    try:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]
    except KeyError:
        pass