9e1896888bbd60f8c79c56f58ec952a23426196b
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import errno
25 import collections
26
27 from six.moves import filter
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.dispatcher.base import Base as DispatcherBase
32 from yardstick.common.task_template import TaskTemplate
33 from yardstick.common.utils import source_env
34 from yardstick.common import utils
35 from yardstick.common import constants
36
# default location for JSON results when the 'file' dispatcher is used
output_file_default = "/tmp/yardstick.out"
# yardstick service configuration, parsed as an INI file
config_file = '/etc/yardstick/yardstick.conf'
# suite files resolve relative test-case paths against this directory
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
# seconds to wait when joining background runners before force-terminating
JOIN_TIMEOUT = 60
42
43
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # contexts deployed for the current task; undeployed on exit
        self.contexts = []
        # accumulated runner outputs, referenced by '$name' in options
        self.outputs = {}

    def _set_dispatchers(self, output_config):
        """Split the comma-separated dispatcher setting into a list.

        The DEFAULT/dispatcher value may name several output types
        (e.g. "file, influxdb"); replace it in place with the stripped
        list form so later code can test membership.
        """
        dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
                                                           'file')
        out_types = [s.strip() for s in dispatchers.split(',')]
        output_config['DEFAULT']['dispatcher'] = out_types

    def start(self, args, **kwargs):
        """Start a benchmark scenario."""

        atexit.register(self.atexit_handler)

        task_id = getattr(args, 'task_id')
        self.task_id = task_id if task_id else str(uuid.uuid4())

        self._set_log()

        check_environment()

        try:
            output_config = utils.parse_ini_file(config_file)
        except Exception:
            # best-effort: a missing/broken config falls back to defaults
            output_config = {}

        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

        self._set_dispatchers(output_config)

        # seed the output file so the 'file' dispatcher always has content
        if 'file' in output_config['DEFAULT']['dispatcher']:
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        testcases = {}
        # parse and run each task file in turn
        for i, task_file in enumerate(task_files):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, task_args[i],
                                  task_args_fnames[i])

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check environment",
                         meet_precondition)
                continue

            case_name = os.path.splitext(os.path.basename(task_file))[0]
            try:
                data = self._run(scenarios, run_in_parallel, args.output_file)
            except KeyboardInterrupt:
                raise
            except Exception:
                # BUGFIX: the logging keyword is exc_info, not exe_info;
                # the typo made the error handler itself raise TypeError.
                LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
                testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
            else:
                LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
                testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # undeploy in reverse deployment order
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_file,
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        # NOTE(review): assumes at least one task file was parsed; an empty
        # suite would leave 'scenarios' unbound here -- confirm callers
        # always supply a non-empty input.
        scenario = scenarios[0]
        print("To generate report execute => yardstick report generate ",
              scenario['task_id'], scenario['tc'])

        print("Done, exiting")
        return result

    def _set_log(self):
        """Attach a per-task DEBUG-level file handler to the root logger."""
        log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
        log_formatter = logging.Formatter(log_format)

        log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
        log_handler = logging.FileHandler(log_path)
        log_handler.setFormatter(log_formatter)
        log_handler.setLevel(logging.DEBUG)

        logging.root.addHandler(log_handler)

    def _init_output_config(self, output_config):
        """Ensure every dispatcher section exists in the output config."""
        output_config.setdefault('DEFAULT', {})
        output_config.setdefault('dispatcher_http', {})
        output_config.setdefault('dispatcher_file', {})
        output_config.setdefault('dispatcher_influxdb', {})
        output_config.setdefault('nsb', {})

    def _set_output_config(self, output_config, file_path):
        """Apply DISPATCHER/TARGET environment overrides and the file path."""
        try:
            out_type = os.environ['DISPATCHER']
        except KeyError:
            output_config['DEFAULT'].setdefault('dispatcher', 'file')
        else:
            output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

        try:
            target = os.environ['TARGET']
        except KeyError:
            pass
        else:
            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
            output_config[k]['target'] = target

    def _get_format_result(self, testcases):
        """Assemble the reporting payload for all executed test cases."""
        criteria = self._get_task_criteria(testcases)

        info = {
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
        }

        result = {
            'status': 1,
            'result': {
                'criteria': criteria,
                'task_id': self.task_id,
                'info': info,
                'testcases': testcases
            }
        }

        return result

    def _get_task_criteria(self, testcases):
        """Return 'PASS' only when every test case passed, else 'FAIL'."""
        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
        if criteria:
            return 'FAIL'
        else:
            return 'PASS'

    def _do_output(self, output_config, result):
        """Send the aggregated result through every configured dispatcher."""
        dispatchers = DispatcherBase.get(output_config)

        for dispatcher in dispatchers:
            dispatcher.flush_result_data(result)

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        result = []
        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # background scenarios run until explicitly aborted
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                status = runner_join(runner)
                if status != 0:
                    raise RuntimeError
                self.outputs.update(runner.get_output())
                result.extend(runner.get_result())
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    status = runner_join(runner)
                    if status != 0:
                        LOG.error('Scenario NO.%s: "%s" ERROR!',
                                  scenarios.index(scenario) + 1,
                                  scenario.get('type'))
                        raise RuntimeError
                    self.outputs.update(runner.get_output())
                    result.extend(runner.get_result())
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(JOIN_TIMEOUT)
            if status is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner.join(JOIN_TIMEOUT)
            base_runner.Runner.release(runner)

            self.outputs.update(runner.get_output())
            result.extend(runner.get_result())
            print("Background task ended")
        return result

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            # undeploy in reverse deployment order
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
        """Recursively resolve '$name' references against runner outputs."""
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context"""
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # a literal IP needs no server lookup
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                # servers in the same heat stack talk over the private net
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
            context_cfg["networks"] = get_networks_from_nodes(
                context_cfg["nodes"])
        runner = base_runner.Runner.get(runner_cfg)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        host = None
        target = None
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target is not None, then they are in the
            # same heat context.
            return True

        return False
394
395
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # path of the task (or suite) file currently being parsed
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return True when the task's optional constraint allows this
        pod/installer combination (no constraint means always allowed)."""
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
                    return False
                if (cur_installer is None) or (tc_fit_installer and cur_installer
                                               not in tc_fit_installer):
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return (task_args, task_args_fnames) for the current pod,
        falling back to the 'default' entry for task_args."""
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, task_args.get('default'))
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # NOTE(security): full yaml.load on the suite file; suite
                # files are operator-provided, but safe_load would be the
                # safer default -- confirm no custom tags are required.
                cfg = yaml.load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
                    continue
            else:
                continue
            # 2.check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        print("Parsing task config:", self.path)

        # BUGFIX: the old code wrapped this in `except TypeError:
        # raise TypeError()`, which replaced the informative exception
        # (message printed by parse_task_args) with an empty one.  Letting
        # the original TypeError propagate preserves cause and traceback.
        kw = {}
        if task_args_file:
            with open(task_args_file) as f:
                kw.update(parse_task_args("task_args_file", f.read()))
        kw.update(parse_task_args("task_args", task_args))

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    # bare raise keeps the original traceback intact
                    raise
                print("Input task is:\n%s\n" % rendered_task)

                # NOTE(security): full yaml.load of the rendered task file;
                # safe_load would be preferable -- verify no custom tags.
                cfg = yaml.load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # suffix every context/server name with the task id prefix so
        # concurrent tasks do not collide
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meet the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
587
588
def is_ip_addr(addr):
    """Check whether *addr* is a literal IP address.

    *addr* may be a plain string, or a server-attribute dict in which case
    the 'public_ip_attr' (falling back to 'private_ip_attr') entry is tested.

    Returns True only for a valid textual IPv4/IPv6 address.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    # BUGFIX: do not feed encoded bytes to ip_address() -- any 4- or
    # 16-byte string (e.g. "abcd") would be interpreted as a *packed*
    # binary address and wrongly report True.  Decode bytes to text and
    # let the textual parser decide.
    if isinstance(addr, bytes):
        addr = addr.decode('utf-8')

    try:
        ipaddress.ip_address(addr)
    except (TypeError, ValueError):
        # also covers addr=None (dict without either ip attribute), which
        # previously crashed with AttributeError
        return False
    else:
        return True
602
603
604 def _is_background_scenario(scenario):
605     if "run_in_background" in scenario:
606         return scenario["run_in_background"]
607     else:
608         return False
609
610
def parse_nodes_with_context(scenario_cfg):
    """Resolve each entry of the scenario's 'nodes' mapping to its server.

    Node names are processed in sorted order so instantiation is
    deterministic.
    """
    nodes = scenario_cfg["nodes"]
    return OrderedDict(
        (name, Context.get_server(nodes[name])) for name in sorted(nodes))
616
617
def get_networks_from_nodes(nodes):
    """Collect the networks referenced by the nodes' interfaces.

    Returns a dict keyed by network name; interfaces without a vld_id
    (management network) are skipped, as are empty node entries.
    """
    networks = {}
    for node_attrs in nodes.values():
        if not node_attrs:
            continue
        for iface in node_attrs.get('interfaces', {}).values():
            vld_id = iface.get('vld_id')
            if not vld_id:
                # mgmt network doesn't have vld_id
                continue
            network = Context.get_network({"vld_id": vld_id})
            if network:
                networks[network['name']] = network
    return networks
634
635
def runner_join(runner):
    """Wait for *runner* to finish, release it, and return its exit status."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    return exit_status
641
642
def print_invalid_header(source_name, args):
    """Print the standard diagnostic header for a rejected input."""
    message = "Invalid %(source)s passed:\n\n %(args)s\n" % {
        "source": source_name, "args": args}
    print(message)
646
647
648 def parse_task_args(src_name, args):
649     if isinstance(args, collections.Mapping):
650         return args
651
652     try:
653         kw = args and yaml.safe_load(args)
654         kw = {} if kw is None else kw
655     except yaml.parser.ParserError as e:
656         print_invalid_header(src_name, args)
657         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
658               % {"source": src_name, "err": e})
659         raise TypeError()
660
661     if not isinstance(kw, dict):
662         print_invalid_header(src_name, args)
663         print("%(src)s had to be dict, actually %(src_type)s\n"
664               % {"src": src_name, "src_type": type(kw)})
665         raise TypeError()
666     return kw
667
668
def check_environment():
    """Make sure OpenStack credentials are available.

    If OS_AUTH_URL is not already in the environment, try sourcing the
    openrc file.  A *missing* openrc file is tolerated (logged at DEBUG);
    any other I/O error propagates.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENRC)
        except IOError as e:
            # BUGFIX: a missing file raises ENOENT, not EEXIST ("file
            # exists") -- the old check re-raised exactly the
            # file-not-found case it intended to tolerate.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
678
679
def change_server_name(scenario, suffix):
    """Append *suffix* to the host/target/targets server names in place.

    Each of 'host' and 'target' may be either a plain server-name string
    or a dict carrying a 'name' key; 'targets' is a list of names.
    Missing keys are silently skipped.
    """
    for key in ('host', 'target'):
        try:
            server = scenario[key]
        except KeyError:
            continue
        try:
            # dict form: rename in place
            server['name'] += suffix
        except TypeError:
            # plain string form: replace the scenario entry
            scenario[key] += suffix

    try:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]
    except KeyError:
        pass