ea3239edfb5bccf60690400ca77db13a5cf3afed
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28 from jinja2 import Environment
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.runners import base as base_runner
32 from yardstick.dispatcher.base import Base as DispatcherBase
33 from yardstick.common.task_template import TaskTemplate
34 from yardstick.common.utils import source_env
35 from yardstick.common import utils
36 from yardstick.common import constants
37 from yardstick.common.html_template import report_template
38
39 output_file_default = "/tmp/yardstick.out"
40 config_file = '/etc/yardstick/yardstick.conf'
41 test_cases_dir_default = "tests/opnfv/test_cases/"
42 LOG = logging.getLogger(__name__)
43 JOIN_TIMEOUT = 60
44
45
46 class Task(object):     # pragma: no cover
47     """Task commands.
48
49        Set of commands to manage benchmark tasks.
50     """
51
52     def __init__(self):
53         self.contexts = []
54         self.outputs = {}
55
56     def _set_dispatchers(self, output_config):
57         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
58                                                            'file')
59         out_types = [s.strip() for s in dispatchers.split(',')]
60         output_config['DEFAULT']['dispatcher'] = out_types
61
62     def start(self, args, **kwargs):
63         """Start a benchmark scenario."""
64
65         atexit.register(self.atexit_handler)
66
67         task_id = getattr(args, 'task_id')
68         self.task_id = task_id if task_id else str(uuid.uuid4())
69
70         self._set_log()
71
72         check_environment()
73
74         try:
75             output_config = utils.parse_ini_file(config_file)
76         except Exception:
77             # all error will be ignore, the default value is {}
78             output_config = {}
79
80         self._init_output_config(output_config)
81         self._set_output_config(output_config, args.output_file)
82         LOG.debug('Output configuration is: %s', output_config)
83
84         self._set_dispatchers(output_config)
85
86         # update dispatcher list
87         if 'file' in output_config['DEFAULT']['dispatcher']:
88             result = {'status': 0, 'result': {}}
89             utils.write_json_to_file(args.output_file, result)
90
91         total_start_time = time.time()
92         parser = TaskParser(args.inputfile[0])
93
94         if args.suite:
95             # 1.parse suite, return suite_params info
96             task_files, task_args, task_args_fnames = \
97                 parser.parse_suite()
98         else:
99             task_files = [parser.path]
100             task_args = [args.task_args]
101             task_args_fnames = [args.task_args_file]
102
103         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
104                  task_files, task_args, task_args_fnames)
105
106         if args.parse_only:
107             sys.exit(0)
108
109         testcases = {}
110         # parse task_files
111         for i in range(0, len(task_files)):
112             one_task_start_time = time.time()
113             parser.path = task_files[i]
114             scenarios, run_in_parallel, meet_precondition, contexts = \
115                 parser.parse_task(self.task_id, task_args[i],
116                                   task_args_fnames[i])
117
118             self.contexts.extend(contexts)
119
120             if not meet_precondition:
121                 LOG.info("meet_precondition is %s, please check envrionment",
122                          meet_precondition)
123                 continue
124
125             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
126             try:
127                 data = self._run(scenarios, run_in_parallel, args.output_file)
128             except KeyboardInterrupt:
129                 raise
130             except Exception:
131                 LOG.exception("Running test case %s failed!", case_name)
132                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
133             else:
134                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
135
136             if args.keep_deploy:
137                 # keep deployment, forget about stack
138                 # (hide it for exit handler)
139                 self.contexts = []
140             else:
141                 for context in self.contexts[::-1]:
142                     context.undeploy()
143                 self.contexts = []
144             one_task_end_time = time.time()
145             LOG.info("task %s finished in %d secs", task_files[i],
146                      one_task_end_time - one_task_start_time)
147
148         result = self._get_format_result(testcases)
149
150         self._do_output(output_config, result)
151         self._generate_reporting(result)
152
153         total_end_time = time.time()
154         LOG.info("total finished in %d secs",
155                  total_end_time - total_start_time)
156
157         scenario = scenarios[0]
158         print("To generate report execute => yardstick report generate ",
159               scenario['task_id'], scenario['tc'])
160
161         print("Done, exiting")
162         return result
163
164     def _generate_reporting(self, result):
165         env = Environment()
166         with open(constants.REPORTING_FILE, 'w') as f:
167             f.write(env.from_string(report_template).render(result))
168
169         LOG.info('yardstick reporting generate in %s', constants.REPORTING_FILE)
170
171     def _set_log(self):
172         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
173         log_formatter = logging.Formatter(log_format)
174
175         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
176         log_handler = logging.FileHandler(log_path)
177         log_handler.setFormatter(log_formatter)
178         log_handler.setLevel(logging.DEBUG)
179
180         logging.root.addHandler(log_handler)
181
182     def _init_output_config(self, output_config):
183         output_config.setdefault('DEFAULT', {})
184         output_config.setdefault('dispatcher_http', {})
185         output_config.setdefault('dispatcher_file', {})
186         output_config.setdefault('dispatcher_influxdb', {})
187         output_config.setdefault('nsb', {})
188
189     def _set_output_config(self, output_config, file_path):
190         try:
191             out_type = os.environ['DISPATCHER']
192         except KeyError:
193             output_config['DEFAULT'].setdefault('dispatcher', 'file')
194         else:
195             output_config['DEFAULT']['dispatcher'] = out_type
196
197         output_config['dispatcher_file']['file_path'] = file_path
198
199         try:
200             target = os.environ['TARGET']
201         except KeyError:
202             pass
203         else:
204             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
205             output_config[k]['target'] = target
206
207     def _get_format_result(self, testcases):
208         criteria = self._get_task_criteria(testcases)
209
210         info = {
211             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
212             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
213             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
214             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
215         }
216
217         result = {
218             'status': 1,
219             'result': {
220                 'criteria': criteria,
221                 'task_id': self.task_id,
222                 'info': info,
223                 'testcases': testcases
224             }
225         }
226
227         return result
228
229     def _get_task_criteria(self, testcases):
230         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
231         if criteria:
232             return 'FAIL'
233         else:
234             return 'PASS'
235
236     def _do_output(self, output_config, result):
237         dispatchers = DispatcherBase.get(output_config)
238
239         for dispatcher in dispatchers:
240             dispatcher.flush_result_data(result)
241
242     def _run(self, scenarios, run_in_parallel, output_file):
243         """Deploys context and calls runners"""
244         for context in self.contexts:
245             context.deploy()
246
247         background_runners = []
248
249         result = []
250         # Start all background scenarios
251         for scenario in filter(_is_background_scenario, scenarios):
252             scenario["runner"] = dict(type="Duration", duration=1000000000)
253             runner = self.run_one_scenario(scenario, output_file)
254             background_runners.append(runner)
255
256         runners = []
257         if run_in_parallel:
258             for scenario in scenarios:
259                 if not _is_background_scenario(scenario):
260                     runner = self.run_one_scenario(scenario, output_file)
261                     runners.append(runner)
262
263             # Wait for runners to finish
264             for runner in runners:
265                 status = runner_join(runner)
266                 if status != 0:
267                     raise RuntimeError
268                 self.outputs.update(runner.get_output())
269                 result.extend(runner.get_result())
270                 print("Runner ended, output in", output_file)
271         else:
272             # run serially
273             for scenario in scenarios:
274                 if not _is_background_scenario(scenario):
275                     runner = self.run_one_scenario(scenario, output_file)
276                     status = runner_join(runner)
277                     if status != 0:
278                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
279                         raise RuntimeError
280                     self.outputs.update(runner.get_output())
281                     result.extend(runner.get_result())
282                     print("Runner ended, output in", output_file)
283
284         # Abort background runners
285         for runner in background_runners:
286             runner.abort()
287
288         # Wait for background runners to finish
289         for runner in background_runners:
290             status = runner.join(JOIN_TIMEOUT)
291             if status is None:
292                 # Nuke if it did not stop nicely
293                 base_runner.Runner.terminate(runner)
294                 runner.join(JOIN_TIMEOUT)
295             base_runner.Runner.release(runner)
296
297             self.outputs.update(runner.get_output())
298             result.extend(runner.get_result())
299             print("Background task ended")
300         return result
301
302     def atexit_handler(self):
303         """handler for process termination"""
304         base_runner.Runner.terminate_all()
305
306         if self.contexts:
307             print("Undeploying all contexts")
308             for context in self.contexts[::-1]:
309                 context.undeploy()
310
311     def _parse_options(self, op):
312         if isinstance(op, dict):
313             return {k: self._parse_options(v) for k, v in op.items()}
314         elif isinstance(op, list):
315             return [self._parse_options(v) for v in op]
316         elif isinstance(op, str):
317             return self.outputs.get(op[1:]) if op.startswith('$') else op
318         else:
319             return op
320
321     def run_one_scenario(self, scenario_cfg, output_file):
322         """run one scenario using context"""
323         runner_cfg = scenario_cfg["runner"]
324         runner_cfg['output_filename'] = output_file
325
326         options = scenario_cfg.get('options', {})
327         scenario_cfg['options'] = self._parse_options(options)
328
329         # TODO support get multi hosts/vms info
330         context_cfg = {}
331         if "host" in scenario_cfg:
332             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
333
334         if "target" in scenario_cfg:
335             if is_ip_addr(scenario_cfg["target"]):
336                 context_cfg['target'] = {}
337                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
338             else:
339                 context_cfg['target'] = Context.get_server(
340                     scenario_cfg["target"])
341                 if self._is_same_heat_context(scenario_cfg["host"],
342                                               scenario_cfg["target"]):
343                     context_cfg["target"]["ipaddr"] = \
344                         context_cfg["target"]["private_ip"]
345                 else:
346                     context_cfg["target"]["ipaddr"] = \
347                         context_cfg["target"]["ip"]
348
349         if "targets" in scenario_cfg:
350             ip_list = []
351             for target in scenario_cfg["targets"]:
352                 if is_ip_addr(target):
353                     ip_list.append(target)
354                     context_cfg['target'] = {}
355                 else:
356                     context_cfg['target'] = Context.get_server(target)
357                     if self._is_same_heat_context(scenario_cfg["host"],
358                                                   target):
359                         ip_list.append(context_cfg["target"]["private_ip"])
360                     else:
361                         ip_list.append(context_cfg["target"]["ip"])
362             context_cfg['target']['ipaddr'] = ','.join(ip_list)
363
364         if "nodes" in scenario_cfg:
365             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
366             context_cfg["networks"] = get_networks_from_nodes(
367                 context_cfg["nodes"])
368         runner = base_runner.Runner.get(runner_cfg)
369
370         print("Starting runner of type '%s'" % runner_cfg["type"])
371         runner.run(scenario_cfg, context_cfg)
372
373         return runner
374
375     def _is_same_heat_context(self, host_attr, target_attr):
376         """check if two servers are in the same heat context
377         host_attr: either a name for a server created by yardstick or a dict
378         with attribute name mapping when using external heat templates
379         target_attr: either a name for a server created by yardstick or a dict
380         with attribute name mapping when using external heat templates
381         """
382         host = None
383         target = None
384         for context in self.contexts:
385             if context.__context_type__ != "Heat":
386                 continue
387
388             host = context._get_server(host_attr)
389             if host is None:
390                 continue
391
392             target = context._get_server(target_attr)
393             if target is None:
394                 return False
395
396             # Both host and target is not None, then they are in the
397             # same heat context.
398             return True
399
400         return False
401
402
403 class TaskParser(object):       # pragma: no cover
404     """Parser for task config files in yaml format"""
405
406     def __init__(self, path):
407         self.path = path
408
409     def _meet_constraint(self, task, cur_pod, cur_installer):
410         if "constraint" in task:
411             constraint = task.get('constraint', None)
412             if constraint is not None:
413                 tc_fit_pod = constraint.get('pod', None)
414                 tc_fit_installer = constraint.get('installer', None)
415                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
416                          cur_pod, cur_installer, constraint)
417                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
418                     return False
419                 if (cur_installer is None) or (tc_fit_installer and cur_installer
420                                                not in tc_fit_installer):
421                     return False
422         return True
423
424     def _get_task_para(self, task, cur_pod):
425         task_args = task.get('task_args', None)
426         if task_args is not None:
427             task_args = task_args.get(cur_pod, task_args.get('default'))
428         task_args_fnames = task.get('task_args_fnames', None)
429         if task_args_fnames is not None:
430             task_args_fnames = task_args_fnames.get(cur_pod, None)
431         return task_args, task_args_fnames
432
433     def parse_suite(self):
434         """parse the suite file and return a list of task config file paths
435            and lists of optional parameters if present"""
436         LOG.info("\nParsing suite file:%s", self.path)
437
438         try:
439             with open(self.path) as stream:
440                 cfg = yaml.load(stream)
441         except IOError as ioerror:
442             sys.exit(ioerror)
443
444         self._check_schema(cfg["schema"], "suite")
445         LOG.info("\nStarting scenario:%s", cfg["name"])
446
447         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
448         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
449                                       test_cases_dir)
450         if test_cases_dir[-1] != os.sep:
451             test_cases_dir += os.sep
452
453         cur_pod = os.environ.get('NODE_NAME', None)
454         cur_installer = os.environ.get('INSTALLER_TYPE', None)
455
456         valid_task_files = []
457         valid_task_args = []
458         valid_task_args_fnames = []
459
460         for task in cfg["test_cases"]:
461             # 1.check file_name
462             if "file_name" in task:
463                 task_fname = task.get('file_name', None)
464                 if task_fname is None:
465                     continue
466             else:
467                 continue
468             # 2.check constraint
469             if self._meet_constraint(task, cur_pod, cur_installer):
470                 valid_task_files.append(test_cases_dir + task_fname)
471             else:
472                 continue
473             # 3.fetch task parameters
474             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
475             valid_task_args.append(task_args)
476             valid_task_args_fnames.append(task_args_fnames)
477
478         return valid_task_files, valid_task_args, valid_task_args_fnames
479
480     def parse_task(self, task_id, task_args=None, task_args_file=None):
481         """parses the task file and return an context and scenario instances"""
482         print("Parsing task config:", self.path)
483
484         try:
485             kw = {}
486             if task_args_file:
487                 with open(task_args_file) as f:
488                     kw.update(parse_task_args("task_args_file", f.read()))
489             kw.update(parse_task_args("task_args", task_args))
490         except TypeError:
491             raise TypeError()
492
493         try:
494             with open(self.path) as f:
495                 try:
496                     input_task = f.read()
497                     rendered_task = TaskTemplate.render(input_task, **kw)
498                 except Exception as e:
499                     print("Failed to render template:\n%(task)s\n%(err)s\n"
500                           % {"task": input_task, "err": e})
501                     raise e
502                 print("Input task is:\n%s\n" % rendered_task)
503
504                 cfg = yaml.load(rendered_task)
505         except IOError as ioerror:
506             sys.exit(ioerror)
507
508         self._check_schema(cfg["schema"], "task")
509         meet_precondition = self._check_precondition(cfg)
510
511         # TODO: support one or many contexts? Many would simpler and precise
512         # TODO: support hybrid context type
513         if "context" in cfg:
514             context_cfgs = [cfg["context"]]
515         elif "contexts" in cfg:
516             context_cfgs = cfg["contexts"]
517         else:
518             context_cfgs = [{"type": "Dummy"}]
519
520         contexts = []
521         name_suffix = '-{}'.format(task_id[:8])
522         for cfg_attrs in context_cfgs:
523             try:
524                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
525                                                   name_suffix)
526             except KeyError:
527                 pass
528             # default to Heat context because we are testing OpenStack
529             context_type = cfg_attrs.get("type", "Heat")
530             context = Context.get(context_type)
531             context.init(cfg_attrs)
532             contexts.append(context)
533
534         run_in_parallel = cfg.get("run_in_parallel", False)
535
536         # add tc and task id for influxdb extended tags
537         for scenario in cfg["scenarios"]:
538             task_name = os.path.splitext(os.path.basename(self.path))[0]
539             scenario["tc"] = task_name
540             scenario["task_id"] = task_id
541             # embed task path into scenario so we can load other files
542             # relative to task path
543             scenario["task_path"] = os.path.dirname(self.path)
544
545             change_server_name(scenario, name_suffix)
546
547             try:
548                 for node in scenario['nodes']:
549                     scenario['nodes'][node] += name_suffix
550             except KeyError:
551                 pass
552
553         # TODO we need something better here, a class that represent the file
554         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
555
556     def _check_schema(self, cfg_schema, schema_type):
557         """Check if config file is using the correct schema type"""
558
559         if cfg_schema != "yardstick:" + schema_type + ":0.1":
560             sys.exit("error: file %s has unknown schema %s" % (self.path,
561                                                                cfg_schema))
562
563     def _check_precondition(self, cfg):
564         """Check if the environment meet the precondition"""
565
566         if "precondition" in cfg:
567             precondition = cfg["precondition"]
568             installer_type = precondition.get("installer_type", None)
569             deploy_scenarios = precondition.get("deploy_scenarios", None)
570             tc_fit_pods = precondition.get("pod_name", None)
571             installer_type_env = os.environ.get('INSTALL_TYPE', None)
572             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
573             pod_name_env = os.environ.get('NODE_NAME', None)
574
575             LOG.info("installer_type: %s, installer_type_env: %s",
576                      installer_type, installer_type_env)
577             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
578                      deploy_scenarios, deploy_scenario_env)
579             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
580                      tc_fit_pods, pod_name_env)
581             if installer_type and installer_type_env:
582                 if installer_type_env not in installer_type:
583                     return False
584             if deploy_scenarios and deploy_scenario_env:
585                 deploy_scenarios_list = deploy_scenarios.split(',')
586                 for deploy_scenario in deploy_scenarios_list:
587                     if deploy_scenario_env.startswith(deploy_scenario):
588                         return True
589                 return False
590             if tc_fit_pods and pod_name_env:
591                 if pod_name_env not in tc_fit_pods:
592                     return False
593         return True
594
595
596 def is_ip_addr(addr):
597     """check if string addr is an IP address"""
598     try:
599         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
600     except AttributeError:
601         pass
602
603     try:
604         ipaddress.ip_address(addr.encode('utf-8'))
605     except ValueError:
606         return False
607     else:
608         return True
609
610
611 def _is_background_scenario(scenario):
612     if "run_in_background" in scenario:
613         return scenario["run_in_background"]
614     else:
615         return False
616
617
618 def parse_nodes_with_context(scenario_cfg):
619     """parse the 'nodes' fields in scenario """
620     # ensure consistency in node instantiation order
621     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
622                        for nodename in sorted(scenario_cfg["nodes"]))
623
624
625 def get_networks_from_nodes(nodes):
626     """parse the 'nodes' fields in scenario """
627     networks = {}
628     for node in nodes.values():
629         if not node:
630             continue
631         interfaces = node.get('interfaces', {})
632         for interface in interfaces.values():
633             vld_id = interface.get('vld_id')
634             # mgmt network doesn't have vld_id
635             if not vld_id:
636                 continue
637             network = Context.get_network({"vld_id": vld_id})
638             if network:
639                 networks[network['name']] = network
640     return networks
641
642
643 def runner_join(runner):
644     """join (wait for) a runner, exit process at runner failure"""
645     status = runner.join()
646     base_runner.Runner.release(runner)
647     return status
648
649
650 def print_invalid_header(source_name, args):
651     print("Invalid %(source)s passed:\n\n %(args)s\n"
652           % {"source": source_name, "args": args})
653
654
655 def parse_task_args(src_name, args):
656     if isinstance(args, collections.Mapping):
657         return args
658
659     try:
660         kw = args and yaml.safe_load(args)
661         kw = {} if kw is None else kw
662     except yaml.parser.ParserError as e:
663         print_invalid_header(src_name, args)
664         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
665               % {"source": src_name, "err": e})
666         raise TypeError()
667
668     if not isinstance(kw, dict):
669         print_invalid_header(src_name, args)
670         print("%(src)s had to be dict, actually %(src_type)s\n"
671               % {"src": src_name, "src_type": type(kw)})
672         raise TypeError()
673     return kw
674
675
676 def check_environment():
677     auth_url = os.environ.get('OS_AUTH_URL', None)
678     if not auth_url:
679         try:
680             source_env(constants.OPENRC)
681         except IOError as e:
682             if e.errno != errno.EEXIST:
683                 raise
684             LOG.debug('OPENRC file not found')
685
686
687 def change_server_name(scenario, suffix):
688     try:
689         host = scenario['host']
690     except KeyError:
691         pass
692     else:
693         try:
694             host['name'] += suffix
695         except TypeError:
696             scenario['host'] += suffix
697
698     try:
699         target = scenario['target']
700     except KeyError:
701         pass
702     else:
703         try:
704             target['name'] += suffix
705         except TypeError:
706             scenario['target'] += suffix
707
708     try:
709         key = 'targets'
710         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
711     except KeyError:
712         pass