utils: create TASK_LOG_DIR if it doesn't exist
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common.task_template import TaskTemplate
33 from yardstick.common.utils import source_env
34 from yardstick.common import utils
35 from yardstick.common import constants
36
37 output_file_default = "/tmp/yardstick.out"
38 config_file = '/etc/yardstick/yardstick.conf'
39 test_cases_dir_default = "tests/opnfv/test_cases/"
40 LOG = logging.getLogger(__name__)
41 JOIN_TIMEOUT = 60
42
43
44 class Task(object):     # pragma: no cover
45     """Task commands.
46
47        Set of commands to manage benchmark tasks.
48     """
49
50     def __init__(self):
51         self.contexts = []
52         self.outputs = {}
53
54     def _set_dispatchers(self, output_config):
55         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
56                                                            'file')
57         out_types = [s.strip() for s in dispatchers.split(',')]
58         output_config['DEFAULT']['dispatcher'] = out_types
59
60     def start(self, args, **kwargs):
61         """Start a benchmark scenario."""
62
63         atexit.register(self.atexit_handler)
64
65         task_id = getattr(args, 'task_id')
66         self.task_id = task_id if task_id else str(uuid.uuid4())
67
68         self._set_log()
69
70         check_environment()
71
72         try:
73             output_config = utils.parse_ini_file(config_file)
74         except Exception:
75             # all error will be ignore, the default value is {}
76             output_config = {}
77
78         self._init_output_config(output_config)
79         self._set_output_config(output_config, args.output_file)
80         LOG.debug('Output configuration is: %s', output_config)
81
82         self._set_dispatchers(output_config)
83
84         # update dispatcher list
85         if 'file' in output_config['DEFAULT']['dispatcher']:
86             result = {'status': 0, 'result': {}}
87             utils.write_json_to_file(args.output_file, result)
88
89         total_start_time = time.time()
90         parser = TaskParser(args.inputfile[0])
91
92         if args.suite:
93             # 1.parse suite, return suite_params info
94             task_files, task_args, task_args_fnames = \
95                 parser.parse_suite()
96         else:
97             task_files = [parser.path]
98             task_args = [args.task_args]
99             task_args_fnames = [args.task_args_file]
100
101         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
102                  task_files, task_args, task_args_fnames)
103
104         if args.parse_only:
105             sys.exit(0)
106
107         testcases = {}
108         # parse task_files
109         for i in range(0, len(task_files)):
110             one_task_start_time = time.time()
111             parser.path = task_files[i]
112             scenarios, run_in_parallel, meet_precondition, contexts = \
113                 parser.parse_task(self.task_id, task_args[i],
114                                   task_args_fnames[i])
115
116             self.contexts.extend(contexts)
117
118             if not meet_precondition:
119                 LOG.info("meet_precondition is %s, please check envrionment",
120                          meet_precondition)
121                 continue
122
123             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
124             try:
125                 data = self._run(scenarios, run_in_parallel, args.output_file)
126             except KeyboardInterrupt:
127                 raise
128             except Exception:
129                 LOG.exception("Running test case %s failed!", case_name)
130                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
131             else:
132                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
133
134             if args.keep_deploy:
135                 # keep deployment, forget about stack
136                 # (hide it for exit handler)
137                 self.contexts = []
138             else:
139                 for context in self.contexts[::-1]:
140                     context.undeploy()
141                 self.contexts = []
142             one_task_end_time = time.time()
143             LOG.info("task %s finished in %d secs", task_files[i],
144                      one_task_end_time - one_task_start_time)
145
146         result = self._get_format_result(testcases)
147
148         self._do_output(output_config, result)
149
150         total_end_time = time.time()
151         LOG.info("total finished in %d secs",
152                  total_end_time - total_start_time)
153
154         scenario = scenarios[0]
155         print("To generate report execute => yardstick report generate ",
156               scenario['task_id'], scenario['tc'])
157
158         print("Done, exiting")
159         return result
160
161     def _set_log(self):
162         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
163         log_formatter = logging.Formatter(log_format)
164
165         utils.makedirs(constants.TASK_LOG_DIR)
166         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
167         log_handler = logging.FileHandler(log_path)
168         log_handler.setFormatter(log_formatter)
169         log_handler.setLevel(logging.DEBUG)
170
171         logging.root.addHandler(log_handler)
172
173     def _init_output_config(self, output_config):
174         output_config.setdefault('DEFAULT', {})
175         output_config.setdefault('dispatcher_http', {})
176         output_config.setdefault('dispatcher_file', {})
177         output_config.setdefault('dispatcher_influxdb', {})
178         output_config.setdefault('nsb', {})
179
180     def _set_output_config(self, output_config, file_path):
181         try:
182             out_type = os.environ['DISPATCHER']
183         except KeyError:
184             output_config['DEFAULT'].setdefault('dispatcher', 'file')
185         else:
186             output_config['DEFAULT']['dispatcher'] = out_type
187
188         output_config['dispatcher_file']['file_path'] = file_path
189
190         try:
191             target = os.environ['TARGET']
192         except KeyError:
193             pass
194         else:
195             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
196             output_config[k]['target'] = target
197
198     def _get_format_result(self, testcases):
199         criteria = self._get_task_criteria(testcases)
200
201         info = {
202             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
203             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
204             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
205             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
206         }
207
208         result = {
209             'status': 1,
210             'result': {
211                 'criteria': criteria,
212                 'task_id': self.task_id,
213                 'info': info,
214                 'testcases': testcases
215             }
216         }
217
218         return result
219
220     def _get_task_criteria(self, testcases):
221         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
222         if criteria:
223             return 'FAIL'
224         else:
225             return 'PASS'
226
227     def _do_output(self, output_config, result):
228         dispatchers = DispatcherBase.get(output_config)
229
230         for dispatcher in dispatchers:
231             dispatcher.flush_result_data(result)
232
233     def _run(self, scenarios, run_in_parallel, output_file):
234         """Deploys context and calls runners"""
235         for context in self.contexts:
236             context.deploy()
237
238         background_runners = []
239
240         result = []
241         # Start all background scenarios
242         for scenario in filter(_is_background_scenario, scenarios):
243             scenario["runner"] = dict(type="Duration", duration=1000000000)
244             runner = self.run_one_scenario(scenario, output_file)
245             background_runners.append(runner)
246
247         runners = []
248         if run_in_parallel:
249             for scenario in scenarios:
250                 if not _is_background_scenario(scenario):
251                     runner = self.run_one_scenario(scenario, output_file)
252                     runners.append(runner)
253
254             # Wait for runners to finish
255             for runner in runners:
256                 status = runner_join(runner)
257                 if status != 0:
258                     raise RuntimeError
259                 self.outputs.update(runner.get_output())
260                 result.extend(runner.get_result())
261                 print("Runner ended, output in", output_file)
262         else:
263             # run serially
264             for scenario in scenarios:
265                 if not _is_background_scenario(scenario):
266                     runner = self.run_one_scenario(scenario, output_file)
267                     status = runner_join(runner)
268                     if status != 0:
269                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
270                         raise RuntimeError
271                     self.outputs.update(runner.get_output())
272                     result.extend(runner.get_result())
273                     print("Runner ended, output in", output_file)
274
275         # Abort background runners
276         for runner in background_runners:
277             runner.abort()
278
279         # Wait for background runners to finish
280         for runner in background_runners:
281             status = runner.join(JOIN_TIMEOUT)
282             if status is None:
283                 # Nuke if it did not stop nicely
284                 base_runner.Runner.terminate(runner)
285                 runner.join(JOIN_TIMEOUT)
286             base_runner.Runner.release(runner)
287
288             self.outputs.update(runner.get_output())
289             result.extend(runner.get_result())
290             print("Background task ended")
291         return result
292
293     def atexit_handler(self):
294         """handler for process termination"""
295         base_runner.Runner.terminate_all()
296
297         if self.contexts:
298             print("Undeploying all contexts")
299             for context in self.contexts[::-1]:
300                 context.undeploy()
301
302     def _parse_options(self, op):
303         if isinstance(op, dict):
304             return {k: self._parse_options(v) for k, v in op.items()}
305         elif isinstance(op, list):
306             return [self._parse_options(v) for v in op]
307         elif isinstance(op, str):
308             return self.outputs.get(op[1:]) if op.startswith('$') else op
309         else:
310             return op
311
312     def run_one_scenario(self, scenario_cfg, output_file):
313         """run one scenario using context"""
314         runner_cfg = scenario_cfg["runner"]
315         runner_cfg['output_filename'] = output_file
316
317         options = scenario_cfg.get('options', {})
318         scenario_cfg['options'] = self._parse_options(options)
319
320         # TODO support get multi hosts/vms info
321         context_cfg = {}
322         if "host" in scenario_cfg:
323             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
324
325         if "target" in scenario_cfg:
326             if is_ip_addr(scenario_cfg["target"]):
327                 context_cfg['target'] = {}
328                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
329             else:
330                 context_cfg['target'] = Context.get_server(
331                     scenario_cfg["target"])
332                 if self._is_same_heat_context(scenario_cfg["host"],
333                                               scenario_cfg["target"]):
334                     context_cfg["target"]["ipaddr"] = \
335                         context_cfg["target"]["private_ip"]
336                 else:
337                     context_cfg["target"]["ipaddr"] = \
338                         context_cfg["target"]["ip"]
339
340         if "targets" in scenario_cfg:
341             ip_list = []
342             for target in scenario_cfg["targets"]:
343                 if is_ip_addr(target):
344                     ip_list.append(target)
345                     context_cfg['target'] = {}
346                 else:
347                     context_cfg['target'] = Context.get_server(target)
348                     if self._is_same_heat_context(scenario_cfg["host"],
349                                                   target):
350                         ip_list.append(context_cfg["target"]["private_ip"])
351                     else:
352                         ip_list.append(context_cfg["target"]["ip"])
353             context_cfg['target']['ipaddr'] = ','.join(ip_list)
354
355         if "nodes" in scenario_cfg:
356             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
357             context_cfg["networks"] = get_networks_from_nodes(
358                 context_cfg["nodes"])
359         runner = base_runner.Runner.get(runner_cfg)
360
361         print("Starting runner of type '%s'" % runner_cfg["type"])
362         runner.run(scenario_cfg, context_cfg)
363
364         return runner
365
366     def _is_same_heat_context(self, host_attr, target_attr):
367         """check if two servers are in the same heat context
368         host_attr: either a name for a server created by yardstick or a dict
369         with attribute name mapping when using external heat templates
370         target_attr: either a name for a server created by yardstick or a dict
371         with attribute name mapping when using external heat templates
372         """
373         host = None
374         target = None
375         for context in self.contexts:
376             if context.__context_type__ != "Heat":
377                 continue
378
379             host = context._get_server(host_attr)
380             if host is None:
381                 continue
382
383             target = context._get_server(target_attr)
384             if target is None:
385                 return False
386
387             # Both host and target is not None, then they are in the
388             # same heat context.
389             return True
390
391         return False
392
393
394 class TaskParser(object):       # pragma: no cover
395     """Parser for task config files in yaml format"""
396
397     def __init__(self, path):
398         self.path = path
399
400     def _meet_constraint(self, task, cur_pod, cur_installer):
401         if "constraint" in task:
402             constraint = task.get('constraint', None)
403             if constraint is not None:
404                 tc_fit_pod = constraint.get('pod', None)
405                 tc_fit_installer = constraint.get('installer', None)
406                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
407                          cur_pod, cur_installer, constraint)
408                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
409                     return False
410                 if (cur_installer is None) or (tc_fit_installer and cur_installer
411                                                not in tc_fit_installer):
412                     return False
413         return True
414
415     def _get_task_para(self, task, cur_pod):
416         task_args = task.get('task_args', None)
417         if task_args is not None:
418             task_args = task_args.get(cur_pod, task_args.get('default'))
419         task_args_fnames = task.get('task_args_fnames', None)
420         if task_args_fnames is not None:
421             task_args_fnames = task_args_fnames.get(cur_pod, None)
422         return task_args, task_args_fnames
423
424     def parse_suite(self):
425         """parse the suite file and return a list of task config file paths
426            and lists of optional parameters if present"""
427         LOG.info("\nParsing suite file:%s", self.path)
428
429         try:
430             with open(self.path) as stream:
431                 cfg = yaml.load(stream)
432         except IOError as ioerror:
433             sys.exit(ioerror)
434
435         self._check_schema(cfg["schema"], "suite")
436         LOG.info("\nStarting scenario:%s", cfg["name"])
437
438         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
439         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
440                                       test_cases_dir)
441         if test_cases_dir[-1] != os.sep:
442             test_cases_dir += os.sep
443
444         cur_pod = os.environ.get('NODE_NAME', None)
445         cur_installer = os.environ.get('INSTALLER_TYPE', None)
446
447         valid_task_files = []
448         valid_task_args = []
449         valid_task_args_fnames = []
450
451         for task in cfg["test_cases"]:
452             # 1.check file_name
453             if "file_name" in task:
454                 task_fname = task.get('file_name', None)
455                 if task_fname is None:
456                     continue
457             else:
458                 continue
459             # 2.check constraint
460             if self._meet_constraint(task, cur_pod, cur_installer):
461                 valid_task_files.append(test_cases_dir + task_fname)
462             else:
463                 continue
464             # 3.fetch task parameters
465             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
466             valid_task_args.append(task_args)
467             valid_task_args_fnames.append(task_args_fnames)
468
469         return valid_task_files, valid_task_args, valid_task_args_fnames
470
471     def parse_task(self, task_id, task_args=None, task_args_file=None):
472         """parses the task file and return an context and scenario instances"""
473         print("Parsing task config:", self.path)
474
475         try:
476             kw = {}
477             if task_args_file:
478                 with open(task_args_file) as f:
479                     kw.update(parse_task_args("task_args_file", f.read()))
480             kw.update(parse_task_args("task_args", task_args))
481         except TypeError:
482             raise TypeError()
483
484         try:
485             with open(self.path) as f:
486                 try:
487                     input_task = f.read()
488                     rendered_task = TaskTemplate.render(input_task, **kw)
489                 except Exception as e:
490                     print("Failed to render template:\n%(task)s\n%(err)s\n"
491                           % {"task": input_task, "err": e})
492                     raise e
493                 print("Input task is:\n%s\n" % rendered_task)
494
495                 cfg = yaml.load(rendered_task)
496         except IOError as ioerror:
497             sys.exit(ioerror)
498
499         self._check_schema(cfg["schema"], "task")
500         meet_precondition = self._check_precondition(cfg)
501
502         # TODO: support one or many contexts? Many would simpler and precise
503         # TODO: support hybrid context type
504         if "context" in cfg:
505             context_cfgs = [cfg["context"]]
506         elif "contexts" in cfg:
507             context_cfgs = cfg["contexts"]
508         else:
509             context_cfgs = [{"type": "Dummy"}]
510
511         contexts = []
512         name_suffix = '-{}'.format(task_id[:8])
513         for cfg_attrs in context_cfgs:
514             try:
515                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
516                                                   name_suffix)
517             except KeyError:
518                 pass
519             # default to Heat context because we are testing OpenStack
520             context_type = cfg_attrs.get("type", "Heat")
521             context = Context.get(context_type)
522             context.init(cfg_attrs)
523             contexts.append(context)
524
525         run_in_parallel = cfg.get("run_in_parallel", False)
526
527         # add tc and task id for influxdb extended tags
528         for scenario in cfg["scenarios"]:
529             task_name = os.path.splitext(os.path.basename(self.path))[0]
530             scenario["tc"] = task_name
531             scenario["task_id"] = task_id
532             # embed task path into scenario so we can load other files
533             # relative to task path
534             scenario["task_path"] = os.path.dirname(self.path)
535
536             change_server_name(scenario, name_suffix)
537
538             try:
539                 for node in scenario['nodes']:
540                     scenario['nodes'][node] += name_suffix
541             except KeyError:
542                 pass
543
544         # TODO we need something better here, a class that represent the file
545         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
546
547     def _check_schema(self, cfg_schema, schema_type):
548         """Check if config file is using the correct schema type"""
549
550         if cfg_schema != "yardstick:" + schema_type + ":0.1":
551             sys.exit("error: file %s has unknown schema %s" % (self.path,
552                                                                cfg_schema))
553
554     def _check_precondition(self, cfg):
555         """Check if the environment meet the precondition"""
556
557         if "precondition" in cfg:
558             precondition = cfg["precondition"]
559             installer_type = precondition.get("installer_type", None)
560             deploy_scenarios = precondition.get("deploy_scenarios", None)
561             tc_fit_pods = precondition.get("pod_name", None)
562             installer_type_env = os.environ.get('INSTALL_TYPE', None)
563             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
564             pod_name_env = os.environ.get('NODE_NAME', None)
565
566             LOG.info("installer_type: %s, installer_type_env: %s",
567                      installer_type, installer_type_env)
568             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
569                      deploy_scenarios, deploy_scenario_env)
570             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
571                      tc_fit_pods, pod_name_env)
572             if installer_type and installer_type_env:
573                 if installer_type_env not in installer_type:
574                     return False
575             if deploy_scenarios and deploy_scenario_env:
576                 deploy_scenarios_list = deploy_scenarios.split(',')
577                 for deploy_scenario in deploy_scenarios_list:
578                     if deploy_scenario_env.startswith(deploy_scenario):
579                         return True
580                 return False
581             if tc_fit_pods and pod_name_env:
582                 if pod_name_env not in tc_fit_pods:
583                     return False
584         return True
585
586
587 def is_ip_addr(addr):
588     """check if string addr is an IP address"""
589     try:
590         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
591     except AttributeError:
592         pass
593
594     try:
595         ipaddress.ip_address(addr.encode('utf-8'))
596     except ValueError:
597         return False
598     else:
599         return True
600
601
602 def _is_background_scenario(scenario):
603     if "run_in_background" in scenario:
604         return scenario["run_in_background"]
605     else:
606         return False
607
608
609 def parse_nodes_with_context(scenario_cfg):
610     """parse the 'nodes' fields in scenario """
611     # ensure consistency in node instantiation order
612     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
613                        for nodename in sorted(scenario_cfg["nodes"]))
614
615
616 def get_networks_from_nodes(nodes):
617     """parse the 'nodes' fields in scenario """
618     networks = {}
619     for node in nodes.values():
620         if not node:
621             continue
622         interfaces = node.get('interfaces', {})
623         for interface in interfaces.values():
624             vld_id = interface.get('vld_id')
625             # mgmt network doesn't have vld_id
626             if not vld_id:
627                 continue
628             network = Context.get_network({"vld_id": vld_id})
629             if network:
630                 networks[network['name']] = network
631     return networks
632
633
634 def runner_join(runner):
635     """join (wait for) a runner, exit process at runner failure"""
636     status = runner.join()
637     base_runner.Runner.release(runner)
638     return status
639
640
641 def print_invalid_header(source_name, args):
642     print("Invalid %(source)s passed:\n\n %(args)s\n"
643           % {"source": source_name, "args": args})
644
645
646 def parse_task_args(src_name, args):
647     if isinstance(args, collections.Mapping):
648         return args
649
650     try:
651         kw = args and yaml.safe_load(args)
652         kw = {} if kw is None else kw
653     except yaml.parser.ParserError as e:
654         print_invalid_header(src_name, args)
655         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
656               % {"source": src_name, "err": e})
657         raise TypeError()
658
659     if not isinstance(kw, dict):
660         print_invalid_header(src_name, args)
661         print("%(src)s had to be dict, actually %(src_type)s\n"
662               % {"src": src_name, "src_type": type(kw)})
663         raise TypeError()
664     return kw
665
666
667 def check_environment():
668     auth_url = os.environ.get('OS_AUTH_URL', None)
669     if not auth_url:
670         try:
671             source_env(constants.OPENRC)
672         except IOError as e:
673             if e.errno != errno.EEXIST:
674                 raise
675             LOG.debug('OPENRC file not found')
676
677
678 def change_server_name(scenario, suffix):
679     try:
680         host = scenario['host']
681     except KeyError:
682         pass
683     else:
684         try:
685             host['name'] += suffix
686         except TypeError:
687             scenario['host'] += suffix
688
689     try:
690         target = scenario['target']
691     except KeyError:
692         pass
693     else:
694         try:
695             target['name'] += suffix
696         except TypeError:
697             scenario['target'] += suffix
698
699     try:
700         key = 'targets'
701         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
702     except KeyError:
703         pass