a32e990ff94cd95d422a8c4d06fe421d5164cac7
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import collections
25
26 from six.moves import filter
27 from jinja2 import Environment
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.common.yaml_loader import yaml_load
32 from yardstick.dispatcher.base import Base as DispatcherBase
33 from yardstick.common.task_template import TaskTemplate
34 from yardstick.common import utils
35 from yardstick.common import constants
36 from yardstick.common.html_template import report_template
37
38 output_file_default = "/tmp/yardstick.out"
39 config_file = '/etc/yardstick/yardstick.conf'
40 test_cases_dir_default = "tests/opnfv/test_cases/"
41 LOG = logging.getLogger(__name__)
42 JOIN_TIMEOUT = 60
43
44
45 class Task(object):     # pragma: no cover
46     """Task commands.
47
48        Set of commands to manage benchmark tasks.
49     """
50
51     def __init__(self):
52         self.contexts = []
53         self.outputs = {}
54
55     def _set_dispatchers(self, output_config):
56         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
57                                                            'file')
58         out_types = [s.strip() for s in dispatchers.split(',')]
59         output_config['DEFAULT']['dispatcher'] = out_types
60
61     def start(self, args, **kwargs):
62         """Start a benchmark scenario."""
63
64         atexit.register(self.atexit_handler)
65
66         task_id = getattr(args, 'task_id')
67         self.task_id = task_id if task_id else str(uuid.uuid4())
68
69         self._set_log()
70
71         try:
72             output_config = utils.parse_ini_file(config_file)
73         except Exception:
74             # all error will be ignore, the default value is {}
75             output_config = {}
76
77         self._init_output_config(output_config)
78         self._set_output_config(output_config, args.output_file)
79         LOG.debug('Output configuration is: %s', output_config)
80
81         self._set_dispatchers(output_config)
82
83         # update dispatcher list
84         if 'file' in output_config['DEFAULT']['dispatcher']:
85             result = {'status': 0, 'result': {}}
86             utils.write_json_to_file(args.output_file, result)
87
88         total_start_time = time.time()
89         parser = TaskParser(args.inputfile[0])
90
91         if args.suite:
92             # 1.parse suite, return suite_params info
93             task_files, task_args, task_args_fnames = \
94                 parser.parse_suite()
95         else:
96             task_files = [parser.path]
97             task_args = [args.task_args]
98             task_args_fnames = [args.task_args_file]
99
100         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
101                   task_files, task_args, task_args_fnames)
102
103         if args.parse_only:
104             sys.exit(0)
105
106         testcases = {}
107         # parse task_files
108         for i in range(0, len(task_files)):
109             one_task_start_time = time.time()
110             parser.path = task_files[i]
111             scenarios, run_in_parallel, meet_precondition, contexts = \
112                 parser.parse_task(self.task_id, task_args[i],
113                                   task_args_fnames[i])
114
115             self.contexts.extend(contexts)
116
117             if not meet_precondition:
118                 LOG.info("meet_precondition is %s, please check envrionment",
119                          meet_precondition)
120                 continue
121
122             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
123             try:
124                 data = self._run(scenarios, run_in_parallel, args.output_file)
125             except KeyboardInterrupt:
126                 raise
127             except Exception:
128                 LOG.exception("Running test case %s failed!", case_name)
129                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
130             else:
131                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
132
133             if args.keep_deploy:
134                 # keep deployment, forget about stack
135                 # (hide it for exit handler)
136                 self.contexts = []
137             else:
138                 for context in self.contexts[::-1]:
139                     context.undeploy()
140                 self.contexts = []
141             one_task_end_time = time.time()
142             LOG.info("Task %s finished in %d secs", task_files[i],
143                      one_task_end_time - one_task_start_time)
144
145         result = self._get_format_result(testcases)
146
147         self._do_output(output_config, result)
148         self._generate_reporting(result)
149
150         total_end_time = time.time()
151         LOG.info("Total finished in %d secs",
152                  total_end_time - total_start_time)
153
154         scenario = scenarios[0]
155         LOG.info("To generate report, execute command "
156                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
157         LOG.info("Task ALL DONE, exiting")
158         return result
159
160     def _generate_reporting(self, result):
161         env = Environment()
162         with open(constants.REPORTING_FILE, 'w') as f:
163             f.write(env.from_string(report_template).render(result))
164
165         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
166
167     def _set_log(self):
168         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
169         log_formatter = logging.Formatter(log_format)
170
171         utils.makedirs(constants.TASK_LOG_DIR)
172         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
173         log_handler = logging.FileHandler(log_path)
174         log_handler.setFormatter(log_formatter)
175         log_handler.setLevel(logging.DEBUG)
176
177         logging.root.addHandler(log_handler)
178
179     def _init_output_config(self, output_config):
180         output_config.setdefault('DEFAULT', {})
181         output_config.setdefault('dispatcher_http', {})
182         output_config.setdefault('dispatcher_file', {})
183         output_config.setdefault('dispatcher_influxdb', {})
184         output_config.setdefault('nsb', {})
185
186     def _set_output_config(self, output_config, file_path):
187         try:
188             out_type = os.environ['DISPATCHER']
189         except KeyError:
190             output_config['DEFAULT'].setdefault('dispatcher', 'file')
191         else:
192             output_config['DEFAULT']['dispatcher'] = out_type
193
194         output_config['dispatcher_file']['file_path'] = file_path
195
196         try:
197             target = os.environ['TARGET']
198         except KeyError:
199             pass
200         else:
201             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
202             output_config[k]['target'] = target
203
204     def _get_format_result(self, testcases):
205         criteria = self._get_task_criteria(testcases)
206
207         info = {
208             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
209             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
210             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
211             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
212         }
213
214         result = {
215             'status': 1,
216             'result': {
217                 'criteria': criteria,
218                 'task_id': self.task_id,
219                 'info': info,
220                 'testcases': testcases
221             }
222         }
223
224         return result
225
226     def _get_task_criteria(self, testcases):
227         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
228         if criteria:
229             return 'FAIL'
230         else:
231             return 'PASS'
232
233     def _do_output(self, output_config, result):
234         dispatchers = DispatcherBase.get(output_config)
235
236         for dispatcher in dispatchers:
237             dispatcher.flush_result_data(result)
238
239     def _run(self, scenarios, run_in_parallel, output_file):
240         """Deploys context and calls runners"""
241         for context in self.contexts:
242             context.deploy()
243
244         background_runners = []
245
246         result = []
247         # Start all background scenarios
248         for scenario in filter(_is_background_scenario, scenarios):
249             scenario["runner"] = dict(type="Duration", duration=1000000000)
250             runner = self.run_one_scenario(scenario, output_file)
251             background_runners.append(runner)
252
253         runners = []
254         if run_in_parallel:
255             for scenario in scenarios:
256                 if not _is_background_scenario(scenario):
257                     runner = self.run_one_scenario(scenario, output_file)
258                     runners.append(runner)
259
260             # Wait for runners to finish
261             for runner in runners:
262                 status = runner_join(runner)
263                 if status != 0:
264                     raise RuntimeError
265                 self.outputs.update(runner.get_output())
266                 result.extend(runner.get_result())
267                 LOG.info("Runner ended, output in %s", output_file)
268         else:
269             # run serially
270             for scenario in scenarios:
271                 if not _is_background_scenario(scenario):
272                     runner = self.run_one_scenario(scenario, output_file)
273                     status = runner_join(runner)
274                     if status != 0:
275                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
276                         raise RuntimeError
277                     self.outputs.update(runner.get_output())
278                     result.extend(runner.get_result())
279                     LOG.info("Runner ended, output in %s", output_file)
280
281         # Abort background runners
282         for runner in background_runners:
283             runner.abort()
284
285         # Wait for background runners to finish
286         for runner in background_runners:
287             status = runner.join(JOIN_TIMEOUT)
288             if status is None:
289                 # Nuke if it did not stop nicely
290                 base_runner.Runner.terminate(runner)
291                 runner.join(JOIN_TIMEOUT)
292             base_runner.Runner.release(runner)
293
294             self.outputs.update(runner.get_output())
295             result.extend(runner.get_result())
296             print("Background task ended")
297         return result
298
299     def atexit_handler(self):
300         """handler for process termination"""
301         base_runner.Runner.terminate_all()
302
303         if self.contexts:
304             LOG.info("Undeploying all contexts")
305             for context in self.contexts[::-1]:
306                 context.undeploy()
307
308     def _parse_options(self, op):
309         if isinstance(op, dict):
310             return {k: self._parse_options(v) for k, v in op.items()}
311         elif isinstance(op, list):
312             return [self._parse_options(v) for v in op]
313         elif isinstance(op, str):
314             return self.outputs.get(op[1:]) if op.startswith('$') else op
315         else:
316             return op
317
318     def run_one_scenario(self, scenario_cfg, output_file):
319         """run one scenario using context"""
320         runner_cfg = scenario_cfg["runner"]
321         runner_cfg['output_filename'] = output_file
322
323         options = scenario_cfg.get('options', {})
324         scenario_cfg['options'] = self._parse_options(options)
325
326         # TODO support get multi hosts/vms info
327         context_cfg = {}
328         server_name = scenario_cfg.get('options', {}).get('server_name', {})
329
330         def config_context_target(cfg):
331             target = cfg['target']
332             if is_ip_addr(target):
333                 context_cfg['target'] = {"ipaddr": target}
334             else:
335                 context_cfg['target'] = Context.get_server(target)
336                 if self._is_same_context(cfg["host"], target):
337                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
338                 else:
339                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
340
341         host_name = server_name.get('host', scenario_cfg.get('host'))
342         if host_name:
343             context_cfg['host'] = Context.get_server(host_name)
344
345         for item in [server_name, scenario_cfg]:
346             try:
347                 config_context_target(item)
348             except KeyError:
349                 pass
350             else:
351                 break
352
353         if "targets" in scenario_cfg:
354             ip_list = []
355             for target in scenario_cfg["targets"]:
356                 if is_ip_addr(target):
357                     ip_list.append(target)
358                     context_cfg['target'] = {}
359                 else:
360                     context_cfg['target'] = Context.get_server(target)
361                     if self._is_same_context(scenario_cfg["host"],
362                                              target):
363                         ip_list.append(context_cfg["target"]["private_ip"])
364                     else:
365                         ip_list.append(context_cfg["target"]["ip"])
366             context_cfg['target']['ipaddr'] = ','.join(ip_list)
367
368         if "nodes" in scenario_cfg:
369             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
370             context_cfg["networks"] = get_networks_from_nodes(
371                 context_cfg["nodes"])
372
373         runner = base_runner.Runner.get(runner_cfg)
374
375         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
376         runner.run(scenario_cfg, context_cfg)
377
378         return runner
379
380     def _is_same_context(self, host_attr, target_attr):
381         """check if two servers are in the same heat context
382         host_attr: either a name for a server created by yardstick or a dict
383         with attribute name mapping when using external heat templates
384         target_attr: either a name for a server created by yardstick or a dict
385         with attribute name mapping when using external heat templates
386         """
387         for context in self.contexts:
388             if context.__context_type__ not in {"Heat", "Kubernetes"}:
389                 continue
390
391             host = context._get_server(host_attr)
392             if host is None:
393                 continue
394
395             target = context._get_server(target_attr)
396             if target is None:
397                 return False
398
399             # Both host and target is not None, then they are in the
400             # same heat context.
401             return True
402
403         return False
404
405
406 class TaskParser(object):       # pragma: no cover
407     """Parser for task config files in yaml format"""
408
409     def __init__(self, path):
410         self.path = path
411
412     def _meet_constraint(self, task, cur_pod, cur_installer):
413         if "constraint" in task:
414             constraint = task.get('constraint', None)
415             if constraint is not None:
416                 tc_fit_pod = constraint.get('pod', None)
417                 tc_fit_installer = constraint.get('installer', None)
418                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
419                          cur_pod, cur_installer, constraint)
420                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
421                     return False
422                 if (cur_installer is None) or (tc_fit_installer and cur_installer
423                                                not in tc_fit_installer):
424                     return False
425         return True
426
427     def _get_task_para(self, task, cur_pod):
428         task_args = task.get('task_args', None)
429         if task_args is not None:
430             task_args = task_args.get(cur_pod, task_args.get('default'))
431         task_args_fnames = task.get('task_args_fnames', None)
432         if task_args_fnames is not None:
433             task_args_fnames = task_args_fnames.get(cur_pod, None)
434         return task_args, task_args_fnames
435
436     def parse_suite(self):
437         """parse the suite file and return a list of task config file paths
438            and lists of optional parameters if present"""
439         LOG.info("\nParsing suite file:%s", self.path)
440
441         try:
442             with open(self.path) as stream:
443                 cfg = yaml_load(stream)
444         except IOError as ioerror:
445             sys.exit(ioerror)
446
447         self._check_schema(cfg["schema"], "suite")
448         LOG.info("\nStarting scenario:%s", cfg["name"])
449
450         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
451         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
452                                       test_cases_dir)
453         if test_cases_dir[-1] != os.sep:
454             test_cases_dir += os.sep
455
456         cur_pod = os.environ.get('NODE_NAME', None)
457         cur_installer = os.environ.get('INSTALLER_TYPE', None)
458
459         valid_task_files = []
460         valid_task_args = []
461         valid_task_args_fnames = []
462
463         for task in cfg["test_cases"]:
464             # 1.check file_name
465             if "file_name" in task:
466                 task_fname = task.get('file_name', None)
467                 if task_fname is None:
468                     continue
469             else:
470                 continue
471             # 2.check constraint
472             if self._meet_constraint(task, cur_pod, cur_installer):
473                 valid_task_files.append(test_cases_dir + task_fname)
474             else:
475                 continue
476             # 3.fetch task parameters
477             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
478             valid_task_args.append(task_args)
479             valid_task_args_fnames.append(task_args_fnames)
480
481         return valid_task_files, valid_task_args, valid_task_args_fnames
482
483     def parse_task(self, task_id, task_args=None, task_args_file=None):
484         """parses the task file and return an context and scenario instances"""
485         LOG.info("Parsing task config: %s", self.path)
486
487         try:
488             kw = {}
489             if task_args_file:
490                 with open(task_args_file) as f:
491                     kw.update(parse_task_args("task_args_file", f.read()))
492             kw.update(parse_task_args("task_args", task_args))
493         except TypeError:
494             raise TypeError()
495
496         try:
497             with open(self.path) as f:
498                 try:
499                     input_task = f.read()
500                     rendered_task = TaskTemplate.render(input_task, **kw)
501                 except Exception as e:
502                     LOG.exception('Failed to render template:\n%s\n', input_task)
503                     raise e
504                 LOG.debug("Input task is:\n%s\n", rendered_task)
505
506                 cfg = yaml_load(rendered_task)
507         except IOError as ioerror:
508             sys.exit(ioerror)
509
510         self._check_schema(cfg["schema"], "task")
511         meet_precondition = self._check_precondition(cfg)
512
513         # TODO: support one or many contexts? Many would simpler and precise
514         # TODO: support hybrid context type
515         if "context" in cfg:
516             context_cfgs = [cfg["context"]]
517         elif "contexts" in cfg:
518             context_cfgs = cfg["contexts"]
519         else:
520             context_cfgs = [{"type": "Dummy"}]
521
522         contexts = []
523         name_suffix = '-{}'.format(task_id[:8])
524         for cfg_attrs in context_cfgs:
525             try:
526                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
527                                                   name_suffix)
528             except KeyError:
529                 pass
530             # default to Heat context because we are testing OpenStack
531             context_type = cfg_attrs.get("type", "Heat")
532             context = Context.get(context_type)
533             context.init(cfg_attrs)
534             contexts.append(context)
535
536         run_in_parallel = cfg.get("run_in_parallel", False)
537
538         # add tc and task id for influxdb extended tags
539         for scenario in cfg["scenarios"]:
540             task_name = os.path.splitext(os.path.basename(self.path))[0]
541             scenario["tc"] = task_name
542             scenario["task_id"] = task_id
543             # embed task path into scenario so we can load other files
544             # relative to task path
545             scenario["task_path"] = os.path.dirname(self.path)
546
547             change_server_name(scenario, name_suffix)
548
549             try:
550                 for node in scenario['nodes']:
551                     scenario['nodes'][node] += name_suffix
552             except KeyError:
553                 pass
554
555         # TODO we need something better here, a class that represent the file
556         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
557
558     def _check_schema(self, cfg_schema, schema_type):
559         """Check if config file is using the correct schema type"""
560
561         if cfg_schema != "yardstick:" + schema_type + ":0.1":
562             sys.exit("error: file %s has unknown schema %s" % (self.path,
563                                                                cfg_schema))
564
565     def _check_precondition(self, cfg):
566         """Check if the environment meet the precondition"""
567
568         if "precondition" in cfg:
569             precondition = cfg["precondition"]
570             installer_type = precondition.get("installer_type", None)
571             deploy_scenarios = precondition.get("deploy_scenarios", None)
572             tc_fit_pods = precondition.get("pod_name", None)
573             installer_type_env = os.environ.get('INSTALL_TYPE', None)
574             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
575             pod_name_env = os.environ.get('NODE_NAME', None)
576
577             LOG.info("installer_type: %s, installer_type_env: %s",
578                      installer_type, installer_type_env)
579             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
580                      deploy_scenarios, deploy_scenario_env)
581             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
582                      tc_fit_pods, pod_name_env)
583             if installer_type and installer_type_env:
584                 if installer_type_env not in installer_type:
585                     return False
586             if deploy_scenarios and deploy_scenario_env:
587                 deploy_scenarios_list = deploy_scenarios.split(',')
588                 for deploy_scenario in deploy_scenarios_list:
589                     if deploy_scenario_env.startswith(deploy_scenario):
590                         return True
591                 return False
592             if tc_fit_pods and pod_name_env:
593                 if pod_name_env not in tc_fit_pods:
594                     return False
595         return True
596
597
598 def is_ip_addr(addr):
599     """check if string addr is an IP address"""
600     try:
601         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
602     except AttributeError:
603         pass
604
605     try:
606         ipaddress.ip_address(addr.encode('utf-8'))
607     except ValueError:
608         return False
609     else:
610         return True
611
612
613 def _is_background_scenario(scenario):
614     if "run_in_background" in scenario:
615         return scenario["run_in_background"]
616     else:
617         return False
618
619
620 def parse_nodes_with_context(scenario_cfg):
621     """parse the 'nodes' fields in scenario """
622     # ensure consistency in node instantiation order
623     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
624                        for nodename in sorted(scenario_cfg["nodes"]))
625
626
627 def get_networks_from_nodes(nodes):
628     """parse the 'nodes' fields in scenario """
629     networks = {}
630     for node in nodes.values():
631         if not node:
632             continue
633         interfaces = node.get('interfaces', {})
634         for interface in interfaces.values():
635             # vld_id is network_name
636             network_name = interface.get('network_name')
637             if not network_name:
638                 continue
639             network = Context.get_network(network_name)
640             if network:
641                 networks[network['name']] = network
642     return networks
643
644
645 def runner_join(runner):
646     """join (wait for) a runner, exit process at runner failure"""
647     status = runner.join()
648     base_runner.Runner.release(runner)
649     return status
650
651
652 def print_invalid_header(source_name, args):
653     print("Invalid %(source)s passed:\n\n %(args)s\n"
654           % {"source": source_name, "args": args})
655
656
657 def parse_task_args(src_name, args):
658     if isinstance(args, collections.Mapping):
659         return args
660
661     try:
662         kw = args and yaml_load(args)
663         kw = {} if kw is None else kw
664     except yaml.parser.ParserError as e:
665         print_invalid_header(src_name, args)
666         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
667               % {"source": src_name, "err": e})
668         raise TypeError()
669
670     if not isinstance(kw, dict):
671         print_invalid_header(src_name, args)
672         print("%(src)s had to be dict, actually %(src_type)s\n"
673               % {"src": src_name, "src_type": type(kw)})
674         raise TypeError()
675     return kw
676
677
678 def change_server_name(scenario, suffix):
679
680     def add_suffix(cfg, key):
681         try:
682             value = cfg[key]
683         except KeyError:
684             pass
685         else:
686             try:
687                 value['name'] += suffix
688             except TypeError:
689                 cfg[key] += suffix
690
691     server_name = scenario.get('options', {}).get('server_name', {})
692
693     add_suffix(scenario, 'host')
694     add_suffix(scenario, 'target')
695     add_suffix(server_name, 'host')
696     add_suffix(server_name, 'target')
697
698     try:
699         key = 'targets'
700         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
701     except KeyError:
702         pass