Merge "Enable vnf/tg instantiate as blocking call."
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28 from jinja2 import Environment
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.runners import base as base_runner
32 from yardstick.common.yaml_loader import yaml_load
33 from yardstick.dispatcher.base import Base as DispatcherBase
34 from yardstick.common.task_template import TaskTemplate
35 from yardstick.common.utils import source_env
36 from yardstick.common import utils
37 from yardstick.common import constants
38 from yardstick.common.html_template import report_template
39
40 output_file_default = "/tmp/yardstick.out"
41 config_file = '/etc/yardstick/yardstick.conf'
42 test_cases_dir_default = "tests/opnfv/test_cases/"
43 LOG = logging.getLogger(__name__)
44 JOIN_TIMEOUT = 60
45
46
47 class Task(object):     # pragma: no cover
48     """Task commands.
49
50        Set of commands to manage benchmark tasks.
51     """
52
53     def __init__(self):
54         self.contexts = []
55         self.outputs = {}
56
57     def _set_dispatchers(self, output_config):
58         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
59                                                            'file')
60         out_types = [s.strip() for s in dispatchers.split(',')]
61         output_config['DEFAULT']['dispatcher'] = out_types
62
63     def start(self, args, **kwargs):
64         """Start a benchmark scenario."""
65
66         atexit.register(self.atexit_handler)
67
68         task_id = getattr(args, 'task_id')
69         self.task_id = task_id if task_id else str(uuid.uuid4())
70
71         self._set_log()
72
73         check_environment()
74
75         try:
76             output_config = utils.parse_ini_file(config_file)
77         except Exception:
78             # all error will be ignore, the default value is {}
79             output_config = {}
80
81         self._init_output_config(output_config)
82         self._set_output_config(output_config, args.output_file)
83         LOG.debug('Output configuration is: %s', output_config)
84
85         self._set_dispatchers(output_config)
86
87         # update dispatcher list
88         if 'file' in output_config['DEFAULT']['dispatcher']:
89             result = {'status': 0, 'result': {}}
90             utils.write_json_to_file(args.output_file, result)
91
92         total_start_time = time.time()
93         parser = TaskParser(args.inputfile[0])
94
95         if args.suite:
96             # 1.parse suite, return suite_params info
97             task_files, task_args, task_args_fnames = \
98                 parser.parse_suite()
99         else:
100             task_files = [parser.path]
101             task_args = [args.task_args]
102             task_args_fnames = [args.task_args_file]
103
104         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
105                  task_files, task_args, task_args_fnames)
106
107         if args.parse_only:
108             sys.exit(0)
109
110         testcases = {}
111         # parse task_files
112         for i in range(0, len(task_files)):
113             one_task_start_time = time.time()
114             parser.path = task_files[i]
115             scenarios, run_in_parallel, meet_precondition, contexts = \
116                 parser.parse_task(self.task_id, task_args[i],
117                                   task_args_fnames[i])
118
119             self.contexts.extend(contexts)
120
121             if not meet_precondition:
122                 LOG.info("meet_precondition is %s, please check envrionment",
123                          meet_precondition)
124                 continue
125
126             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
127             try:
128                 data = self._run(scenarios, run_in_parallel, args.output_file)
129             except KeyboardInterrupt:
130                 raise
131             except Exception:
132                 LOG.exception("Running test case %s failed!", case_name)
133                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
134             else:
135                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
136
137             if args.keep_deploy:
138                 # keep deployment, forget about stack
139                 # (hide it for exit handler)
140                 self.contexts = []
141             else:
142                 for context in self.contexts[::-1]:
143                     context.undeploy()
144                 self.contexts = []
145             one_task_end_time = time.time()
146             LOG.info("task %s finished in %d secs", task_files[i],
147                      one_task_end_time - one_task_start_time)
148
149         result = self._get_format_result(testcases)
150
151         self._do_output(output_config, result)
152         self._generate_reporting(result)
153
154         total_end_time = time.time()
155         LOG.info("total finished in %d secs",
156                  total_end_time - total_start_time)
157
158         scenario = scenarios[0]
159         print("To generate report execute => yardstick report generate ",
160               scenario['task_id'], scenario['tc'])
161
162         print("Done, exiting")
163         return result
164
165     def _generate_reporting(self, result):
166         env = Environment()
167         with open(constants.REPORTING_FILE, 'w') as f:
168             f.write(env.from_string(report_template).render(result))
169
170         LOG.info('yardstick reporting generate in %s', constants.REPORTING_FILE)
171
172     def _set_log(self):
173         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
174         log_formatter = logging.Formatter(log_format)
175
176         utils.makedirs(constants.TASK_LOG_DIR)
177         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
178         log_handler = logging.FileHandler(log_path)
179         log_handler.setFormatter(log_formatter)
180         log_handler.setLevel(logging.DEBUG)
181
182         logging.root.addHandler(log_handler)
183
184     def _init_output_config(self, output_config):
185         output_config.setdefault('DEFAULT', {})
186         output_config.setdefault('dispatcher_http', {})
187         output_config.setdefault('dispatcher_file', {})
188         output_config.setdefault('dispatcher_influxdb', {})
189         output_config.setdefault('nsb', {})
190
191     def _set_output_config(self, output_config, file_path):
192         try:
193             out_type = os.environ['DISPATCHER']
194         except KeyError:
195             output_config['DEFAULT'].setdefault('dispatcher', 'file')
196         else:
197             output_config['DEFAULT']['dispatcher'] = out_type
198
199         output_config['dispatcher_file']['file_path'] = file_path
200
201         try:
202             target = os.environ['TARGET']
203         except KeyError:
204             pass
205         else:
206             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
207             output_config[k]['target'] = target
208
209     def _get_format_result(self, testcases):
210         criteria = self._get_task_criteria(testcases)
211
212         info = {
213             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
214             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
215             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
216             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
217         }
218
219         result = {
220             'status': 1,
221             'result': {
222                 'criteria': criteria,
223                 'task_id': self.task_id,
224                 'info': info,
225                 'testcases': testcases
226             }
227         }
228
229         return result
230
231     def _get_task_criteria(self, testcases):
232         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
233         if criteria:
234             return 'FAIL'
235         else:
236             return 'PASS'
237
238     def _do_output(self, output_config, result):
239         dispatchers = DispatcherBase.get(output_config)
240
241         for dispatcher in dispatchers:
242             dispatcher.flush_result_data(result)
243
244     def _run(self, scenarios, run_in_parallel, output_file):
245         """Deploys context and calls runners"""
246         for context in self.contexts:
247             context.deploy()
248
249         background_runners = []
250
251         result = []
252         # Start all background scenarios
253         for scenario in filter(_is_background_scenario, scenarios):
254             scenario["runner"] = dict(type="Duration", duration=1000000000)
255             runner = self.run_one_scenario(scenario, output_file)
256             background_runners.append(runner)
257
258         runners = []
259         if run_in_parallel:
260             for scenario in scenarios:
261                 if not _is_background_scenario(scenario):
262                     runner = self.run_one_scenario(scenario, output_file)
263                     runners.append(runner)
264
265             # Wait for runners to finish
266             for runner in runners:
267                 status = runner_join(runner)
268                 if status != 0:
269                     raise RuntimeError
270                 self.outputs.update(runner.get_output())
271                 result.extend(runner.get_result())
272                 print("Runner ended, output in", output_file)
273         else:
274             # run serially
275             for scenario in scenarios:
276                 if not _is_background_scenario(scenario):
277                     runner = self.run_one_scenario(scenario, output_file)
278                     status = runner_join(runner)
279                     if status != 0:
280                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
281                         raise RuntimeError
282                     self.outputs.update(runner.get_output())
283                     result.extend(runner.get_result())
284                     print("Runner ended, output in", output_file)
285
286         # Abort background runners
287         for runner in background_runners:
288             runner.abort()
289
290         # Wait for background runners to finish
291         for runner in background_runners:
292             status = runner.join(JOIN_TIMEOUT)
293             if status is None:
294                 # Nuke if it did not stop nicely
295                 base_runner.Runner.terminate(runner)
296                 runner.join(JOIN_TIMEOUT)
297             base_runner.Runner.release(runner)
298
299             self.outputs.update(runner.get_output())
300             result.extend(runner.get_result())
301             print("Background task ended")
302         return result
303
304     def atexit_handler(self):
305         """handler for process termination"""
306         base_runner.Runner.terminate_all()
307
308         if self.contexts:
309             print("Undeploying all contexts")
310             for context in self.contexts[::-1]:
311                 context.undeploy()
312
313     def _parse_options(self, op):
314         if isinstance(op, dict):
315             return {k: self._parse_options(v) for k, v in op.items()}
316         elif isinstance(op, list):
317             return [self._parse_options(v) for v in op]
318         elif isinstance(op, str):
319             return self.outputs.get(op[1:]) if op.startswith('$') else op
320         else:
321             return op
322
323     def run_one_scenario(self, scenario_cfg, output_file):
324         """run one scenario using context"""
325         runner_cfg = scenario_cfg["runner"]
326         runner_cfg['output_filename'] = output_file
327
328         options = scenario_cfg.get('options', {})
329         scenario_cfg['options'] = self._parse_options(options)
330
331         # TODO support get multi hosts/vms info
332         context_cfg = {}
333         if "host" in scenario_cfg:
334             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
335
336         if "target" in scenario_cfg:
337             if is_ip_addr(scenario_cfg["target"]):
338                 context_cfg['target'] = {}
339                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
340             else:
341                 context_cfg['target'] = Context.get_server(
342                     scenario_cfg["target"])
343                 if self._is_same_heat_context(scenario_cfg["host"],
344                                               scenario_cfg["target"]):
345                     context_cfg["target"]["ipaddr"] = \
346                         context_cfg["target"]["private_ip"]
347                 else:
348                     context_cfg["target"]["ipaddr"] = \
349                         context_cfg["target"]["ip"]
350
351         if "targets" in scenario_cfg:
352             ip_list = []
353             for target in scenario_cfg["targets"]:
354                 if is_ip_addr(target):
355                     ip_list.append(target)
356                     context_cfg['target'] = {}
357                 else:
358                     context_cfg['target'] = Context.get_server(target)
359                     if self._is_same_heat_context(scenario_cfg["host"],
360                                                   target):
361                         ip_list.append(context_cfg["target"]["private_ip"])
362                     else:
363                         ip_list.append(context_cfg["target"]["ip"])
364             context_cfg['target']['ipaddr'] = ','.join(ip_list)
365
366         if "nodes" in scenario_cfg:
367             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
368             context_cfg["networks"] = get_networks_from_nodes(
369                 context_cfg["nodes"])
370         runner = base_runner.Runner.get(runner_cfg)
371
372         print("Starting runner of type '%s'" % runner_cfg["type"])
373         runner.run(scenario_cfg, context_cfg)
374
375         return runner
376
377     def _is_same_heat_context(self, host_attr, target_attr):
378         """check if two servers are in the same heat context
379         host_attr: either a name for a server created by yardstick or a dict
380         with attribute name mapping when using external heat templates
381         target_attr: either a name for a server created by yardstick or a dict
382         with attribute name mapping when using external heat templates
383         """
384         host = None
385         target = None
386         for context in self.contexts:
387             if context.__context_type__ != "Heat":
388                 continue
389
390             host = context._get_server(host_attr)
391             if host is None:
392                 continue
393
394             target = context._get_server(target_attr)
395             if target is None:
396                 return False
397
398             # Both host and target is not None, then they are in the
399             # same heat context.
400             return True
401
402         return False
403
404
405 class TaskParser(object):       # pragma: no cover
406     """Parser for task config files in yaml format"""
407
408     def __init__(self, path):
409         self.path = path
410
411     def _meet_constraint(self, task, cur_pod, cur_installer):
412         if "constraint" in task:
413             constraint = task.get('constraint', None)
414             if constraint is not None:
415                 tc_fit_pod = constraint.get('pod', None)
416                 tc_fit_installer = constraint.get('installer', None)
417                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
418                          cur_pod, cur_installer, constraint)
419                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
420                     return False
421                 if (cur_installer is None) or (tc_fit_installer and cur_installer
422                                                not in tc_fit_installer):
423                     return False
424         return True
425
426     def _get_task_para(self, task, cur_pod):
427         task_args = task.get('task_args', None)
428         if task_args is not None:
429             task_args = task_args.get(cur_pod, task_args.get('default'))
430         task_args_fnames = task.get('task_args_fnames', None)
431         if task_args_fnames is not None:
432             task_args_fnames = task_args_fnames.get(cur_pod, None)
433         return task_args, task_args_fnames
434
435     def parse_suite(self):
436         """parse the suite file and return a list of task config file paths
437            and lists of optional parameters if present"""
438         LOG.info("\nParsing suite file:%s", self.path)
439
440         try:
441             with open(self.path) as stream:
442                 cfg = yaml_load(stream)
443         except IOError as ioerror:
444             sys.exit(ioerror)
445
446         self._check_schema(cfg["schema"], "suite")
447         LOG.info("\nStarting scenario:%s", cfg["name"])
448
449         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
450         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
451                                       test_cases_dir)
452         if test_cases_dir[-1] != os.sep:
453             test_cases_dir += os.sep
454
455         cur_pod = os.environ.get('NODE_NAME', None)
456         cur_installer = os.environ.get('INSTALLER_TYPE', None)
457
458         valid_task_files = []
459         valid_task_args = []
460         valid_task_args_fnames = []
461
462         for task in cfg["test_cases"]:
463             # 1.check file_name
464             if "file_name" in task:
465                 task_fname = task.get('file_name', None)
466                 if task_fname is None:
467                     continue
468             else:
469                 continue
470             # 2.check constraint
471             if self._meet_constraint(task, cur_pod, cur_installer):
472                 valid_task_files.append(test_cases_dir + task_fname)
473             else:
474                 continue
475             # 3.fetch task parameters
476             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
477             valid_task_args.append(task_args)
478             valid_task_args_fnames.append(task_args_fnames)
479
480         return valid_task_files, valid_task_args, valid_task_args_fnames
481
482     def parse_task(self, task_id, task_args=None, task_args_file=None):
483         """parses the task file and return an context and scenario instances"""
484         print("Parsing task config:", self.path)
485
486         try:
487             kw = {}
488             if task_args_file:
489                 with open(task_args_file) as f:
490                     kw.update(parse_task_args("task_args_file", f.read()))
491             kw.update(parse_task_args("task_args", task_args))
492         except TypeError:
493             raise TypeError()
494
495         try:
496             with open(self.path) as f:
497                 try:
498                     input_task = f.read()
499                     rendered_task = TaskTemplate.render(input_task, **kw)
500                 except Exception as e:
501                     print("Failed to render template:\n%(task)s\n%(err)s\n"
502                           % {"task": input_task, "err": e})
503                     raise e
504                 print("Input task is:\n%s\n" % rendered_task)
505
506                 cfg = yaml_load(rendered_task)
507         except IOError as ioerror:
508             sys.exit(ioerror)
509
510         self._check_schema(cfg["schema"], "task")
511         meet_precondition = self._check_precondition(cfg)
512
513         # TODO: support one or many contexts? Many would simpler and precise
514         # TODO: support hybrid context type
515         if "context" in cfg:
516             context_cfgs = [cfg["context"]]
517         elif "contexts" in cfg:
518             context_cfgs = cfg["contexts"]
519         else:
520             context_cfgs = [{"type": "Dummy"}]
521
522         contexts = []
523         name_suffix = '-{}'.format(task_id[:8])
524         for cfg_attrs in context_cfgs:
525             try:
526                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
527                                                   name_suffix)
528             except KeyError:
529                 pass
530             # default to Heat context because we are testing OpenStack
531             context_type = cfg_attrs.get("type", "Heat")
532             context = Context.get(context_type)
533             context.init(cfg_attrs)
534             contexts.append(context)
535
536         run_in_parallel = cfg.get("run_in_parallel", False)
537
538         # add tc and task id for influxdb extended tags
539         for scenario in cfg["scenarios"]:
540             task_name = os.path.splitext(os.path.basename(self.path))[0]
541             scenario["tc"] = task_name
542             scenario["task_id"] = task_id
543             # embed task path into scenario so we can load other files
544             # relative to task path
545             scenario["task_path"] = os.path.dirname(self.path)
546
547             change_server_name(scenario, name_suffix)
548
549             try:
550                 for node in scenario['nodes']:
551                     scenario['nodes'][node] += name_suffix
552             except KeyError:
553                 pass
554
555         # TODO we need something better here, a class that represent the file
556         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
557
558     def _check_schema(self, cfg_schema, schema_type):
559         """Check if config file is using the correct schema type"""
560
561         if cfg_schema != "yardstick:" + schema_type + ":0.1":
562             sys.exit("error: file %s has unknown schema %s" % (self.path,
563                                                                cfg_schema))
564
565     def _check_precondition(self, cfg):
566         """Check if the environment meet the precondition"""
567
568         if "precondition" in cfg:
569             precondition = cfg["precondition"]
570             installer_type = precondition.get("installer_type", None)
571             deploy_scenarios = precondition.get("deploy_scenarios", None)
572             tc_fit_pods = precondition.get("pod_name", None)
573             installer_type_env = os.environ.get('INSTALL_TYPE', None)
574             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
575             pod_name_env = os.environ.get('NODE_NAME', None)
576
577             LOG.info("installer_type: %s, installer_type_env: %s",
578                      installer_type, installer_type_env)
579             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
580                      deploy_scenarios, deploy_scenario_env)
581             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
582                      tc_fit_pods, pod_name_env)
583             if installer_type and installer_type_env:
584                 if installer_type_env not in installer_type:
585                     return False
586             if deploy_scenarios and deploy_scenario_env:
587                 deploy_scenarios_list = deploy_scenarios.split(',')
588                 for deploy_scenario in deploy_scenarios_list:
589                     if deploy_scenario_env.startswith(deploy_scenario):
590                         return True
591                 return False
592             if tc_fit_pods and pod_name_env:
593                 if pod_name_env not in tc_fit_pods:
594                     return False
595         return True
596
597
598 def is_ip_addr(addr):
599     """check if string addr is an IP address"""
600     try:
601         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
602     except AttributeError:
603         pass
604
605     try:
606         ipaddress.ip_address(addr.encode('utf-8'))
607     except ValueError:
608         return False
609     else:
610         return True
611
612
613 def _is_background_scenario(scenario):
614     if "run_in_background" in scenario:
615         return scenario["run_in_background"]
616     else:
617         return False
618
619
620 def parse_nodes_with_context(scenario_cfg):
621     """parse the 'nodes' fields in scenario """
622     # ensure consistency in node instantiation order
623     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
624                        for nodename in sorted(scenario_cfg["nodes"]))
625
626
627 def get_networks_from_nodes(nodes):
628     """parse the 'nodes' fields in scenario """
629     networks = {}
630     for node in nodes.values():
631         if not node:
632             continue
633         interfaces = node.get('interfaces', {})
634         for interface in interfaces.values():
635             vld_id = interface.get('vld_id')
636             # mgmt network doesn't have vld_id
637             if not vld_id:
638                 continue
639             network = Context.get_network({"vld_id": vld_id})
640             if network:
641                 networks[network['name']] = network
642     return networks
643
644
645 def runner_join(runner):
646     """join (wait for) a runner, exit process at runner failure"""
647     status = runner.join()
648     base_runner.Runner.release(runner)
649     return status
650
651
652 def print_invalid_header(source_name, args):
653     print("Invalid %(source)s passed:\n\n %(args)s\n"
654           % {"source": source_name, "args": args})
655
656
657 def parse_task_args(src_name, args):
658     if isinstance(args, collections.Mapping):
659         return args
660
661     try:
662         kw = args and yaml_load(args)
663         kw = {} if kw is None else kw
664     except yaml.parser.ParserError as e:
665         print_invalid_header(src_name, args)
666         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
667               % {"source": src_name, "err": e})
668         raise TypeError()
669
670     if not isinstance(kw, dict):
671         print_invalid_header(src_name, args)
672         print("%(src)s had to be dict, actually %(src_type)s\n"
673               % {"src": src_name, "src_type": type(kw)})
674         raise TypeError()
675     return kw
676
677
678 def check_environment():
679     auth_url = os.environ.get('OS_AUTH_URL', None)
680     if not auth_url:
681         try:
682             source_env(constants.OPENRC)
683         except IOError as e:
684             if e.errno != errno.EEXIST:
685                 raise
686             LOG.debug('OPENRC file not found')
687
688
689 def change_server_name(scenario, suffix):
690     try:
691         host = scenario['host']
692     except KeyError:
693         pass
694     else:
695         try:
696             host['name'] += suffix
697         except TypeError:
698             scenario['host'] += suffix
699
700     try:
701         target = scenario['target']
702     except KeyError:
703         pass
704     else:
705         try:
706             target['name'] += suffix
707         except TypeError:
708             scenario['target'] += suffix
709
710     try:
711         key = 'targets'
712         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
713     except KeyError:
714         pass