2b10c61b32a750d03a8f92181e8a87738606aed5
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common.task_template import TaskTemplate
33 from yardstick.common.utils import source_env
34 from yardstick.common import utils
35 from yardstick.common import constants
36
37 output_file_default = "/tmp/yardstick.out"
38 config_file = '/etc/yardstick/yardstick.conf'
39 test_cases_dir_default = "tests/opnfv/test_cases/"
40 LOG = logging.getLogger(__name__)
41
42
43 class Task(object):     # pragma: no cover
44     """Task commands.
45
46        Set of commands to manage benchmark tasks.
47     """
48
49     def __init__(self):
50         self.contexts = []
51         self.outputs = {}
52
53     def _set_dispatchers(self, output_config):
54         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
55                                                            'file')
56         out_types = [s.strip() for s in dispatchers.split(',')]
57         output_config['DEFAULT']['dispatcher'] = out_types
58
59     def start(self, args, **kwargs):
60         """Start a benchmark scenario."""
61
62         atexit.register(self.atexit_handler)
63
64         task_id = getattr(args, 'task_id')
65         self.task_id = task_id if task_id else str(uuid.uuid4())
66
67         check_environment()
68
69         try:
70             output_config = utils.parse_ini_file(config_file)
71         except Exception:
72             # all error will be ignore, the default value is {}
73             output_config = {}
74
75         self._init_output_config(output_config)
76         self._set_output_config(output_config, args.output_file)
77         LOG.debug('Output configuration is: %s', output_config)
78
79         self._set_dispatchers(output_config)
80
81         # update dispatcher list
82         if 'file' in output_config['DEFAULT']['dispatcher']:
83             result = {'status': 0, 'result': {}}
84             utils.write_json_to_file(args.output_file, result)
85
86         total_start_time = time.time()
87         parser = TaskParser(args.inputfile[0])
88
89         if args.suite:
90             # 1.parse suite, return suite_params info
91             task_files, task_args, task_args_fnames = \
92                 parser.parse_suite()
93         else:
94             task_files = [parser.path]
95             task_args = [args.task_args]
96             task_args_fnames = [args.task_args_file]
97
98         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
99                  task_files, task_args, task_args_fnames)
100
101         if args.parse_only:
102             sys.exit(0)
103
104         testcases = {}
105         # parse task_files
106         for i in range(0, len(task_files)):
107             one_task_start_time = time.time()
108             parser.path = task_files[i]
109             scenarios, run_in_parallel, meet_precondition, contexts = \
110                 parser.parse_task(self.task_id, task_args[i],
111                                   task_args_fnames[i])
112
113             self.contexts.extend(contexts)
114
115             if not meet_precondition:
116                 LOG.info("meet_precondition is %s, please check envrionment",
117                          meet_precondition)
118                 continue
119
120             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
121             try:
122                 data = self._run(scenarios, run_in_parallel, args.output_file)
123             except KeyboardInterrupt:
124                 raise
125             except Exception:
126                 LOG.exception('')
127                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
128             else:
129                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
130
131             if args.keep_deploy:
132                 # keep deployment, forget about stack
133                 # (hide it for exit handler)
134                 self.contexts = []
135             else:
136                 for context in self.contexts[::-1]:
137                     context.undeploy()
138                 self.contexts = []
139             one_task_end_time = time.time()
140             LOG.info("task %s finished in %d secs", task_files[i],
141                      one_task_end_time - one_task_start_time)
142
143         result = self._get_format_result(testcases)
144
145         self._do_output(output_config, result)
146
147         total_end_time = time.time()
148         LOG.info("total finished in %d secs",
149                  total_end_time - total_start_time)
150
151         scenario = scenarios[0]
152         print("To generate report execute => yardstick report generate ",
153               scenario['task_id'], scenario['tc'])
154
155         print("Done, exiting")
156         return result
157
158     def _init_output_config(self, output_config):
159         output_config.setdefault('DEFAULT', {})
160         output_config.setdefault('dispatcher_http', {})
161         output_config.setdefault('dispatcher_file', {})
162         output_config.setdefault('dispatcher_influxdb', {})
163         output_config.setdefault('nsb', {})
164
165     def _set_output_config(self, output_config, file_path):
166         try:
167             out_type = os.environ['DISPATCHER']
168         except KeyError:
169             output_config['DEFAULT'].setdefault('dispatcher', 'file')
170         else:
171             output_config['DEFAULT']['dispatcher'] = out_type
172
173         output_config['dispatcher_file']['file_path'] = file_path
174
175         try:
176             target = os.environ['TARGET']
177         except KeyError:
178             pass
179         else:
180             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
181             output_config[k]['target'] = target
182
183     def _get_format_result(self, testcases):
184         criteria = self._get_task_criteria(testcases)
185
186         info = {
187             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
188             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
189             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
190             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
191         }
192
193         result = {
194             'status': 1,
195             'result': {
196                 'criteria': criteria,
197                 'task_id': self.task_id,
198                 'info': info,
199                 'testcases': testcases
200             }
201         }
202
203         return result
204
205     def _get_task_criteria(self, testcases):
206         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
207         if criteria:
208             return 'FAIL'
209         else:
210             return 'PASS'
211
212     def _do_output(self, output_config, result):
213         dispatchers = DispatcherBase.get(output_config)
214
215         for dispatcher in dispatchers:
216             dispatcher.flush_result_data(result)
217
218     def _run(self, scenarios, run_in_parallel, output_file):
219         """Deploys context and calls runners"""
220         for context in self.contexts:
221             context.deploy()
222
223         background_runners = []
224
225         result = []
226         # Start all background scenarios
227         for scenario in filter(_is_background_scenario, scenarios):
228             scenario["runner"] = dict(type="Duration", duration=1000000000)
229             runner = self.run_one_scenario(scenario, output_file)
230             background_runners.append(runner)
231
232         runners = []
233         if run_in_parallel:
234             for scenario in scenarios:
235                 if not _is_background_scenario(scenario):
236                     runner = self.run_one_scenario(scenario, output_file)
237                     runners.append(runner)
238
239             # Wait for runners to finish
240             for runner in runners:
241                 status = runner_join(runner)
242                 if status != 0:
243                     raise RuntimeError
244                 self.outputs.update(runner.get_output())
245                 result.extend(runner.get_result())
246                 print("Runner ended, output in", output_file)
247         else:
248             # run serially
249             for scenario in scenarios:
250                 if not _is_background_scenario(scenario):
251                     runner = self.run_one_scenario(scenario, output_file)
252                     status = runner_join(runner)
253                     if status != 0:
254                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
255                         raise RuntimeError
256                     self.outputs.update(runner.get_output())
257                     result.extend(runner.get_result())
258                     print("Runner ended, output in", output_file)
259
260         # Abort background runners
261         for runner in background_runners:
262             runner.abort()
263
264         # Wait for background runners to finish
265         for runner in background_runners:
266             status = runner.join(timeout=60)
267             if status is None:
268                 # Nuke if it did not stop nicely
269                 base_runner.Runner.terminate(runner)
270                 status = runner_join(runner)
271             else:
272                 base_runner.Runner.release(runner)
273
274             self.outputs.update(runner.get_output())
275             result.extend(runner.get_result())
276             print("Background task ended")
277         return result
278
279     def atexit_handler(self):
280         """handler for process termination"""
281         base_runner.Runner.terminate_all()
282
283         if self.contexts:
284             print("Undeploying all contexts")
285             for context in self.contexts[::-1]:
286                 context.undeploy()
287
288     def _parse_options(self, op):
289         if isinstance(op, dict):
290             return {k: self._parse_options(v) for k, v in op.items()}
291         elif isinstance(op, list):
292             return [self._parse_options(v) for v in op]
293         elif isinstance(op, str):
294             return self.outputs.get(op[1:]) if op.startswith('$') else op
295         else:
296             return op
297
298     def run_one_scenario(self, scenario_cfg, output_file):
299         """run one scenario using context"""
300         runner_cfg = scenario_cfg["runner"]
301         runner_cfg['output_filename'] = output_file
302
303         options = scenario_cfg.get('options', {})
304         scenario_cfg['options'] = self._parse_options(options)
305
306         # TODO support get multi hosts/vms info
307         context_cfg = {}
308         if "host" in scenario_cfg:
309             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
310
311         if "target" in scenario_cfg:
312             if is_ip_addr(scenario_cfg["target"]):
313                 context_cfg['target'] = {}
314                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
315             else:
316                 context_cfg['target'] = Context.get_server(
317                     scenario_cfg["target"])
318                 if self._is_same_heat_context(scenario_cfg["host"],
319                                               scenario_cfg["target"]):
320                     context_cfg["target"]["ipaddr"] = \
321                         context_cfg["target"]["private_ip"]
322                 else:
323                     context_cfg["target"]["ipaddr"] = \
324                         context_cfg["target"]["ip"]
325
326         if "targets" in scenario_cfg:
327             ip_list = []
328             for target in scenario_cfg["targets"]:
329                 if is_ip_addr(target):
330                     ip_list.append(target)
331                     context_cfg['target'] = {}
332                 else:
333                     context_cfg['target'] = Context.get_server(target)
334                     if self._is_same_heat_context(scenario_cfg["host"],
335                                                   target):
336                         ip_list.append(context_cfg["target"]["private_ip"])
337                     else:
338                         ip_list.append(context_cfg["target"]["ip"])
339             context_cfg['target']['ipaddr'] = ','.join(ip_list)
340
341         if "nodes" in scenario_cfg:
342             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
343             context_cfg["networks"] = get_networks_from_nodes(
344                 context_cfg["nodes"])
345         runner = base_runner.Runner.get(runner_cfg)
346
347         print("Starting runner of type '%s'" % runner_cfg["type"])
348         runner.run(scenario_cfg, context_cfg)
349
350         return runner
351
352     def _is_same_heat_context(self, host_attr, target_attr):
353         """check if two servers are in the same heat context
354         host_attr: either a name for a server created by yardstick or a dict
355         with attribute name mapping when using external heat templates
356         target_attr: either a name for a server created by yardstick or a dict
357         with attribute name mapping when using external heat templates
358         """
359         host = None
360         target = None
361         for context in self.contexts:
362             if context.__context_type__ != "Heat":
363                 continue
364
365             host = context._get_server(host_attr)
366             if host is None:
367                 continue
368
369             target = context._get_server(target_attr)
370             if target is None:
371                 return False
372
373             # Both host and target is not None, then they are in the
374             # same heat context.
375             return True
376
377         return False
378
379
380 class TaskParser(object):       # pragma: no cover
381     """Parser for task config files in yaml format"""
382
383     def __init__(self, path):
384         self.path = path
385
386     def _meet_constraint(self, task, cur_pod, cur_installer):
387         if "constraint" in task:
388             constraint = task.get('constraint', None)
389             if constraint is not None:
390                 tc_fit_pod = constraint.get('pod', None)
391                 tc_fit_installer = constraint.get('installer', None)
392                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
393                          cur_pod, cur_installer, constraint)
394                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
395                     return False
396                 if (cur_installer is None) or (tc_fit_installer and cur_installer
397                                                not in tc_fit_installer):
398                     return False
399         return True
400
401     def _get_task_para(self, task, cur_pod):
402         task_args = task.get('task_args', None)
403         if task_args is not None:
404             task_args = task_args.get(cur_pod, task_args.get('default'))
405         task_args_fnames = task.get('task_args_fnames', None)
406         if task_args_fnames is not None:
407             task_args_fnames = task_args_fnames.get(cur_pod, None)
408         return task_args, task_args_fnames
409
410     def parse_suite(self):
411         """parse the suite file and return a list of task config file paths
412            and lists of optional parameters if present"""
413         LOG.info("\nParsing suite file:%s", self.path)
414
415         try:
416             with open(self.path) as stream:
417                 cfg = yaml.safe_load(stream)
418         except IOError as ioerror:
419             sys.exit(ioerror)
420
421         self._check_schema(cfg["schema"], "suite")
422         LOG.info("\nStarting scenario:%s", cfg["name"])
423
424         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
425         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
426                                       test_cases_dir)
427         if test_cases_dir[-1] != os.sep:
428             test_cases_dir += os.sep
429
430         cur_pod = os.environ.get('NODE_NAME', None)
431         cur_installer = os.environ.get('INSTALLER_TYPE', None)
432
433         valid_task_files = []
434         valid_task_args = []
435         valid_task_args_fnames = []
436
437         for task in cfg["test_cases"]:
438             # 1.check file_name
439             if "file_name" in task:
440                 task_fname = task.get('file_name', None)
441                 if task_fname is None:
442                     continue
443             else:
444                 continue
445             # 2.check constraint
446             if self._meet_constraint(task, cur_pod, cur_installer):
447                 valid_task_files.append(test_cases_dir + task_fname)
448             else:
449                 continue
450             # 3.fetch task parameters
451             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
452             valid_task_args.append(task_args)
453             valid_task_args_fnames.append(task_args_fnames)
454
455         return valid_task_files, valid_task_args, valid_task_args_fnames
456
457     def parse_task(self, task_id, task_args=None, task_args_file=None):
458         """parses the task file and return an context and scenario instances"""
459         print("Parsing task config:", self.path)
460
461         try:
462             kw = {}
463             if task_args_file:
464                 with open(task_args_file) as f:
465                     kw.update(parse_task_args("task_args_file", f.read()))
466             kw.update(parse_task_args("task_args", task_args))
467         except TypeError:
468             raise TypeError()
469
470         try:
471             with open(self.path) as f:
472                 try:
473                     input_task = f.read()
474                     rendered_task = TaskTemplate.render(input_task, **kw)
475                 except Exception as e:
476                     print("Failed to render template:\n%(task)s\n%(err)s\n"
477                           % {"task": input_task, "err": e})
478                     raise e
479                 print("Input task is:\n%s\n" % rendered_task)
480
481                 cfg = yaml.safe_load(rendered_task)
482         except IOError as ioerror:
483             sys.exit(ioerror)
484
485         self._check_schema(cfg["schema"], "task")
486         meet_precondition = self._check_precondition(cfg)
487
488         # TODO: support one or many contexts? Many would simpler and precise
489         # TODO: support hybrid context type
490         if "context" in cfg:
491             context_cfgs = [cfg["context"]]
492         elif "contexts" in cfg:
493             context_cfgs = cfg["contexts"]
494         else:
495             context_cfgs = [{"type": "Dummy"}]
496
497         contexts = []
498         name_suffix = '-{}'.format(task_id[:8])
499         for cfg_attrs in context_cfgs:
500             try:
501                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
502                                                   name_suffix)
503             except KeyError:
504                 pass
505             # default to Heat context because we are testing OpenStack
506             context_type = cfg_attrs.get("type", "Heat")
507             context = Context.get(context_type)
508             context.init(cfg_attrs)
509             contexts.append(context)
510
511         run_in_parallel = cfg.get("run_in_parallel", False)
512
513         # add tc and task id for influxdb extended tags
514         for scenario in cfg["scenarios"]:
515             task_name = os.path.splitext(os.path.basename(self.path))[0]
516             scenario["tc"] = task_name
517             scenario["task_id"] = task_id
518             # embed task path into scenario so we can load other files
519             # relative to task path
520             scenario["task_path"] = os.path.dirname(self.path)
521
522             change_server_name(scenario, name_suffix)
523
524             try:
525                 for node in scenario['nodes']:
526                     scenario['nodes'][node] += name_suffix
527             except KeyError:
528                 pass
529
530         # TODO we need something better here, a class that represent the file
531         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
532
533     def _check_schema(self, cfg_schema, schema_type):
534         """Check if config file is using the correct schema type"""
535
536         if cfg_schema != "yardstick:" + schema_type + ":0.1":
537             sys.exit("error: file %s has unknown schema %s" % (self.path,
538                                                                cfg_schema))
539
540     def _check_precondition(self, cfg):
541         """Check if the environment meet the precondition"""
542
543         if "precondition" in cfg:
544             precondition = cfg["precondition"]
545             installer_type = precondition.get("installer_type", None)
546             deploy_scenarios = precondition.get("deploy_scenarios", None)
547             tc_fit_pods = precondition.get("pod_name", None)
548             installer_type_env = os.environ.get('INSTALL_TYPE', None)
549             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
550             pod_name_env = os.environ.get('NODE_NAME', None)
551
552             LOG.info("installer_type: %s, installer_type_env: %s",
553                      installer_type, installer_type_env)
554             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
555                      deploy_scenarios, deploy_scenario_env)
556             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
557                      tc_fit_pods, pod_name_env)
558             if installer_type and installer_type_env:
559                 if installer_type_env not in installer_type:
560                     return False
561             if deploy_scenarios and deploy_scenario_env:
562                 deploy_scenarios_list = deploy_scenarios.split(',')
563                 for deploy_scenario in deploy_scenarios_list:
564                     if deploy_scenario_env.startswith(deploy_scenario):
565                         return True
566                 return False
567             if tc_fit_pods and pod_name_env:
568                 if pod_name_env not in tc_fit_pods:
569                     return False
570         return True
571
572
573 def is_ip_addr(addr):
574     """check if string addr is an IP address"""
575     try:
576         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
577     except AttributeError:
578         pass
579
580     try:
581         ipaddress.ip_address(addr.encode('utf-8'))
582     except ValueError:
583         return False
584     else:
585         return True
586
587
588 def _is_background_scenario(scenario):
589     if "run_in_background" in scenario:
590         return scenario["run_in_background"]
591     else:
592         return False
593
594
595 def parse_nodes_with_context(scenario_cfg):
596     """parse the 'nodes' fields in scenario """
597     # ensure consistency in node instantiation order
598     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
599                        for nodename in sorted(scenario_cfg["nodes"]))
600
601
602 def get_networks_from_nodes(nodes):
603     """parse the 'nodes' fields in scenario """
604     networks = {}
605     for node in nodes.values():
606         if not node:
607             continue
608         for interface in node['interfaces'].values():
609             vld_id = interface.get('vld_id')
610             # mgmt network doesn't have vld_id
611             if not vld_id:
612                 continue
613             network = Context.get_network({"vld_id": vld_id})
614             if network:
615                 networks[network['name']] = network
616     return networks
617
618
619 def runner_join(runner):
620     """join (wait for) a runner, exit process at runner failure"""
621     status = runner.join()
622     base_runner.Runner.release(runner)
623     return status
624
625
626 def print_invalid_header(source_name, args):
627     print("Invalid %(source)s passed:\n\n %(args)s\n"
628           % {"source": source_name, "args": args})
629
630
631 def parse_task_args(src_name, args):
632     if isinstance(args, collections.Mapping):
633         return args
634
635     try:
636         kw = args and yaml.safe_load(args)
637         kw = {} if kw is None else kw
638     except yaml.parser.ParserError as e:
639         print_invalid_header(src_name, args)
640         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
641               % {"source": src_name, "err": e})
642         raise TypeError()
643
644     if not isinstance(kw, dict):
645         print_invalid_header(src_name, args)
646         print("%(src)s had to be dict, actually %(src_type)s\n"
647               % {"src": src_name, "src_type": type(kw)})
648         raise TypeError()
649     return kw
650
651
652 def check_environment():
653     auth_url = os.environ.get('OS_AUTH_URL', None)
654     if not auth_url:
655         try:
656             source_env(constants.OPENRC)
657         except IOError as e:
658             if e.errno != errno.EEXIST:
659                 raise
660             LOG.debug('OPENRC file not found')
661
662
663 def change_server_name(scenario, suffix):
664     try:
665         host = scenario['host']
666     except KeyError:
667         pass
668     else:
669         try:
670             host['name'] += suffix
671         except TypeError:
672             scenario['host'] += suffix
673
674     try:
675         target = scenario['target']
676     except KeyError:
677         pass
678     else:
679         try:
680             target['name'] += suffix
681         except TypeError:
682             scenario['target'] += suffix
683
684     try:
685         key = 'targets'
686         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
687     except KeyError:
688         pass