Add host&targer in scenario['options']['server_name'] support
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import collections
25
26 from six.moves import filter
27 from jinja2 import Environment
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.common.yaml_loader import yaml_load
32 from yardstick.dispatcher.base import Base as DispatcherBase
33 from yardstick.common.task_template import TaskTemplate
34 from yardstick.common import utils
35 from yardstick.common import constants
36 from yardstick.common.html_template import report_template
37
38 output_file_default = "/tmp/yardstick.out"
39 config_file = '/etc/yardstick/yardstick.conf'
40 test_cases_dir_default = "tests/opnfv/test_cases/"
41 LOG = logging.getLogger(__name__)
42 JOIN_TIMEOUT = 60
43
44
45 class Task(object):     # pragma: no cover
46     """Task commands.
47
48        Set of commands to manage benchmark tasks.
49     """
50
51     def __init__(self):
52         self.contexts = []
53         self.outputs = {}
54
55     def _set_dispatchers(self, output_config):
56         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
57                                                            'file')
58         out_types = [s.strip() for s in dispatchers.split(',')]
59         output_config['DEFAULT']['dispatcher'] = out_types
60
61     def start(self, args, **kwargs):
62         """Start a benchmark scenario."""
63
64         atexit.register(self.atexit_handler)
65
66         task_id = getattr(args, 'task_id')
67         self.task_id = task_id if task_id else str(uuid.uuid4())
68
69         self._set_log()
70
71         try:
72             output_config = utils.parse_ini_file(config_file)
73         except Exception:
74             # all error will be ignore, the default value is {}
75             output_config = {}
76
77         self._init_output_config(output_config)
78         self._set_output_config(output_config, args.output_file)
79         LOG.debug('Output configuration is: %s', output_config)
80
81         self._set_dispatchers(output_config)
82
83         # update dispatcher list
84         if 'file' in output_config['DEFAULT']['dispatcher']:
85             result = {'status': 0, 'result': {}}
86             utils.write_json_to_file(args.output_file, result)
87
88         total_start_time = time.time()
89         parser = TaskParser(args.inputfile[0])
90
91         if args.suite:
92             # 1.parse suite, return suite_params info
93             task_files, task_args, task_args_fnames = \
94                 parser.parse_suite()
95         else:
96             task_files = [parser.path]
97             task_args = [args.task_args]
98             task_args_fnames = [args.task_args_file]
99
100         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
101                   task_files, task_args, task_args_fnames)
102
103         if args.parse_only:
104             sys.exit(0)
105
106         testcases = {}
107         # parse task_files
108         for i in range(0, len(task_files)):
109             one_task_start_time = time.time()
110             parser.path = task_files[i]
111             scenarios, run_in_parallel, meet_precondition, contexts = \
112                 parser.parse_task(self.task_id, task_args[i],
113                                   task_args_fnames[i])
114
115             self.contexts.extend(contexts)
116
117             if not meet_precondition:
118                 LOG.info("meet_precondition is %s, please check envrionment",
119                          meet_precondition)
120                 continue
121
122             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
123             try:
124                 data = self._run(scenarios, run_in_parallel, args.output_file)
125             except KeyboardInterrupt:
126                 raise
127             except Exception:
128                 LOG.error('Testcase: "%s" FAILED!!!', case_name, exe_info=True)
129                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
130             else:
131                 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
132                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
133
134             if args.keep_deploy:
135                 # keep deployment, forget about stack
136                 # (hide it for exit handler)
137                 self.contexts = []
138             else:
139                 for context in self.contexts[::-1]:
140                     context.undeploy()
141                 self.contexts = []
142             one_task_end_time = time.time()
143             LOG.info("Task %s finished in %d secs", task_files[i],
144                      one_task_end_time - one_task_start_time)
145
146         result = self._get_format_result(testcases)
147
148         self._do_output(output_config, result)
149         self._generate_reporting(result)
150
151         total_end_time = time.time()
152         LOG.info("Total finished in %d secs",
153                  total_end_time - total_start_time)
154
155         scenario = scenarios[0]
156         LOG.info("To generate report, execute command "
157                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
158         LOG.info("Task ALL DONE, exiting")
159         return result
160
161     def _generate_reporting(self, result):
162         env = Environment()
163         with open(constants.REPORTING_FILE, 'w') as f:
164             f.write(env.from_string(report_template).render(result))
165
166         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
167
168     def _set_log(self):
169         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
170         log_formatter = logging.Formatter(log_format)
171
172         utils.makedirs(constants.TASK_LOG_DIR)
173         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
174         log_handler = logging.FileHandler(log_path)
175         log_handler.setFormatter(log_formatter)
176         log_handler.setLevel(logging.DEBUG)
177
178         logging.root.addHandler(log_handler)
179
180     def _init_output_config(self, output_config):
181         output_config.setdefault('DEFAULT', {})
182         output_config.setdefault('dispatcher_http', {})
183         output_config.setdefault('dispatcher_file', {})
184         output_config.setdefault('dispatcher_influxdb', {})
185         output_config.setdefault('nsb', {})
186
187     def _set_output_config(self, output_config, file_path):
188         try:
189             out_type = os.environ['DISPATCHER']
190         except KeyError:
191             output_config['DEFAULT'].setdefault('dispatcher', 'file')
192         else:
193             output_config['DEFAULT']['dispatcher'] = out_type
194
195         output_config['dispatcher_file']['file_path'] = file_path
196
197         try:
198             target = os.environ['TARGET']
199         except KeyError:
200             pass
201         else:
202             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
203             output_config[k]['target'] = target
204
205     def _get_format_result(self, testcases):
206         criteria = self._get_task_criteria(testcases)
207
208         info = {
209             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
210             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
211             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
212             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
213         }
214
215         result = {
216             'status': 1,
217             'result': {
218                 'criteria': criteria,
219                 'task_id': self.task_id,
220                 'info': info,
221                 'testcases': testcases
222             }
223         }
224
225         return result
226
227     def _get_task_criteria(self, testcases):
228         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
229         if criteria:
230             return 'FAIL'
231         else:
232             return 'PASS'
233
234     def _do_output(self, output_config, result):
235         dispatchers = DispatcherBase.get(output_config)
236
237         for dispatcher in dispatchers:
238             dispatcher.flush_result_data(result)
239
240     def _run(self, scenarios, run_in_parallel, output_file):
241         """Deploys context and calls runners"""
242         for context in self.contexts:
243             context.deploy()
244
245         background_runners = []
246
247         result = []
248         # Start all background scenarios
249         for scenario in filter(_is_background_scenario, scenarios):
250             scenario["runner"] = dict(type="Duration", duration=1000000000)
251             runner = self.run_one_scenario(scenario, output_file)
252             background_runners.append(runner)
253
254         runners = []
255         if run_in_parallel:
256             for scenario in scenarios:
257                 if not _is_background_scenario(scenario):
258                     runner = self.run_one_scenario(scenario, output_file)
259                     runners.append(runner)
260
261             # Wait for runners to finish
262             for runner in runners:
263                 status = runner_join(runner)
264                 if status != 0:
265                     raise RuntimeError
266                 self.outputs.update(runner.get_output())
267                 result.extend(runner.get_result())
268                 LOG.info("Runner ended, output in %s", output_file)
269         else:
270             # run serially
271             for scenario in scenarios:
272                 if not _is_background_scenario(scenario):
273                     runner = self.run_one_scenario(scenario, output_file)
274                     status = runner_join(runner)
275                     if status != 0:
276                         LOG.error('Scenario NO.%s: "%s" ERROR!',
277                                   scenarios.index(scenario) + 1,
278                                   scenario.get('type'))
279                         raise RuntimeError
280                     self.outputs.update(runner.get_output())
281                     result.extend(runner.get_result())
282                     LOG.info("Runner ended, output in %s", output_file)
283
284         # Abort background runners
285         for runner in background_runners:
286             runner.abort()
287
288         # Wait for background runners to finish
289         for runner in background_runners:
290             status = runner.join(JOIN_TIMEOUT)
291             if status is None:
292                 # Nuke if it did not stop nicely
293                 base_runner.Runner.terminate(runner)
294                 runner.join(JOIN_TIMEOUT)
295             base_runner.Runner.release(runner)
296
297             self.outputs.update(runner.get_output())
298             result.extend(runner.get_result())
299             print("Background task ended")
300         return result
301
302     def atexit_handler(self):
303         """handler for process termination"""
304         base_runner.Runner.terminate_all()
305
306         if self.contexts:
307             LOG.info("Undeploying all contexts")
308             for context in self.contexts[::-1]:
309                 context.undeploy()
310
311     def _parse_options(self, op):
312         if isinstance(op, dict):
313             return {k: self._parse_options(v) for k, v in op.items()}
314         elif isinstance(op, list):
315             return [self._parse_options(v) for v in op]
316         elif isinstance(op, str):
317             return self.outputs.get(op[1:]) if op.startswith('$') else op
318         else:
319             return op
320
321     def run_one_scenario(self, scenario_cfg, output_file):
322         """run one scenario using context"""
323         runner_cfg = scenario_cfg["runner"]
324         runner_cfg['output_filename'] = output_file
325
326         options = scenario_cfg.get('options', {})
327         scenario_cfg['options'] = self._parse_options(options)
328
329         # TODO support get multi hosts/vms info
330         context_cfg = {}
331         server_name = scenario_cfg.get('options', {}).get('server_name', {})
332
333         def config_context_target(cfg):
334             target = cfg['target']
335             if is_ip_addr(target):
336                 context_cfg['target'] = {"ipaddr": target}
337             else:
338                 context_cfg['target'] = Context.get_server(target)
339                 if self._is_same_heat_context(cfg["host"], target):
340                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
341                 else:
342                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
343
344         host_name = server_name.get('host', scenario_cfg.get('host'))
345         if host_name:
346             context_cfg['host'] = Context.get_server(host_name)
347
348         for item in [server_name, scenario_cfg]:
349             try:
350                 config_context_target(item)
351             except KeyError:
352                 pass
353             else:
354                 break
355
356         if "targets" in scenario_cfg:
357             ip_list = []
358             for target in scenario_cfg["targets"]:
359                 if is_ip_addr(target):
360                     ip_list.append(target)
361                     context_cfg['target'] = {}
362                 else:
363                     context_cfg['target'] = Context.get_server(target)
364                     if self._is_same_heat_context(scenario_cfg["host"],
365                                                   target):
366                         ip_list.append(context_cfg["target"]["private_ip"])
367                     else:
368                         ip_list.append(context_cfg["target"]["ip"])
369             context_cfg['target']['ipaddr'] = ','.join(ip_list)
370
371         if "nodes" in scenario_cfg:
372             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
373             context_cfg["networks"] = get_networks_from_nodes(
374                 context_cfg["nodes"])
375         runner = base_runner.Runner.get(runner_cfg)
376
377         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
378         runner.run(scenario_cfg, context_cfg)
379
380         return runner
381
382     def _is_same_heat_context(self, host_attr, target_attr):
383         """check if two servers are in the same heat context
384         host_attr: either a name for a server created by yardstick or a dict
385         with attribute name mapping when using external heat templates
386         target_attr: either a name for a server created by yardstick or a dict
387         with attribute name mapping when using external heat templates
388         """
389         for context in self.contexts:
390             if context.__context_type__ != "Heat":
391                 continue
392
393             host = context._get_server(host_attr)
394             if host is None:
395                 continue
396
397             target = context._get_server(target_attr)
398             if target is None:
399                 return False
400
401             # Both host and target is not None, then they are in the
402             # same heat context.
403             return True
404
405         return False
406
407
408 class TaskParser(object):       # pragma: no cover
409     """Parser for task config files in yaml format"""
410
411     def __init__(self, path):
412         self.path = path
413
414     def _meet_constraint(self, task, cur_pod, cur_installer):
415         if "constraint" in task:
416             constraint = task.get('constraint', None)
417             if constraint is not None:
418                 tc_fit_pod = constraint.get('pod', None)
419                 tc_fit_installer = constraint.get('installer', None)
420                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
421                          cur_pod, cur_installer, constraint)
422                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
423                     return False
424                 if (cur_installer is None) or (tc_fit_installer and cur_installer
425                                                not in tc_fit_installer):
426                     return False
427         return True
428
429     def _get_task_para(self, task, cur_pod):
430         task_args = task.get('task_args', None)
431         if task_args is not None:
432             task_args = task_args.get(cur_pod, task_args.get('default'))
433         task_args_fnames = task.get('task_args_fnames', None)
434         if task_args_fnames is not None:
435             task_args_fnames = task_args_fnames.get(cur_pod, None)
436         return task_args, task_args_fnames
437
438     def parse_suite(self):
439         """parse the suite file and return a list of task config file paths
440            and lists of optional parameters if present"""
441         LOG.info("\nParsing suite file:%s", self.path)
442
443         try:
444             with open(self.path) as stream:
445                 cfg = yaml_load(stream)
446         except IOError as ioerror:
447             sys.exit(ioerror)
448
449         self._check_schema(cfg["schema"], "suite")
450         LOG.info("\nStarting scenario:%s", cfg["name"])
451
452         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
453         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
454                                       test_cases_dir)
455         if test_cases_dir[-1] != os.sep:
456             test_cases_dir += os.sep
457
458         cur_pod = os.environ.get('NODE_NAME', None)
459         cur_installer = os.environ.get('INSTALLER_TYPE', None)
460
461         valid_task_files = []
462         valid_task_args = []
463         valid_task_args_fnames = []
464
465         for task in cfg["test_cases"]:
466             # 1.check file_name
467             if "file_name" in task:
468                 task_fname = task.get('file_name', None)
469                 if task_fname is None:
470                     continue
471             else:
472                 continue
473             # 2.check constraint
474             if self._meet_constraint(task, cur_pod, cur_installer):
475                 valid_task_files.append(test_cases_dir + task_fname)
476             else:
477                 continue
478             # 3.fetch task parameters
479             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
480             valid_task_args.append(task_args)
481             valid_task_args_fnames.append(task_args_fnames)
482
483         return valid_task_files, valid_task_args, valid_task_args_fnames
484
485     def parse_task(self, task_id, task_args=None, task_args_file=None):
486         """parses the task file and return an context and scenario instances"""
487         LOG.info("Parsing task config: %s", self.path)
488
489         try:
490             kw = {}
491             if task_args_file:
492                 with open(task_args_file) as f:
493                     kw.update(parse_task_args("task_args_file", f.read()))
494             kw.update(parse_task_args("task_args", task_args))
495         except TypeError:
496             raise TypeError()
497
498         try:
499             with open(self.path) as f:
500                 try:
501                     input_task = f.read()
502                     rendered_task = TaskTemplate.render(input_task, **kw)
503                 except Exception as e:
504                     LOG.exception('Failed to render template:\n%s\n', input_task)
505                     raise e
506                 LOG.debug("Input task is:\n%s\n", rendered_task)
507
508                 cfg = yaml_load(rendered_task)
509         except IOError as ioerror:
510             sys.exit(ioerror)
511
512         self._check_schema(cfg["schema"], "task")
513         meet_precondition = self._check_precondition(cfg)
514
515         # TODO: support one or many contexts? Many would simpler and precise
516         # TODO: support hybrid context type
517         if "context" in cfg:
518             context_cfgs = [cfg["context"]]
519         elif "contexts" in cfg:
520             context_cfgs = cfg["contexts"]
521         else:
522             context_cfgs = [{"type": "Dummy"}]
523
524         contexts = []
525         name_suffix = '-{}'.format(task_id[:8])
526         for cfg_attrs in context_cfgs:
527             try:
528                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
529                                                   name_suffix)
530             except KeyError:
531                 pass
532             # default to Heat context because we are testing OpenStack
533             context_type = cfg_attrs.get("type", "Heat")
534             context = Context.get(context_type)
535             context.init(cfg_attrs)
536             contexts.append(context)
537
538         run_in_parallel = cfg.get("run_in_parallel", False)
539
540         # add tc and task id for influxdb extended tags
541         for scenario in cfg["scenarios"]:
542             task_name = os.path.splitext(os.path.basename(self.path))[0]
543             scenario["tc"] = task_name
544             scenario["task_id"] = task_id
545             # embed task path into scenario so we can load other files
546             # relative to task path
547             scenario["task_path"] = os.path.dirname(self.path)
548
549             change_server_name(scenario, name_suffix)
550
551             try:
552                 for node in scenario['nodes']:
553                     scenario['nodes'][node] += name_suffix
554             except KeyError:
555                 pass
556
557         # TODO we need something better here, a class that represent the file
558         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
559
560     def _check_schema(self, cfg_schema, schema_type):
561         """Check if config file is using the correct schema type"""
562
563         if cfg_schema != "yardstick:" + schema_type + ":0.1":
564             sys.exit("error: file %s has unknown schema %s" % (self.path,
565                                                                cfg_schema))
566
567     def _check_precondition(self, cfg):
568         """Check if the environment meet the precondition"""
569
570         if "precondition" in cfg:
571             precondition = cfg["precondition"]
572             installer_type = precondition.get("installer_type", None)
573             deploy_scenarios = precondition.get("deploy_scenarios", None)
574             tc_fit_pods = precondition.get("pod_name", None)
575             installer_type_env = os.environ.get('INSTALL_TYPE', None)
576             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
577             pod_name_env = os.environ.get('NODE_NAME', None)
578
579             LOG.info("installer_type: %s, installer_type_env: %s",
580                      installer_type, installer_type_env)
581             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
582                      deploy_scenarios, deploy_scenario_env)
583             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
584                      tc_fit_pods, pod_name_env)
585             if installer_type and installer_type_env:
586                 if installer_type_env not in installer_type:
587                     return False
588             if deploy_scenarios and deploy_scenario_env:
589                 deploy_scenarios_list = deploy_scenarios.split(',')
590                 for deploy_scenario in deploy_scenarios_list:
591                     if deploy_scenario_env.startswith(deploy_scenario):
592                         return True
593                 return False
594             if tc_fit_pods and pod_name_env:
595                 if pod_name_env not in tc_fit_pods:
596                     return False
597         return True
598
599
600 def is_ip_addr(addr):
601     """check if string addr is an IP address"""
602     try:
603         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
604     except AttributeError:
605         pass
606
607     try:
608         ipaddress.ip_address(addr.encode('utf-8'))
609     except ValueError:
610         return False
611     else:
612         return True
613
614
615 def _is_background_scenario(scenario):
616     if "run_in_background" in scenario:
617         return scenario["run_in_background"]
618     else:
619         return False
620
621
622 def parse_nodes_with_context(scenario_cfg):
623     """parse the 'nodes' fields in scenario """
624     # ensure consistency in node instantiation order
625     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
626                        for nodename in sorted(scenario_cfg["nodes"]))
627
628
629 def get_networks_from_nodes(nodes):
630     """parse the 'nodes' fields in scenario """
631     networks = {}
632     for node in nodes.values():
633         if not node:
634             continue
635         interfaces = node.get('interfaces', {})
636         for interface in interfaces.values():
637             # vld_id is network_name
638             network_name = interface.get('network_name')
639             if not network_name:
640                 continue
641             network = Context.get_network(network_name)
642             if network:
643                 networks[network['name']] = network
644     return networks
645
646
647 def runner_join(runner):
648     """join (wait for) a runner, exit process at runner failure"""
649     status = runner.join()
650     base_runner.Runner.release(runner)
651     return status
652
653
654 def print_invalid_header(source_name, args):
655     print("Invalid %(source)s passed:\n\n %(args)s\n"
656           % {"source": source_name, "args": args})
657
658
659 def parse_task_args(src_name, args):
660     if isinstance(args, collections.Mapping):
661         return args
662
663     try:
664         kw = args and yaml_load(args)
665         kw = {} if kw is None else kw
666     except yaml.parser.ParserError as e:
667         print_invalid_header(src_name, args)
668         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
669               % {"source": src_name, "err": e})
670         raise TypeError()
671
672     if not isinstance(kw, dict):
673         print_invalid_header(src_name, args)
674         print("%(src)s had to be dict, actually %(src_type)s\n"
675               % {"src": src_name, "src_type": type(kw)})
676         raise TypeError()
677     return kw
678
679
680 def change_server_name(scenario, suffix):
681
682     def add_suffix(cfg, key):
683         try:
684             value = cfg[key]
685         except KeyError:
686             pass
687         else:
688             try:
689                 value['name'] += suffix
690             except TypeError:
691                 cfg[key] += suffix
692
693     server_name = scenario.get('options', {}).get('server_name', {})
694
695     add_suffix(scenario, 'host')
696     add_suffix(scenario, 'target')
697     add_suffix(server_name, 'host')
698     add_suffix(server_name, 'target')
699
700     try:
701         key = 'targets'
702         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
703     except KeyError:
704         pass