Merge "yardstick setup ansible, including load_images"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common.task_template import TaskTemplate
33 from yardstick.common.utils import source_env
34 from yardstick.common import utils
35 from yardstick.common import constants
36
37 output_file_default = "/tmp/yardstick.out"
38 config_file = '/etc/yardstick/yardstick.conf'
39 test_cases_dir_default = "tests/opnfv/test_cases/"
40 LOG = logging.getLogger(__name__)
41 JOIN_TIMEOUT = 60
42
43
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # Contexts deployed for the task currently being run; they are
        # undeployed (in reverse deployment order) after each task file or
        # by the atexit handler on abnormal exit.
        self.contexts = []
        # Scenario outputs accumulated across runners; later scenarios can
        # reference them through '$name' placeholders (see _parse_options).
        self.outputs = {}

    def _set_dispatchers(self, output_config):
        """Normalize the 'dispatcher' setting into a list of names.

        The INI value is a comma-separated string (default 'file'); it is
        replaced in place by a list of stripped dispatcher names.
        """
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
                                                           'file')
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        args: parsed CLI namespace; reads task_id, output_file, inputfile,
        suite, task_args, task_args_file, parse_only and keep_deploy.
        Returns the formatted result dict (see _get_format_result).
        """

        # make sure contexts are cleaned up even on abnormal termination
        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        # generate a fresh UUID when the caller did not supply an id
        self.task_id = task_id if task_id else str(uuid.uuid4())

        self._set_log()

        # sources OpenStack credentials (openrc) if not already in the env
        check_environment()

        try:
            output_config = utils.parse_ini_file(config_file)
        except Exception:
            # all error will be ignore, the default value is {}
            output_config = {}

        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # update dispatcher list
        if 'file' in output_config['DEFAULT']['dispatcher']:
            # seed the output file with an empty "in progress" result
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            # single task file, with optional inline/file task arguments
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        testcases = {}
        # parse task_files
        for i in range(0, len(task_files)):
            one_task_start_time = time.time()
            parser.path = task_files[i]
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, task_args[i],
                                  task_args_fnames[i])

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check envrionment",
                         meet_precondition)
                continue

            case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
            try:
                data = self._run(scenarios, run_in_parallel, args.output_file)
            except KeyboardInterrupt:
                raise
            except Exception:
                # a failing test case is recorded but does not stop the suite
                LOG.exception("Running test case %s failed!", case_name)
                testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
            else:
                testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # undeploy in reverse order of deployment
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        # NOTE(review): 'scenarios' is only bound inside the loop above; an
        # empty task_files list would raise NameError here -- confirm callers
        # always provide at least one task file.
        scenario = scenarios[0]
        print("To generate report execute => yardstick report generate ",
              scenario['task_id'], scenario['tc'])

        print("Done, exiting")
        return result

    def _set_log(self):
        """Attach a per-task DEBUG-level file handler to the root logger."""
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        # one log file per task id under the task log directory
        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)

    def _init_output_config(self, output_config):
        """Ensure all expected configuration sections exist (possibly empty)."""
        output_config.setdefault('DEFAULT', {})
        output_config.setdefault('dispatcher_http', {})
        output_config.setdefault('dispatcher_file', {})
        output_config.setdefault('dispatcher_influxdb', {})
        output_config.setdefault('nsb', {})

    def _set_output_config(self, output_config, file_path):
        """Apply DISPATCHER/TARGET environment overrides to the config.

        file_path is always recorded as the file dispatcher's output path.
        """
        try:
            out_type = os.environ['DISPATCHER']
        except KeyError:
            # no override: keep any configured dispatcher, default to 'file'
            output_config['DEFAULT'].setdefault('dispatcher', 'file')
        else:
            output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

        try:
            target = os.environ['TARGET']
        except KeyError:
            pass
        else:
            # route the TARGET override to the active dispatcher's section
            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
            output_config[k]['target'] = target

    def _get_format_result(self, testcases):
        """Wrap per-testcase results with overall criteria and env metadata."""
        criteria = self._get_task_criteria(testcases)

        # environment metadata commonly exported by OPNFV CI jobs
        info = {
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
        }

        result = {
            'status': 1,
            'result': {
                'criteria': criteria,
                'task_id': self.task_id,
                'info': info,
                'testcases': testcases
            }
        }

        return result

    def _get_task_criteria(self, testcases):
        """Return 'PASS' only when every test case passed, else 'FAIL'."""
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
        if criteria:
            return 'FAIL'
        else:
            return 'PASS'

    def _do_output(self, output_config, result):
        """Flush the result through every configured dispatcher."""
        dispatchers = DispatcherBase.get(output_config)

        for dispatcher in dispatchers:
            dispatcher.flush_result_data(result)

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        result = []
        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # background scenarios run "forever"; they are aborted once the
            # foreground scenarios have finished
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # start every foreground scenario first, then join them all
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                status = runner_join(runner)
                if status != 0:
                    raise RuntimeError
                self.outputs.update(runner.get_output())
                result.extend(runner.get_result())
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    status = runner_join(runner)
                    if status != 0:
                        LOG.error('Scenario: %s ERROR', scenario.get('type'))
                        raise RuntimeError
                    self.outputs.update(runner.get_output())
                    result.extend(runner.get_result())
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(JOIN_TIMEOUT)
            if status is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner.join(JOIN_TIMEOUT)
            base_runner.Runner.release(runner)

            self.outputs.update(runner.get_output())
            result.extend(runner.get_result())
            print("Background task ended")
        return result

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            # undeploy in reverse order of deployment
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
        """Recursively resolve '$name' option strings from self.outputs."""
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            # '$foo' is replaced with the output previously stored as 'foo'
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context

        Builds the context_cfg (host/target/targets/nodes/networks server
        info) for the scenario, then starts and returns its runner.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # target is an explicit address, not a managed server
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                # use the private address when host and target share a heat
                # stack, otherwise the public address
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            # multiple targets: collect their addresses into a CSV string
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])
        runner = base_runner.Runner.get(runner_cfg)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        host = None
        target = None
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target is not None, then they are in the
            # same heat context.
            return True

        return False
391
392
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # path to the task (or suite) yaml file; reassigned by the caller
        # when iterating over the members of a suite
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Check a suite entry's pod/installer constraint against the env.

        Returns True when the task has no constraint, or when the current
        pod and installer both satisfy it.
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                # a constrained task can only run when the env is known
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                    return False
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return (task_args, task_args_fnames) for the current pod.

        task_args falls back to the suite entry's 'default' value when the
        pod has no specific entry; task_args_fnames has no such fallback.
        """
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, task_args.get('default'))
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # NOTE(review): yaml.load without an explicit Loader is
                # unsafe on untrusted input; suite files are operator
                # provided, but yaml.safe_load would be stricter.
                cfg = yaml.load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        # pod/installer of the current environment, used for constraints
        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
                    continue
            else:
                continue
            # 2.check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances

        Returns a (scenarios, run_in_parallel, meet_precondition, contexts)
        tuple; contexts are initialized but not yet deployed.
        """
        print("Parsing task config:", self.path)

        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args("task_args_file", f.read()))
            kw.update(parse_task_args("task_args", task_args))
        except TypeError:
            # NOTE(review): re-raising a fresh TypeError drops the detail
            # parse_task_args already printed; the chained message is lost.
            raise TypeError()

        try:
            with open(self.path) as f:
                try:
                    # task files are templates rendered with the collected
                    # task arguments before being parsed as YAML
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise e
                print("Input task is:\n%s\n" % rendered_task)

                cfg = yaml.load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            # no context section: nothing to deploy
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # per-task suffix keeps resources of concurrent tasks distinct
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            # rename host/target(s) to match the suffixed context names
            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        # expected form: "yardstick:<task|suite>:0.1"
        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meet the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            # NOTE(review): reads 'INSTALL_TYPE' while parse_suite reads
            # 'INSTALLER_TYPE' -- confirm which variable CI actually sets.
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                # the env scenario must start with one configured prefix
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
584
585
def is_ip_addr(addr):
    """Check whether *addr* denotes an IP address.

    addr may be a plain string, or a server-attribute dict from an external
    heat template, in which case the address is taken from its
    'public_ip_attr'/'private_ip_attr' entry.

    Returns True for a valid IPv4/IPv6 address, False otherwise (including
    None or other non-address values).
    """
    try:
        # server dicts carry the address under an *_ip_attr key instead of
        # being a plain string
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # ipaddress.ip_address() expects text; the previous
        # addr.encode('utf-8') produced bytes, which Python 3 interprets as
        # a packed binary address and therefore rejects dotted-quad strings.
        # Formatting also covers None/non-string values without raising.
        ipaddress.ip_address(u'{}'.format(addr))
    except ValueError:
        return False
    else:
        return True
599
600
601 def _is_background_scenario(scenario):
602     if "run_in_background" in scenario:
603         return scenario["run_in_background"]
604     else:
605         return False
606
607
def parse_nodes_with_context(scenario_cfg):
    """Resolve the scenario's 'nodes' mapping into server info dicts."""
    node_cfg = scenario_cfg["nodes"]
    # sort by node name so instantiation order is deterministic
    return OrderedDict(
        (name, Context.get_server(node_cfg[name]))
        for name in sorted(node_cfg)
    )
613
614
def get_networks_from_nodes(nodes):
    """Collect the networks referenced by the nodes' interfaces.

    Returns a dict keyed by network name; nodes without data and
    interfaces without a vld_id (e.g. management) contribute nothing.
    """
    networks = {}
    for node_attrs in nodes.values():
        if not node_attrs:
            continue
        for intf_attrs in node_attrs.get('interfaces', {}).values():
            vld_id = intf_attrs.get('vld_id')
            if not vld_id:
                # mgmt network doesn't have vld_id
                continue
            network = Context.get_network({"vld_id": vld_id})
            if network:
                networks[network['name']] = network
    return networks
631
632
def runner_join(runner):
    """Wait for *runner* to finish, release its resources, and return its
    exit status (callers treat non-zero as failure)."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    return exit_status
638
639
def print_invalid_header(source_name, args):
    """Print a header announcing that *source_name* carried invalid input."""
    message = "Invalid %(source)s passed:\n\n %(args)s\n" \
        % {"source": source_name, "args": args}
    print(message)
643
644
def parse_task_args(src_name, args):
    """Parse task arguments into a dict.

    args may already be a mapping (returned unchanged), None/empty
    (yields {}), or a YAML string that must decode to a mapping.

    Raises:
        TypeError: when args is not valid YAML or does not decode to a dict.
    """
    # collections.Mapping moved to collections.abc in Python 3 (the old
    # alias was removed in 3.10); fall back to collections for Python 2.
    mapping_type = getattr(collections, 'abc', collections).Mapping
    if isinstance(args, mapping_type):
        return args

    try:
        kw = args and yaml.safe_load(args)
        kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
664
665
def check_environment():
    """Make sure OpenStack credentials are available in the environment.

    If OS_AUTH_URL is not already set, source the openrc file; a missing
    openrc file is tolerated and only logged at debug level, while any
    other IOError is re-raised.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENRC)
        except IOError as e:
            # ENOENT is "No such file or directory" -- the tolerable case.
            # The previous test against EEXIST ("File exists") can never be
            # raised by a failed open-for-read, so a missing openrc was
            # re-raised instead of reaching the debug message below.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
675
676
def change_server_name(scenario, suffix):
    """Append *suffix* to the scenario's host/target/targets server names.

    Mutates *scenario* in place. 'host' and 'target' may each be a plain
    name string or a server-attribute dict with a 'name' entry; 'targets'
    is a list of name strings.
    """
    # 'host' and 'target' follow the same string-or-dict convention
    for key in ('host', 'target'):
        try:
            server = scenario[key]
        except KeyError:
            continue
        try:
            server['name'] += suffix
        except TypeError:
            # plain string entry: suffix the scenario value directly
            scenario[key] += suffix

    if 'targets' in scenario:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]