Adding support for multi-dispatcher
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 import collections
24
25 from six.moves import filter
26
27 from yardstick.benchmark.contexts.base import Context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common.task_template import TaskTemplate
31 from yardstick.common.utils import source_env
32 from yardstick.common import utils
33 from yardstick.common import constants
34
35 output_file_default = "/tmp/yardstick.out"
36 config_file = '/etc/yardstick/yardstick.conf'
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def _set_dispatchers(self, output_config):
52         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
53                                                            'file')
54         out_types = [s.strip() for s in dispatchers.split(',')]
55         output_config['DEFAULT']['dispatcher'] = out_types
56
57     def start(self, args, **kwargs):
58         """Start a benchmark scenario."""
59
60         atexit.register(self.atexit_handler)
61
62         task_id = getattr(args, 'task_id')
63         self.task_id = task_id if task_id else str(uuid.uuid4())
64
65         check_environment()
66
67         try:
68             output_config = utils.parse_ini_file(config_file)
69         except Exception:
70             # all error will be ignore, the default value is {}
71             output_config = {}
72
73         self._init_output_config(output_config)
74         self._set_output_config(output_config, args.output_file)
75         LOG.debug('Output configuration is: %s', output_config)
76
77         self._set_dispatchers(output_config)
78
79         # update dispatcher list
80         if 'file' in output_config['DEFAULT']['dispatcher']:
81             result = {'status': 0, 'result': {}}
82             utils.write_json_to_file(args.output_file, result)
83
84         total_start_time = time.time()
85         parser = TaskParser(args.inputfile[0])
86
87         if args.suite:
88             # 1.parse suite, return suite_params info
89             task_files, task_args, task_args_fnames = \
90                 parser.parse_suite()
91         else:
92             task_files = [parser.path]
93             task_args = [args.task_args]
94             task_args_fnames = [args.task_args_file]
95
96         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
97                  task_files, task_args, task_args_fnames)
98
99         if args.parse_only:
100             sys.exit(0)
101
102         testcases = {}
103         # parse task_files
104         for i in range(0, len(task_files)):
105             one_task_start_time = time.time()
106             parser.path = task_files[i]
107             scenarios, run_in_parallel, meet_precondition, contexts = \
108                 parser.parse_task(self.task_id, task_args[i],
109                                   task_args_fnames[i])
110
111             self.contexts.extend(contexts)
112
113             if not meet_precondition:
114                 LOG.info("meet_precondition is %s, please check envrionment",
115                          meet_precondition)
116                 continue
117
118             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
119             try:
120                 data = self._run(scenarios, run_in_parallel, args.output_file)
121             except KeyboardInterrupt:
122                 raise
123             except Exception:
124                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
125             else:
126                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
127
128             if args.keep_deploy:
129                 # keep deployment, forget about stack
130                 # (hide it for exit handler)
131                 self.contexts = []
132             else:
133                 for context in self.contexts[::-1]:
134                     context.undeploy()
135                 self.contexts = []
136             one_task_end_time = time.time()
137             LOG.info("task %s finished in %d secs", task_files[i],
138                      one_task_end_time - one_task_start_time)
139
140         result = self._get_format_result(testcases)
141
142         self._do_output(output_config, result)
143
144         total_end_time = time.time()
145         LOG.info("total finished in %d secs",
146                  total_end_time - total_start_time)
147
148         scenario = scenarios[0]
149         print("To generate report execute => yardstick report generate ",
150               scenario['task_id'], scenario['tc'])
151
152         print("Done, exiting")
153         return result
154
155     def _init_output_config(self, output_config):
156         output_config.setdefault('DEFAULT', {})
157         output_config.setdefault('dispatcher_http', {})
158         output_config.setdefault('dispatcher_file', {})
159         output_config.setdefault('dispatcher_influxdb', {})
160         output_config.setdefault('nsb', {})
161
162     def _set_output_config(self, output_config, file_path):
163         try:
164             out_type = os.environ['DISPATCHER']
165         except KeyError:
166             output_config['DEFAULT'].setdefault('dispatcher', 'file')
167         else:
168             output_config['DEFAULT']['dispatcher'] = out_type
169
170         output_config['dispatcher_file']['file_path'] = file_path
171
172         try:
173             target = os.environ['TARGET']
174         except KeyError:
175             pass
176         else:
177             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
178             output_config[k]['target'] = target
179
180     def _get_format_result(self, testcases):
181         criteria = self._get_task_criteria(testcases)
182
183         info = {
184             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
185             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
186             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
187             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
188         }
189
190         result = {
191             'status': 1,
192             'result': {
193                 'criteria': criteria,
194                 'task_id': self.task_id,
195                 'info': info,
196                 'testcases': testcases
197             }
198         }
199
200         return result
201
202     def _get_task_criteria(self, testcases):
203         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
204         if criteria:
205             return 'FAIL'
206         else:
207             return 'PASS'
208
209     def _do_output(self, output_config, result):
210         dispatchers = DispatcherBase.get(output_config)
211
212         for dispatcher in dispatchers:
213             dispatcher.flush_result_data(result)
214
215     def _run(self, scenarios, run_in_parallel, output_file):
216         """Deploys context and calls runners"""
217         for context in self.contexts:
218             context.deploy()
219
220         background_runners = []
221
222         result = []
223         # Start all background scenarios
224         for scenario in filter(_is_background_scenario, scenarios):
225             scenario["runner"] = dict(type="Duration", duration=1000000000)
226             runner = self.run_one_scenario(scenario, output_file)
227             background_runners.append(runner)
228
229         runners = []
230         if run_in_parallel:
231             for scenario in scenarios:
232                 if not _is_background_scenario(scenario):
233                     runner = self.run_one_scenario(scenario, output_file)
234                     runners.append(runner)
235
236             # Wait for runners to finish
237             for runner in runners:
238                 status = runner_join(runner)
239                 if status != 0:
240                     raise RuntimeError
241                 self.outputs.update(runner.get_output())
242                 result.extend(runner.get_result())
243                 print("Runner ended, output in", output_file)
244         else:
245             # run serially
246             for scenario in scenarios:
247                 if not _is_background_scenario(scenario):
248                     runner = self.run_one_scenario(scenario, output_file)
249                     status = runner_join(runner)
250                     if status != 0:
251                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
252                         raise RuntimeError
253                     self.outputs.update(runner.get_output())
254                     result.extend(runner.get_result())
255                     print("Runner ended, output in", output_file)
256
257         # Abort background runners
258         for runner in background_runners:
259             runner.abort()
260
261         # Wait for background runners to finish
262         for runner in background_runners:
263             status = runner.join(timeout=60)
264             if status is None:
265                 # Nuke if it did not stop nicely
266                 base_runner.Runner.terminate(runner)
267                 status = runner_join(runner)
268             else:
269                 base_runner.Runner.release(runner)
270
271             self.outputs.update(runner.get_output())
272             result.extend(runner.get_result())
273             print("Background task ended")
274         return result
275
276     def atexit_handler(self):
277         """handler for process termination"""
278         base_runner.Runner.terminate_all()
279
280         if self.contexts:
281             print("Undeploying all contexts")
282             for context in self.contexts[::-1]:
283                 context.undeploy()
284
285     def _parse_options(self, op):
286         if isinstance(op, dict):
287             return {k: self._parse_options(v) for k, v in op.items()}
288         elif isinstance(op, list):
289             return [self._parse_options(v) for v in op]
290         elif isinstance(op, str):
291             return self.outputs.get(op[1:]) if op.startswith('$') else op
292         else:
293             return op
294
295     def run_one_scenario(self, scenario_cfg, output_file):
296         """run one scenario using context"""
297         runner_cfg = scenario_cfg["runner"]
298         runner_cfg['output_filename'] = output_file
299
300         options = scenario_cfg.get('options', {})
301         scenario_cfg['options'] = self._parse_options(options)
302
303         # TODO support get multi hosts/vms info
304         context_cfg = {}
305         if "host" in scenario_cfg:
306             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
307
308         if "target" in scenario_cfg:
309             if is_ip_addr(scenario_cfg["target"]):
310                 context_cfg['target'] = {}
311                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
312             else:
313                 context_cfg['target'] = Context.get_server(
314                     scenario_cfg["target"])
315                 if self._is_same_heat_context(scenario_cfg["host"],
316                                               scenario_cfg["target"]):
317                     context_cfg["target"]["ipaddr"] = \
318                         context_cfg["target"]["private_ip"]
319                 else:
320                     context_cfg["target"]["ipaddr"] = \
321                         context_cfg["target"]["ip"]
322
323         if "targets" in scenario_cfg:
324             ip_list = []
325             for target in scenario_cfg["targets"]:
326                 if is_ip_addr(target):
327                     ip_list.append(target)
328                     context_cfg['target'] = {}
329                 else:
330                     context_cfg['target'] = Context.get_server(target)
331                     if self._is_same_heat_context(scenario_cfg["host"],
332                                                   target):
333                         ip_list.append(context_cfg["target"]["private_ip"])
334                     else:
335                         ip_list.append(context_cfg["target"]["ip"])
336             context_cfg['target']['ipaddr'] = ','.join(ip_list)
337
338         if "nodes" in scenario_cfg:
339             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
340             context_cfg["networks"] = get_networks_from_nodes(
341                 context_cfg["nodes"])
342         runner = base_runner.Runner.get(runner_cfg)
343
344         print("Starting runner of type '%s'" % runner_cfg["type"])
345         runner.run(scenario_cfg, context_cfg)
346
347         return runner
348
349     def _is_same_heat_context(self, host_attr, target_attr):
350         """check if two servers are in the same heat context
351         host_attr: either a name for a server created by yardstick or a dict
352         with attribute name mapping when using external heat templates
353         target_attr: either a name for a server created by yardstick or a dict
354         with attribute name mapping when using external heat templates
355         """
356         host = None
357         target = None
358         for context in self.contexts:
359             if context.__context_type__ != "Heat":
360                 continue
361
362             host = context._get_server(host_attr)
363             if host is None:
364                 continue
365
366             target = context._get_server(target_attr)
367             if target is None:
368                 return False
369
370             # Both host and target is not None, then they are in the
371             # same heat context.
372             return True
373
374         return False
375
376
377 class TaskParser(object):       # pragma: no cover
378     """Parser for task config files in yaml format"""
379
380     def __init__(self, path):
381         self.path = path
382
383     def _meet_constraint(self, task, cur_pod, cur_installer):
384         if "constraint" in task:
385             constraint = task.get('constraint', None)
386             if constraint is not None:
387                 tc_fit_pod = constraint.get('pod', None)
388                 tc_fit_installer = constraint.get('installer', None)
389                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
390                          cur_pod, cur_installer, constraint)
391                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
392                     return False
393                 if (cur_installer is None) or (tc_fit_installer and cur_installer
394                                                not in tc_fit_installer):
395                     return False
396         return True
397
398     def _get_task_para(self, task, cur_pod):
399         task_args = task.get('task_args', None)
400         if task_args is not None:
401             task_args = task_args.get(cur_pod, task_args.get('default'))
402         task_args_fnames = task.get('task_args_fnames', None)
403         if task_args_fnames is not None:
404             task_args_fnames = task_args_fnames.get(cur_pod, None)
405         return task_args, task_args_fnames
406
407     def parse_suite(self):
408         """parse the suite file and return a list of task config file paths
409            and lists of optional parameters if present"""
410         LOG.info("\nParsing suite file:%s", self.path)
411
412         try:
413             with open(self.path) as stream:
414                 cfg = yaml.load(stream)
415         except IOError as ioerror:
416             sys.exit(ioerror)
417
418         self._check_schema(cfg["schema"], "suite")
419         LOG.info("\nStarting scenario:%s", cfg["name"])
420
421         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
422         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
423                                       test_cases_dir)
424         if test_cases_dir[-1] != os.sep:
425             test_cases_dir += os.sep
426
427         cur_pod = os.environ.get('NODE_NAME', None)
428         cur_installer = os.environ.get('INSTALLER_TYPE', None)
429
430         valid_task_files = []
431         valid_task_args = []
432         valid_task_args_fnames = []
433
434         for task in cfg["test_cases"]:
435             # 1.check file_name
436             if "file_name" in task:
437                 task_fname = task.get('file_name', None)
438                 if task_fname is None:
439                     continue
440             else:
441                 continue
442             # 2.check constraint
443             if self._meet_constraint(task, cur_pod, cur_installer):
444                 valid_task_files.append(test_cases_dir + task_fname)
445             else:
446                 continue
447             # 3.fetch task parameters
448             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
449             valid_task_args.append(task_args)
450             valid_task_args_fnames.append(task_args_fnames)
451
452         return valid_task_files, valid_task_args, valid_task_args_fnames
453
454     def parse_task(self, task_id, task_args=None, task_args_file=None):
455         """parses the task file and return an context and scenario instances"""
456         print("Parsing task config:", self.path)
457
458         try:
459             kw = {}
460             if task_args_file:
461                 with open(task_args_file) as f:
462                     kw.update(parse_task_args("task_args_file", f.read()))
463             kw.update(parse_task_args("task_args", task_args))
464         except TypeError:
465             raise TypeError()
466
467         try:
468             with open(self.path) as f:
469                 try:
470                     input_task = f.read()
471                     rendered_task = TaskTemplate.render(input_task, **kw)
472                 except Exception as e:
473                     print("Failed to render template:\n%(task)s\n%(err)s\n"
474                           % {"task": input_task, "err": e})
475                     raise e
476                 print("Input task is:\n%s\n" % rendered_task)
477
478                 cfg = yaml.load(rendered_task)
479         except IOError as ioerror:
480             sys.exit(ioerror)
481
482         self._check_schema(cfg["schema"], "task")
483         meet_precondition = self._check_precondition(cfg)
484
485         # TODO: support one or many contexts? Many would simpler and precise
486         # TODO: support hybrid context type
487         if "context" in cfg:
488             context_cfgs = [cfg["context"]]
489         elif "contexts" in cfg:
490             context_cfgs = cfg["contexts"]
491         else:
492             context_cfgs = [{"type": "Dummy"}]
493
494         contexts = []
495         name_suffix = '-{}'.format(task_id[:8])
496         for cfg_attrs in context_cfgs:
497             try:
498                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
499                                                   name_suffix)
500             except KeyError:
501                 pass
502             # default to Heat context because we are testing OpenStack
503             context_type = cfg_attrs.get("type", "Heat")
504             context = Context.get(context_type)
505             context.init(cfg_attrs)
506             contexts.append(context)
507
508         run_in_parallel = cfg.get("run_in_parallel", False)
509
510         # add tc and task id for influxdb extended tags
511         for scenario in cfg["scenarios"]:
512             task_name = os.path.splitext(os.path.basename(self.path))[0]
513             scenario["tc"] = task_name
514             scenario["task_id"] = task_id
515             # embed task path into scenario so we can load other files
516             # relative to task path
517             scenario["task_path"] = os.path.dirname(self.path)
518
519             change_server_name(scenario, name_suffix)
520
521             try:
522                 for node in scenario['nodes']:
523                     scenario['nodes'][node] += name_suffix
524             except KeyError:
525                 pass
526
527         # TODO we need something better here, a class that represent the file
528         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
529
530     def _check_schema(self, cfg_schema, schema_type):
531         """Check if config file is using the correct schema type"""
532
533         if cfg_schema != "yardstick:" + schema_type + ":0.1":
534             sys.exit("error: file %s has unknown schema %s" % (self.path,
535                                                                cfg_schema))
536
537     def _check_precondition(self, cfg):
538         """Check if the environment meet the precondition"""
539
540         if "precondition" in cfg:
541             precondition = cfg["precondition"]
542             installer_type = precondition.get("installer_type", None)
543             deploy_scenarios = precondition.get("deploy_scenarios", None)
544             tc_fit_pods = precondition.get("pod_name", None)
545             installer_type_env = os.environ.get('INSTALL_TYPE', None)
546             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
547             pod_name_env = os.environ.get('NODE_NAME', None)
548
549             LOG.info("installer_type: %s, installer_type_env: %s",
550                      installer_type, installer_type_env)
551             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
552                      deploy_scenarios, deploy_scenario_env)
553             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
554                      tc_fit_pods, pod_name_env)
555             if installer_type and installer_type_env:
556                 if installer_type_env not in installer_type:
557                     return False
558             if deploy_scenarios and deploy_scenario_env:
559                 deploy_scenarios_list = deploy_scenarios.split(',')
560                 for deploy_scenario in deploy_scenarios_list:
561                     if deploy_scenario_env.startswith(deploy_scenario):
562                         return True
563                 return False
564             if tc_fit_pods and pod_name_env:
565                 if pod_name_env not in tc_fit_pods:
566                     return False
567         return True
568
569
570 def is_ip_addr(addr):
571     """check if string addr is an IP address"""
572     try:
573         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
574     except AttributeError:
575         pass
576
577     try:
578         ipaddress.ip_address(addr.encode('utf-8'))
579     except ValueError:
580         return False
581     else:
582         return True
583
584
585 def _is_background_scenario(scenario):
586     if "run_in_background" in scenario:
587         return scenario["run_in_background"]
588     else:
589         return False
590
591
592 def parse_nodes_with_context(scenario_cfg):
593     """parse the 'nodes' fields in scenario """
594     nodes = scenario_cfg["nodes"]
595     return {nodename: Context.get_server(node) for nodename, node in nodes.items()}
596
597
598 def get_networks_from_nodes(nodes):
599     """parse the 'nodes' fields in scenario """
600     networks = {}
601     for node in nodes.values():
602         if not node:
603             continue
604         for interface in node['interfaces'].values():
605             vld_id = interface.get('vld_id')
606             # mgmt network doesn't have vld_id
607             if not vld_id:
608                 continue
609             network = Context.get_network({"vld_id": vld_id})
610             if network:
611                 networks[network['name']] = network
612     return networks
613
614
615 def runner_join(runner):
616     """join (wait for) a runner, exit process at runner failure"""
617     status = runner.join()
618     base_runner.Runner.release(runner)
619     return status
620
621
622 def print_invalid_header(source_name, args):
623     print("Invalid %(source)s passed:\n\n %(args)s\n"
624           % {"source": source_name, "args": args})
625
626
627 def parse_task_args(src_name, args):
628     if isinstance(args, collections.Mapping):
629         return args
630
631     try:
632         kw = args and yaml.safe_load(args)
633         kw = {} if kw is None else kw
634     except yaml.parser.ParserError as e:
635         print_invalid_header(src_name, args)
636         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
637               % {"source": src_name, "err": e})
638         raise TypeError()
639
640     if not isinstance(kw, dict):
641         print_invalid_header(src_name, args)
642         print("%(src)s had to be dict, actually %(src_type)s\n"
643               % {"src": src_name, "src_type": type(kw)})
644         raise TypeError()
645     return kw
646
647
648 def check_environment():
649     auth_url = os.environ.get('OS_AUTH_URL', None)
650     if not auth_url:
651         try:
652             source_env(constants.OPENRC)
653         except IOError as e:
654             if e.errno != errno.EEXIST:
655                 raise
656             LOG.debug('OPENRC file not found')
657
658
659 def change_server_name(scenario, suffix):
660     try:
661         host = scenario['host']
662     except KeyError:
663         pass
664     else:
665         try:
666             host['name'] += suffix
667         except TypeError:
668             scenario['host'] += suffix
669
670     try:
671         target = scenario['target']
672     except KeyError:
673         pass
674     else:
675         try:
676             target['name'] += suffix
677         except TypeError:
678             scenario['target'] += suffix
679
680     try:
681         key = 'targets'
682         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
683     except KeyError:
684         pass