Merge "NSB fixes: docstring, whitespace, etc."
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common.task_template import TaskTemplate
33 from yardstick.common.utils import source_env
34 from yardstick.common import utils
35 from yardstick.common import constants
36
37 output_file_default = "/tmp/yardstick.out"
38 config_file = '/etc/yardstick/yardstick.conf'
39 test_cases_dir_default = "tests/opnfv/test_cases/"
40 LOG = logging.getLogger(__name__)
41 JOIN_TIMEOUT = 60
42
43
44 class Task(object):     # pragma: no cover
45     """Task commands.
46
47        Set of commands to manage benchmark tasks.
48     """
49
50     def __init__(self):
51         self.contexts = []
52         self.outputs = {}
53
54     def _set_dispatchers(self, output_config):
55         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
56                                                            'file')
57         out_types = [s.strip() for s in dispatchers.split(',')]
58         output_config['DEFAULT']['dispatcher'] = out_types
59
60     def start(self, args, **kwargs):
61         """Start a benchmark scenario."""
62
63         atexit.register(self.atexit_handler)
64
65         task_id = getattr(args, 'task_id')
66         self.task_id = task_id if task_id else str(uuid.uuid4())
67
68         check_environment()
69
70         try:
71             output_config = utils.parse_ini_file(config_file)
72         except Exception:
73             # all error will be ignore, the default value is {}
74             output_config = {}
75
76         self._init_output_config(output_config)
77         self._set_output_config(output_config, args.output_file)
78         LOG.debug('Output configuration is: %s', output_config)
79
80         self._set_dispatchers(output_config)
81
82         # update dispatcher list
83         if 'file' in output_config['DEFAULT']['dispatcher']:
84             result = {'status': 0, 'result': {}}
85             utils.write_json_to_file(args.output_file, result)
86
87         total_start_time = time.time()
88         parser = TaskParser(args.inputfile[0])
89
90         if args.suite:
91             # 1.parse suite, return suite_params info
92             task_files, task_args, task_args_fnames = \
93                 parser.parse_suite()
94         else:
95             task_files = [parser.path]
96             task_args = [args.task_args]
97             task_args_fnames = [args.task_args_file]
98
99         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
100                  task_files, task_args, task_args_fnames)
101
102         if args.parse_only:
103             sys.exit(0)
104
105         testcases = {}
106         # parse task_files
107         for i in range(0, len(task_files)):
108             one_task_start_time = time.time()
109             parser.path = task_files[i]
110             scenarios, run_in_parallel, meet_precondition, contexts = \
111                 parser.parse_task(self.task_id, task_args[i],
112                                   task_args_fnames[i])
113
114             self.contexts.extend(contexts)
115
116             if not meet_precondition:
117                 LOG.info("meet_precondition is %s, please check envrionment",
118                          meet_precondition)
119                 continue
120
121             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
122             try:
123                 data = self._run(scenarios, run_in_parallel, args.output_file)
124             except KeyboardInterrupt:
125                 raise
126             except Exception:
127                 LOG.exception("Running test case %s failed!", case_name)
128                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
129             else:
130                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
131
132             if args.keep_deploy:
133                 # keep deployment, forget about stack
134                 # (hide it for exit handler)
135                 self.contexts = []
136             else:
137                 for context in self.contexts[::-1]:
138                     context.undeploy()
139                 self.contexts = []
140             one_task_end_time = time.time()
141             LOG.info("task %s finished in %d secs", task_files[i],
142                      one_task_end_time - one_task_start_time)
143
144         result = self._get_format_result(testcases)
145
146         self._do_output(output_config, result)
147
148         total_end_time = time.time()
149         LOG.info("total finished in %d secs",
150                  total_end_time - total_start_time)
151
152         scenario = scenarios[0]
153         print("To generate report execute => yardstick report generate ",
154               scenario['task_id'], scenario['tc'])
155
156         print("Done, exiting")
157         return result
158
159     def _init_output_config(self, output_config):
160         output_config.setdefault('DEFAULT', {})
161         output_config.setdefault('dispatcher_http', {})
162         output_config.setdefault('dispatcher_file', {})
163         output_config.setdefault('dispatcher_influxdb', {})
164         output_config.setdefault('nsb', {})
165
166     def _set_output_config(self, output_config, file_path):
167         try:
168             out_type = os.environ['DISPATCHER']
169         except KeyError:
170             output_config['DEFAULT'].setdefault('dispatcher', 'file')
171         else:
172             output_config['DEFAULT']['dispatcher'] = out_type
173
174         output_config['dispatcher_file']['file_path'] = file_path
175
176         try:
177             target = os.environ['TARGET']
178         except KeyError:
179             pass
180         else:
181             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
182             output_config[k]['target'] = target
183
184     def _get_format_result(self, testcases):
185         criteria = self._get_task_criteria(testcases)
186
187         info = {
188             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
189             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
190             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
191             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
192         }
193
194         result = {
195             'status': 1,
196             'result': {
197                 'criteria': criteria,
198                 'task_id': self.task_id,
199                 'info': info,
200                 'testcases': testcases
201             }
202         }
203
204         return result
205
206     def _get_task_criteria(self, testcases):
207         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
208         if criteria:
209             return 'FAIL'
210         else:
211             return 'PASS'
212
213     def _do_output(self, output_config, result):
214         dispatchers = DispatcherBase.get(output_config)
215
216         for dispatcher in dispatchers:
217             dispatcher.flush_result_data(result)
218
219     def _run(self, scenarios, run_in_parallel, output_file):
220         """Deploys context and calls runners"""
221         for context in self.contexts:
222             context.deploy()
223
224         background_runners = []
225
226         result = []
227         # Start all background scenarios
228         for scenario in filter(_is_background_scenario, scenarios):
229             scenario["runner"] = dict(type="Duration", duration=1000000000)
230             runner = self.run_one_scenario(scenario, output_file)
231             background_runners.append(runner)
232
233         runners = []
234         if run_in_parallel:
235             for scenario in scenarios:
236                 if not _is_background_scenario(scenario):
237                     runner = self.run_one_scenario(scenario, output_file)
238                     runners.append(runner)
239
240             # Wait for runners to finish
241             for runner in runners:
242                 status = runner_join(runner)
243                 if status != 0:
244                     raise RuntimeError
245                 self.outputs.update(runner.get_output())
246                 result.extend(runner.get_result())
247                 print("Runner ended, output in", output_file)
248         else:
249             # run serially
250             for scenario in scenarios:
251                 if not _is_background_scenario(scenario):
252                     runner = self.run_one_scenario(scenario, output_file)
253                     status = runner_join(runner)
254                     if status != 0:
255                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
256                         raise RuntimeError
257                     self.outputs.update(runner.get_output())
258                     result.extend(runner.get_result())
259                     print("Runner ended, output in", output_file)
260
261         # Abort background runners
262         for runner in background_runners:
263             runner.abort()
264
265         # Wait for background runners to finish
266         for runner in background_runners:
267             status = runner.join(JOIN_TIMEOUT)
268             if status is None:
269                 # Nuke if it did not stop nicely
270                 base_runner.Runner.terminate(runner)
271                 runner.join(JOIN_TIMEOUT)
272             base_runner.Runner.release(runner)
273
274             self.outputs.update(runner.get_output())
275             result.extend(runner.get_result())
276             print("Background task ended")
277         return result
278
279     def atexit_handler(self):
280         """handler for process termination"""
281         base_runner.Runner.terminate_all()
282
283         if self.contexts:
284             print("Undeploying all contexts")
285             for context in self.contexts[::-1]:
286                 context.undeploy()
287
288     def _parse_options(self, op):
289         if isinstance(op, dict):
290             return {k: self._parse_options(v) for k, v in op.items()}
291         elif isinstance(op, list):
292             return [self._parse_options(v) for v in op]
293         elif isinstance(op, str):
294             return self.outputs.get(op[1:]) if op.startswith('$') else op
295         else:
296             return op
297
298     def run_one_scenario(self, scenario_cfg, output_file):
299         """run one scenario using context"""
300         runner_cfg = scenario_cfg["runner"]
301         runner_cfg['output_filename'] = output_file
302
303         options = scenario_cfg.get('options', {})
304         scenario_cfg['options'] = self._parse_options(options)
305
306         # TODO support get multi hosts/vms info
307         context_cfg = {}
308         if "host" in scenario_cfg:
309             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
310
311         if "target" in scenario_cfg:
312             if is_ip_addr(scenario_cfg["target"]):
313                 context_cfg['target'] = {}
314                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
315             else:
316                 context_cfg['target'] = Context.get_server(
317                     scenario_cfg["target"])
318                 if self._is_same_heat_context(scenario_cfg["host"],
319                                               scenario_cfg["target"]):
320                     context_cfg["target"]["ipaddr"] = \
321                         context_cfg["target"]["private_ip"]
322                 else:
323                     context_cfg["target"]["ipaddr"] = \
324                         context_cfg["target"]["ip"]
325
326         if "targets" in scenario_cfg:
327             ip_list = []
328             for target in scenario_cfg["targets"]:
329                 if is_ip_addr(target):
330                     ip_list.append(target)
331                     context_cfg['target'] = {}
332                 else:
333                     context_cfg['target'] = Context.get_server(target)
334                     if self._is_same_heat_context(scenario_cfg["host"],
335                                                   target):
336                         ip_list.append(context_cfg["target"]["private_ip"])
337                     else:
338                         ip_list.append(context_cfg["target"]["ip"])
339             context_cfg['target']['ipaddr'] = ','.join(ip_list)
340
341         if "nodes" in scenario_cfg:
342             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
343             context_cfg["networks"] = get_networks_from_nodes(
344                 context_cfg["nodes"])
345         runner = base_runner.Runner.get(runner_cfg)
346
347         print("Starting runner of type '%s'" % runner_cfg["type"])
348         runner.run(scenario_cfg, context_cfg)
349
350         return runner
351
352     def _is_same_heat_context(self, host_attr, target_attr):
353         """check if two servers are in the same heat context
354         host_attr: either a name for a server created by yardstick or a dict
355         with attribute name mapping when using external heat templates
356         target_attr: either a name for a server created by yardstick or a dict
357         with attribute name mapping when using external heat templates
358         """
359         host = None
360         target = None
361         for context in self.contexts:
362             if context.__context_type__ != "Heat":
363                 continue
364
365             host = context._get_server(host_attr)
366             if host is None:
367                 continue
368
369             target = context._get_server(target_attr)
370             if target is None:
371                 return False
372
373             # Both host and target is not None, then they are in the
374             # same heat context.
375             return True
376
377         return False
378
379
380 class TaskParser(object):       # pragma: no cover
381     """Parser for task config files in yaml format"""
382
383     def __init__(self, path):
384         self.path = path
385
386     def _meet_constraint(self, task, cur_pod, cur_installer):
387         if "constraint" in task:
388             constraint = task.get('constraint', None)
389             if constraint is not None:
390                 tc_fit_pod = constraint.get('pod', None)
391                 tc_fit_installer = constraint.get('installer', None)
392                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
393                          cur_pod, cur_installer, constraint)
394                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
395                     return False
396                 if (cur_installer is None) or (tc_fit_installer and cur_installer
397                                                not in tc_fit_installer):
398                     return False
399         return True
400
401     def _get_task_para(self, task, cur_pod):
402         task_args = task.get('task_args', None)
403         if task_args is not None:
404             task_args = task_args.get(cur_pod, task_args.get('default'))
405         task_args_fnames = task.get('task_args_fnames', None)
406         if task_args_fnames is not None:
407             task_args_fnames = task_args_fnames.get(cur_pod, None)
408         return task_args, task_args_fnames
409
410     def parse_suite(self):
411         """parse the suite file and return a list of task config file paths
412            and lists of optional parameters if present"""
413         LOG.info("\nParsing suite file:%s", self.path)
414
415         try:
416             with open(self.path) as stream:
417                 cfg = yaml.safe_load(stream)
418         except IOError as ioerror:
419             sys.exit(ioerror)
420
421         self._check_schema(cfg["schema"], "suite")
422         LOG.info("\nStarting scenario:%s", cfg["name"])
423
424         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
425         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
426                                       test_cases_dir)
427         if test_cases_dir[-1] != os.sep:
428             test_cases_dir += os.sep
429
430         cur_pod = os.environ.get('NODE_NAME', None)
431         cur_installer = os.environ.get('INSTALLER_TYPE', None)
432
433         valid_task_files = []
434         valid_task_args = []
435         valid_task_args_fnames = []
436
437         for task in cfg["test_cases"]:
438             # 1.check file_name
439             if "file_name" in task:
440                 task_fname = task.get('file_name', None)
441                 if task_fname is None:
442                     continue
443             else:
444                 continue
445             # 2.check constraint
446             if self._meet_constraint(task, cur_pod, cur_installer):
447                 valid_task_files.append(test_cases_dir + task_fname)
448             else:
449                 continue
450             # 3.fetch task parameters
451             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
452             valid_task_args.append(task_args)
453             valid_task_args_fnames.append(task_args_fnames)
454
455         return valid_task_files, valid_task_args, valid_task_args_fnames
456
457     def parse_task(self, task_id, task_args=None, task_args_file=None):
458         """parses the task file and return an context and scenario instances"""
459         print("Parsing task config:", self.path)
460
461         try:
462             kw = {}
463             if task_args_file:
464                 with open(task_args_file) as f:
465                     kw.update(parse_task_args("task_args_file", f.read()))
466             kw.update(parse_task_args("task_args", task_args))
467         except TypeError:
468             raise TypeError()
469
470         try:
471             with open(self.path) as f:
472                 try:
473                     input_task = f.read()
474                     rendered_task = TaskTemplate.render(input_task, **kw)
475                 except Exception as e:
476                     print("Failed to render template:\n%(task)s\n%(err)s\n"
477                           % {"task": input_task, "err": e})
478                     raise e
479                 print("Input task is:\n%s\n" % rendered_task)
480
481                 cfg = yaml.safe_load(rendered_task)
482         except IOError as ioerror:
483             sys.exit(ioerror)
484
485         self._check_schema(cfg["schema"], "task")
486         meet_precondition = self._check_precondition(cfg)
487
488         # TODO: support one or many contexts? Many would simpler and precise
489         # TODO: support hybrid context type
490         if "context" in cfg:
491             context_cfgs = [cfg["context"]]
492         elif "contexts" in cfg:
493             context_cfgs = cfg["contexts"]
494         else:
495             context_cfgs = [{"type": "Dummy"}]
496
497         contexts = []
498         name_suffix = '-{}'.format(task_id[:8])
499         for cfg_attrs in context_cfgs:
500             try:
501                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
502                                                   name_suffix)
503             except KeyError:
504                 pass
505             # default to Heat context because we are testing OpenStack
506             context_type = cfg_attrs.get("type", "Heat")
507             context = Context.get(context_type)
508             context.init(cfg_attrs)
509             contexts.append(context)
510
511         run_in_parallel = cfg.get("run_in_parallel", False)
512
513         # add tc and task id for influxdb extended tags
514         for scenario in cfg["scenarios"]:
515             task_name = os.path.splitext(os.path.basename(self.path))[0]
516             scenario["tc"] = task_name
517             scenario["task_id"] = task_id
518             # embed task path into scenario so we can load other files
519             # relative to task path
520             scenario["task_path"] = os.path.dirname(self.path)
521
522             change_server_name(scenario, name_suffix)
523
524             try:
525                 for node in scenario['nodes']:
526                     scenario['nodes'][node] += name_suffix
527             except KeyError:
528                 pass
529
530         # TODO we need something better here, a class that represent the file
531         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
532
533     def _check_schema(self, cfg_schema, schema_type):
534         """Check if config file is using the correct schema type"""
535
536         if cfg_schema != "yardstick:" + schema_type + ":0.1":
537             sys.exit("error: file %s has unknown schema %s" % (self.path,
538                                                                cfg_schema))
539
540     def _check_precondition(self, cfg):
541         """Check if the environment meet the precondition"""
542
543         if "precondition" in cfg:
544             precondition = cfg["precondition"]
545             installer_type = precondition.get("installer_type", None)
546             deploy_scenarios = precondition.get("deploy_scenarios", None)
547             tc_fit_pods = precondition.get("pod_name", None)
548             installer_type_env = os.environ.get('INSTALL_TYPE', None)
549             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
550             pod_name_env = os.environ.get('NODE_NAME', None)
551
552             LOG.info("installer_type: %s, installer_type_env: %s",
553                      installer_type, installer_type_env)
554             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
555                      deploy_scenarios, deploy_scenario_env)
556             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
557                      tc_fit_pods, pod_name_env)
558             if installer_type and installer_type_env:
559                 if installer_type_env not in installer_type:
560                     return False
561             if deploy_scenarios and deploy_scenario_env:
562                 deploy_scenarios_list = deploy_scenarios.split(',')
563                 for deploy_scenario in deploy_scenarios_list:
564                     if deploy_scenario_env.startswith(deploy_scenario):
565                         return True
566                 return False
567             if tc_fit_pods and pod_name_env:
568                 if pod_name_env not in tc_fit_pods:
569                     return False
570         return True
571
572
573 def is_ip_addr(addr):
574     """check if string addr is an IP address"""
575     try:
576         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
577     except AttributeError:
578         pass
579
580     try:
581         ipaddress.ip_address(addr.encode('utf-8'))
582     except ValueError:
583         return False
584     else:
585         return True
586
587
588 def _is_background_scenario(scenario):
589     if "run_in_background" in scenario:
590         return scenario["run_in_background"]
591     else:
592         return False
593
594
595 def parse_nodes_with_context(scenario_cfg):
596     """parse the 'nodes' fields in scenario """
597     # ensure consistency in node instantiation order
598     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
599                        for nodename in sorted(scenario_cfg["nodes"]))
600
601
602 def get_networks_from_nodes(nodes):
603     """parse the 'nodes' fields in scenario """
604     networks = {}
605     for node in nodes.values():
606         if not node:
607             continue
608         interfaces = node.get('interfaces', {})
609         for interface in interfaces.values():
610             vld_id = interface.get('vld_id')
611             # mgmt network doesn't have vld_id
612             if not vld_id:
613                 continue
614             network = Context.get_network({"vld_id": vld_id})
615             if network:
616                 networks[network['name']] = network
617     return networks
618
619
620 def runner_join(runner):
621     """join (wait for) a runner, exit process at runner failure"""
622     status = runner.join()
623     base_runner.Runner.release(runner)
624     return status
625
626
627 def print_invalid_header(source_name, args):
628     print("Invalid %(source)s passed:\n\n %(args)s\n"
629           % {"source": source_name, "args": args})
630
631
632 def parse_task_args(src_name, args):
633     if isinstance(args, collections.Mapping):
634         return args
635
636     try:
637         kw = args and yaml.safe_load(args)
638         kw = {} if kw is None else kw
639     except yaml.parser.ParserError as e:
640         print_invalid_header(src_name, args)
641         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
642               % {"source": src_name, "err": e})
643         raise TypeError()
644
645     if not isinstance(kw, dict):
646         print_invalid_header(src_name, args)
647         print("%(src)s had to be dict, actually %(src_type)s\n"
648               % {"src": src_name, "src_type": type(kw)})
649         raise TypeError()
650     return kw
651
652
653 def check_environment():
654     auth_url = os.environ.get('OS_AUTH_URL', None)
655     if not auth_url:
656         try:
657             source_env(constants.OPENRC)
658         except IOError as e:
659             if e.errno != errno.EEXIST:
660                 raise
661             LOG.debug('OPENRC file not found')
662
663
664 def change_server_name(scenario, suffix):
665     try:
666         host = scenario['host']
667     except KeyError:
668         pass
669     else:
670         try:
671             host['name'] += suffix
672         except TypeError:
673             scenario['host'] += suffix
674
675     try:
676         target = scenario['target']
677     except KeyError:
678         pass
679     else:
680         try:
681             target['name'] += suffix
682         except TypeError:
683             scenario['target'] += suffix
684
685     try:
686         key = 'targets'
687         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
688     except KeyError:
689         pass