Bugfix: background scenario can't not update result
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 import collections
24
25 from six.moves import filter
26
27 from yardstick.benchmark.contexts.base import Context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common.task_template import TaskTemplate
31 from yardstick.common.utils import source_env
32 from yardstick.common import utils
33 from yardstick.common import constants
34
35 output_file_default = "/tmp/yardstick.out"
36 config_file = '/etc/yardstick/yardstick.conf'
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def start(self, args, **kwargs):
52         """Start a benchmark scenario."""
53
54         atexit.register(self.atexit_handler)
55
56         task_id = getattr(args, 'task_id')
57         self.task_id = task_id if task_id else str(uuid.uuid4())
58
59         check_environment()
60
61         output_config = utils.parse_ini_file(config_file)
62         self._init_output_config(output_config)
63         self._set_output_config(output_config, args.output_file)
64         LOG.debug('Output configuration is: %s', output_config)
65
66         if output_config['DEFAULT'].get('dispatcher') == 'file':
67             result = {'status': 0, 'result': {}}
68             utils.write_json_to_file(args.output_file, result)
69
70         total_start_time = time.time()
71         parser = TaskParser(args.inputfile[0])
72
73         if args.suite:
74             # 1.parse suite, return suite_params info
75             task_files, task_args, task_args_fnames = \
76                 parser.parse_suite()
77         else:
78             task_files = [parser.path]
79             task_args = [args.task_args]
80             task_args_fnames = [args.task_args_file]
81
82         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
83                  task_files, task_args, task_args_fnames)
84
85         if args.parse_only:
86             sys.exit(0)
87
88         testcases = {}
89         # parse task_files
90         for i in range(0, len(task_files)):
91             one_task_start_time = time.time()
92             parser.path = task_files[i]
93             scenarios, run_in_parallel, meet_precondition, contexts = \
94                 parser.parse_task(self.task_id, task_args[i],
95                                   task_args_fnames[i])
96
97             self.contexts.extend(contexts)
98
99             if not meet_precondition:
100                 LOG.info("meet_precondition is %s, please check envrionment",
101                          meet_precondition)
102                 continue
103
104             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
105             try:
106                 data = self._run(scenarios, run_in_parallel, args.output_file)
107             except KeyboardInterrupt:
108                 raise
109             except Exception:
110                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
111             else:
112                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
113
114             if args.keep_deploy:
115                 # keep deployment, forget about stack
116                 # (hide it for exit handler)
117                 self.contexts = []
118             else:
119                 for context in self.contexts[::-1]:
120                     context.undeploy()
121                 self.contexts = []
122             one_task_end_time = time.time()
123             LOG.info("task %s finished in %d secs", task_files[i],
124                      one_task_end_time - one_task_start_time)
125
126         result = self._get_format_result(testcases)
127
128         self._do_output(output_config, result)
129
130         total_end_time = time.time()
131         LOG.info("total finished in %d secs",
132                  total_end_time - total_start_time)
133
134         scenario = scenarios[0]
135         print("To generate report execute => yardstick report generate ",
136               scenario['task_id'], scenario['tc'])
137
138         print("Done, exiting")
139         return result
140
141     def _init_output_config(self, output_config):
142         output_config.setdefault('DEFAULT', {})
143         output_config.setdefault('dispatcher_http', {})
144         output_config.setdefault('dispatcher_file', {})
145         output_config.setdefault('dispatcher_influxdb', {})
146         output_config.setdefault('nsb', {})
147
148     def _set_output_config(self, output_config, file_path):
149         try:
150             out_type = os.environ['DISPATCHER']
151         except KeyError:
152             output_config['DEFAULT'].setdefault('dispatcher', 'file')
153         else:
154             output_config['DEFAULT']['dispatcher'] = out_type
155
156         output_config['dispatcher_file']['file_path'] = file_path
157
158         try:
159             target = os.environ['TARGET']
160         except KeyError:
161             pass
162         else:
163             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
164             output_config[k]['target'] = target
165
166     def _get_format_result(self, testcases):
167         criteria = self._get_task_criteria(testcases)
168
169         info = {
170             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
171             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
172             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
173             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
174         }
175
176         result = {
177             'status': 1,
178             'result': {
179                 'criteria': criteria,
180                 'task_id': self.task_id,
181                 'info': info,
182                 'testcases': testcases
183             }
184         }
185
186         return result
187
188     def _get_task_criteria(self, testcases):
189         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
190         if criteria:
191             return 'FAIL'
192         else:
193             return 'PASS'
194
195     def _do_output(self, output_config, result):
196
197         dispatcher = DispatcherBase.get(output_config)
198         dispatcher.flush_result_data(result)
199
200     def _run(self, scenarios, run_in_parallel, output_file):
201         """Deploys context and calls runners"""
202         for context in self.contexts:
203             context.deploy()
204
205         background_runners = []
206
207         result = []
208         # Start all background scenarios
209         for scenario in filter(_is_background_scenario, scenarios):
210             scenario["runner"] = dict(type="Duration", duration=1000000000)
211             runner = self.run_one_scenario(scenario, output_file)
212             background_runners.append(runner)
213
214         runners = []
215         if run_in_parallel:
216             for scenario in scenarios:
217                 if not _is_background_scenario(scenario):
218                     runner = self.run_one_scenario(scenario, output_file)
219                     runners.append(runner)
220
221             # Wait for runners to finish
222             for runner in runners:
223                 status = runner_join(runner)
224                 if status != 0:
225                     raise RuntimeError
226                 self.outputs.update(runner.get_output())
227                 result.extend(runner.get_result())
228                 print("Runner ended, output in", output_file)
229         else:
230             # run serially
231             for scenario in scenarios:
232                 if not _is_background_scenario(scenario):
233                     runner = self.run_one_scenario(scenario, output_file)
234                     status = runner_join(runner)
235                     if status != 0:
236                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
237                         raise RuntimeError
238                     self.outputs.update(runner.get_output())
239                     result.extend(runner.get_result())
240                     print("Runner ended, output in", output_file)
241
242         # Abort background runners
243         for runner in background_runners:
244             runner.abort()
245
246         # Wait for background runners to finish
247         for runner in background_runners:
248             status = runner.join(timeout=60)
249             if status is None:
250                 # Nuke if it did not stop nicely
251                 base_runner.Runner.terminate(runner)
252                 status = runner_join(runner)
253             else:
254                 base_runner.Runner.release(runner)
255
256             self.outputs.update(runner.get_output())
257             result.extend(runner.get_result())
258
259             if status != 0:
260                 raise RuntimeError
261             print("Background task ended")
262
263         return result
264
265     def atexit_handler(self):
266         """handler for process termination"""
267         base_runner.Runner.terminate_all()
268
269         if self.contexts:
270             print("Undeploying all contexts")
271             for context in self.contexts[::-1]:
272                 context.undeploy()
273
274     def _parse_options(self, op):
275         if isinstance(op, dict):
276             return {k: self._parse_options(v) for k, v in op.items()}
277         elif isinstance(op, list):
278             return [self._parse_options(v) for v in op]
279         elif isinstance(op, str):
280             return self.outputs.get(op[1:]) if op.startswith('$') else op
281         else:
282             return op
283
284     def run_one_scenario(self, scenario_cfg, output_file):
285         """run one scenario using context"""
286         runner_cfg = scenario_cfg["runner"]
287         runner_cfg['output_filename'] = output_file
288
289         options = scenario_cfg.get('options', {})
290         scenario_cfg['options'] = self._parse_options(options)
291
292         # TODO support get multi hosts/vms info
293         context_cfg = {}
294         if "host" in scenario_cfg:
295             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
296
297         if "target" in scenario_cfg:
298             if is_ip_addr(scenario_cfg["target"]):
299                 context_cfg['target'] = {}
300                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
301             else:
302                 context_cfg['target'] = Context.get_server(
303                     scenario_cfg["target"])
304                 if self._is_same_heat_context(scenario_cfg["host"],
305                                               scenario_cfg["target"]):
306                     context_cfg["target"]["ipaddr"] = \
307                         context_cfg["target"]["private_ip"]
308                 else:
309                     context_cfg["target"]["ipaddr"] = \
310                         context_cfg["target"]["ip"]
311
312         if "targets" in scenario_cfg:
313             ip_list = []
314             for target in scenario_cfg["targets"]:
315                 if is_ip_addr(target):
316                     ip_list.append(target)
317                     context_cfg['target'] = {}
318                 else:
319                     context_cfg['target'] = Context.get_server(target)
320                     if self._is_same_heat_context(scenario_cfg["host"],
321                                                   target):
322                         ip_list.append(context_cfg["target"]["private_ip"])
323                     else:
324                         ip_list.append(context_cfg["target"]["ip"])
325             context_cfg['target']['ipaddr'] = ','.join(ip_list)
326
327         if "nodes" in scenario_cfg:
328             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
329         runner = base_runner.Runner.get(runner_cfg)
330
331         print("Starting runner of type '%s'" % runner_cfg["type"])
332         runner.run(scenario_cfg, context_cfg)
333
334         return runner
335
336     def _is_same_heat_context(self, host_attr, target_attr):
337         """check if two servers are in the same heat context
338         host_attr: either a name for a server created by yardstick or a dict
339         with attribute name mapping when using external heat templates
340         target_attr: either a name for a server created by yardstick or a dict
341         with attribute name mapping when using external heat templates
342         """
343         host = None
344         target = None
345         for context in self.contexts:
346             if context.__context_type__ != "Heat":
347                 continue
348
349             host = context._get_server(host_attr)
350             if host is None:
351                 continue
352
353             target = context._get_server(target_attr)
354             if target is None:
355                 return False
356
357             # Both host and target is not None, then they are in the
358             # same heat context.
359             return True
360
361         return False
362
363
364 class TaskParser(object):       # pragma: no cover
365     """Parser for task config files in yaml format"""
366
367     def __init__(self, path):
368         self.path = path
369
370     def _meet_constraint(self, task, cur_pod, cur_installer):
371         if "constraint" in task:
372             constraint = task.get('constraint', None)
373             if constraint is not None:
374                 tc_fit_pod = constraint.get('pod', None)
375                 tc_fit_installer = constraint.get('installer', None)
376                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
377                          cur_pod, cur_installer, constraint)
378                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
379                     return False
380                 if cur_installer and tc_fit_installer and \
381                         cur_installer not in tc_fit_installer:
382                     return False
383         return True
384
385     def _get_task_para(self, task, cur_pod):
386         task_args = task.get('task_args', None)
387         if task_args is not None:
388             task_args = task_args.get(cur_pod, None)
389         task_args_fnames = task.get('task_args_fnames', None)
390         if task_args_fnames is not None:
391             task_args_fnames = task_args_fnames.get(cur_pod, None)
392         return task_args, task_args_fnames
393
394     def parse_suite(self):
395         """parse the suite file and return a list of task config file paths
396            and lists of optional parameters if present"""
397         LOG.info("\nParsing suite file:%s", self.path)
398
399         try:
400             with open(self.path) as stream:
401                 cfg = yaml.load(stream)
402         except IOError as ioerror:
403             sys.exit(ioerror)
404
405         self._check_schema(cfg["schema"], "suite")
406         LOG.info("\nStarting scenario:%s", cfg["name"])
407
408         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
409         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
410                                       test_cases_dir)
411         if test_cases_dir[-1] != os.sep:
412             test_cases_dir += os.sep
413
414         cur_pod = os.environ.get('NODE_NAME', None)
415         cur_installer = os.environ.get('INSTALLER_TYPE', None)
416
417         valid_task_files = []
418         valid_task_args = []
419         valid_task_args_fnames = []
420
421         for task in cfg["test_cases"]:
422             # 1.check file_name
423             if "file_name" in task:
424                 task_fname = task.get('file_name', None)
425                 if task_fname is None:
426                     continue
427             else:
428                 continue
429             # 2.check constraint
430             if self._meet_constraint(task, cur_pod, cur_installer):
431                 valid_task_files.append(test_cases_dir + task_fname)
432             else:
433                 continue
434             # 3.fetch task parameters
435             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
436             valid_task_args.append(task_args)
437             valid_task_args_fnames.append(task_args_fnames)
438
439         return valid_task_files, valid_task_args, valid_task_args_fnames
440
441     def parse_task(self, task_id, task_args=None, task_args_file=None):
442         """parses the task file and return an context and scenario instances"""
443         print("Parsing task config:", self.path)
444
445         try:
446             kw = {}
447             if task_args_file:
448                 with open(task_args_file) as f:
449                     kw.update(parse_task_args("task_args_file", f.read()))
450             kw.update(parse_task_args("task_args", task_args))
451         except TypeError:
452             raise TypeError()
453
454         try:
455             with open(self.path) as f:
456                 try:
457                     input_task = f.read()
458                     rendered_task = TaskTemplate.render(input_task, **kw)
459                 except Exception as e:
460                     print("Failed to render template:\n%(task)s\n%(err)s\n"
461                           % {"task": input_task, "err": e})
462                     raise e
463                 print("Input task is:\n%s\n" % rendered_task)
464
465                 cfg = yaml.load(rendered_task)
466         except IOError as ioerror:
467             sys.exit(ioerror)
468
469         self._check_schema(cfg["schema"], "task")
470         meet_precondition = self._check_precondition(cfg)
471
472         # TODO: support one or many contexts? Many would simpler and precise
473         # TODO: support hybrid context type
474         if "context" in cfg:
475             context_cfgs = [cfg["context"]]
476         elif "contexts" in cfg:
477             context_cfgs = cfg["contexts"]
478         else:
479             context_cfgs = [{"type": "Dummy"}]
480
481         contexts = []
482         name_suffix = '-{}'.format(task_id[:8])
483         for cfg_attrs in context_cfgs:
484             try:
485                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
486                                                   name_suffix)
487             except KeyError:
488                 pass
489             # default to Heat context because we are testing OpenStack
490             context_type = cfg_attrs.get("type", "Heat")
491             context = Context.get(context_type)
492             context.init(cfg_attrs)
493             contexts.append(context)
494
495         run_in_parallel = cfg.get("run_in_parallel", False)
496
497         # add tc and task id for influxdb extended tags
498         for scenario in cfg["scenarios"]:
499             task_name = os.path.splitext(os.path.basename(self.path))[0]
500             scenario["tc"] = task_name
501             scenario["task_id"] = task_id
502             # embed task path into scenario so we can load other files
503             # relative to task path
504             scenario["task_path"] = os.path.dirname(self.path)
505
506             change_server_name(scenario, name_suffix)
507
508             try:
509                 for node in scenario['nodes']:
510                     scenario['nodes'][node] += name_suffix
511             except KeyError:
512                 pass
513
514         # TODO we need something better here, a class that represent the file
515         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
516
517     def _check_schema(self, cfg_schema, schema_type):
518         """Check if config file is using the correct schema type"""
519
520         if cfg_schema != "yardstick:" + schema_type + ":0.1":
521             sys.exit("error: file %s has unknown schema %s" % (self.path,
522                                                                cfg_schema))
523
524     def _check_precondition(self, cfg):
525         """Check if the envrionment meet the preconditon"""
526
527         if "precondition" in cfg:
528             precondition = cfg["precondition"]
529             installer_type = precondition.get("installer_type", None)
530             deploy_scenarios = precondition.get("deploy_scenarios", None)
531             tc_fit_pods = precondition.get("pod_name", None)
532             installer_type_env = os.environ.get('INSTALL_TYPE', None)
533             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
534             pod_name_env = os.environ.get('NODE_NAME', None)
535
536             LOG.info("installer_type: %s, installer_type_env: %s",
537                      installer_type, installer_type_env)
538             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
539                      deploy_scenarios, deploy_scenario_env)
540             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
541                      tc_fit_pods, pod_name_env)
542             if installer_type and installer_type_env:
543                 if installer_type_env not in installer_type:
544                     return False
545             if deploy_scenarios and deploy_scenario_env:
546                 deploy_scenarios_list = deploy_scenarios.split(',')
547                 for deploy_scenario in deploy_scenarios_list:
548                     if deploy_scenario_env.startswith(deploy_scenario):
549                         return True
550                 return False
551             if tc_fit_pods and pod_name_env:
552                 if pod_name_env not in tc_fit_pods:
553                     return False
554         return True
555
556
557 def is_ip_addr(addr):
558     """check if string addr is an IP address"""
559     try:
560         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
561     except AttributeError:
562         pass
563
564     try:
565         ipaddress.ip_address(addr.encode('utf-8'))
566     except ValueError:
567         return False
568     else:
569         return True
570
571
572 def _is_background_scenario(scenario):
573     if "run_in_background" in scenario:
574         return scenario["run_in_background"]
575     else:
576         return False
577
578
579 def parse_nodes_with_context(scenario_cfg):
580     """paras the 'nodes' fields in scenario """
581     nodes = scenario_cfg["nodes"]
582
583     nodes_cfg = {}
584     for nodename in nodes:
585         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
586
587     return nodes_cfg
588
589
590 def runner_join(runner):
591     """join (wait for) a runner, exit process at runner failure"""
592     status = runner.join()
593     base_runner.Runner.release(runner)
594     return status
595
596
597 def print_invalid_header(source_name, args):
598     print("Invalid %(source)s passed:\n\n %(args)s\n"
599           % {"source": source_name, "args": args})
600
601
602 def parse_task_args(src_name, args):
603     if isinstance(args, collections.Mapping):
604         return args
605
606     try:
607         kw = args and yaml.safe_load(args)
608         kw = {} if kw is None else kw
609     except yaml.parser.ParserError as e:
610         print_invalid_header(src_name, args)
611         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
612               % {"source": src_name, "err": e})
613         raise TypeError()
614
615     if not isinstance(kw, dict):
616         print_invalid_header(src_name, args)
617         print("%(src)s had to be dict, actually %(src_type)s\n"
618               % {"src": src_name, "src_type": type(kw)})
619         raise TypeError()
620     return kw
621
622
623 def check_environment():
624     auth_url = os.environ.get('OS_AUTH_URL', None)
625     if not auth_url:
626         try:
627             source_env(constants.OPENRC)
628         except IOError as e:
629             if e.errno != errno.EEXIST:
630                 raise
631             LOG.debug('OPENRC file not found')
632
633
634 def change_server_name(scenario, suffix):
635     try:
636         host = scenario['host']
637     except KeyError:
638         pass
639     else:
640         try:
641             host['name'] += suffix
642         except TypeError:
643             scenario['host'] += suffix
644
645     try:
646         target = scenario['target']
647     except KeyError:
648         pass
649     else:
650         try:
651             target['name'] += suffix
652         except TypeError:
653             scenario['target'] += suffix
654
655     try:
656         key = 'targets'
657         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
658     except KeyError:
659         pass