Merge changes from topic '35521'
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 import collections
24
25 from six.moves import filter
26
27 from yardstick.benchmark.contexts.base import Context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common.task_template import TaskTemplate
31 from yardstick.common.utils import source_env
32 from yardstick.common import utils
33 from yardstick.common import constants
34
35 output_file_default = "/tmp/yardstick.out"
36 config_file = '/etc/yardstick/yardstick.conf'
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def start(self, args, **kwargs):
52         """Start a benchmark scenario."""
53
54         atexit.register(self.atexit_handler)
55
56         task_id = getattr(args, 'task_id')
57         self.task_id = task_id if task_id else str(uuid.uuid4())
58
59         check_environment()
60
61         try:
62             output_config = utils.parse_ini_file(config_file)
63         except Exception:
64             # all error will be ignore, the default value is {}
65             output_config = {}
66
67         self._init_output_config(output_config)
68         self._set_output_config(output_config, args.output_file)
69         LOG.debug('Output configuration is: %s', output_config)
70
71         if output_config['DEFAULT'].get('dispatcher') == 'file':
72             result = {'status': 0, 'result': {}}
73             utils.write_json_to_file(args.output_file, result)
74
75         total_start_time = time.time()
76         parser = TaskParser(args.inputfile[0])
77
78         if args.suite:
79             # 1.parse suite, return suite_params info
80             task_files, task_args, task_args_fnames = \
81                 parser.parse_suite()
82         else:
83             task_files = [parser.path]
84             task_args = [args.task_args]
85             task_args_fnames = [args.task_args_file]
86
87         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
88                  task_files, task_args, task_args_fnames)
89
90         if args.parse_only:
91             sys.exit(0)
92
93         testcases = {}
94         # parse task_files
95         for i in range(0, len(task_files)):
96             one_task_start_time = time.time()
97             parser.path = task_files[i]
98             scenarios, run_in_parallel, meet_precondition, contexts = \
99                 parser.parse_task(self.task_id, task_args[i],
100                                   task_args_fnames[i])
101
102             self.contexts.extend(contexts)
103
104             if not meet_precondition:
105                 LOG.info("meet_precondition is %s, please check envrionment",
106                          meet_precondition)
107                 continue
108
109             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
110             try:
111                 data = self._run(scenarios, run_in_parallel, args.output_file)
112             except KeyboardInterrupt:
113                 raise
114             except Exception:
115                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
116             else:
117                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
118
119             if args.keep_deploy:
120                 # keep deployment, forget about stack
121                 # (hide it for exit handler)
122                 self.contexts = []
123             else:
124                 for context in self.contexts[::-1]:
125                     context.undeploy()
126                 self.contexts = []
127             one_task_end_time = time.time()
128             LOG.info("task %s finished in %d secs", task_files[i],
129                      one_task_end_time - one_task_start_time)
130
131         result = self._get_format_result(testcases)
132
133         self._do_output(output_config, result)
134
135         total_end_time = time.time()
136         LOG.info("total finished in %d secs",
137                  total_end_time - total_start_time)
138
139         scenario = scenarios[0]
140         print("To generate report execute => yardstick report generate ",
141               scenario['task_id'], scenario['tc'])
142
143         print("Done, exiting")
144         return result
145
146     def _init_output_config(self, output_config):
147         output_config.setdefault('DEFAULT', {})
148         output_config.setdefault('dispatcher_http', {})
149         output_config.setdefault('dispatcher_file', {})
150         output_config.setdefault('dispatcher_influxdb', {})
151         output_config.setdefault('nsb', {})
152
153     def _set_output_config(self, output_config, file_path):
154         try:
155             out_type = os.environ['DISPATCHER']
156         except KeyError:
157             output_config['DEFAULT'].setdefault('dispatcher', 'file')
158         else:
159             output_config['DEFAULT']['dispatcher'] = out_type
160
161         output_config['dispatcher_file']['file_path'] = file_path
162
163         try:
164             target = os.environ['TARGET']
165         except KeyError:
166             pass
167         else:
168             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
169             output_config[k]['target'] = target
170
171     def _get_format_result(self, testcases):
172         criteria = self._get_task_criteria(testcases)
173
174         info = {
175             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
176             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
177             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
178             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
179         }
180
181         result = {
182             'status': 1,
183             'result': {
184                 'criteria': criteria,
185                 'task_id': self.task_id,
186                 'info': info,
187                 'testcases': testcases
188             }
189         }
190
191         return result
192
193     def _get_task_criteria(self, testcases):
194         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
195         if criteria:
196             return 'FAIL'
197         else:
198             return 'PASS'
199
200     def _do_output(self, output_config, result):
201
202         dispatcher = DispatcherBase.get(output_config)
203         dispatcher.flush_result_data(result)
204
205     def _run(self, scenarios, run_in_parallel, output_file):
206         """Deploys context and calls runners"""
207         for context in self.contexts:
208             context.deploy()
209
210         background_runners = []
211
212         result = []
213         # Start all background scenarios
214         for scenario in filter(_is_background_scenario, scenarios):
215             scenario["runner"] = dict(type="Duration", duration=1000000000)
216             runner = self.run_one_scenario(scenario, output_file)
217             background_runners.append(runner)
218
219         runners = []
220         if run_in_parallel:
221             for scenario in scenarios:
222                 if not _is_background_scenario(scenario):
223                     runner = self.run_one_scenario(scenario, output_file)
224                     runners.append(runner)
225
226             # Wait for runners to finish
227             for runner in runners:
228                 status = runner_join(runner)
229                 if status != 0:
230                     raise RuntimeError
231                 self.outputs.update(runner.get_output())
232                 result.extend(runner.get_result())
233                 print("Runner ended, output in", output_file)
234         else:
235             # run serially
236             for scenario in scenarios:
237                 if not _is_background_scenario(scenario):
238                     runner = self.run_one_scenario(scenario, output_file)
239                     status = runner_join(runner)
240                     if status != 0:
241                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
242                         raise RuntimeError
243                     self.outputs.update(runner.get_output())
244                     result.extend(runner.get_result())
245                     print("Runner ended, output in", output_file)
246
247         # Abort background runners
248         for runner in background_runners:
249             runner.abort()
250
251         # Wait for background runners to finish
252         for runner in background_runners:
253             status = runner.join(timeout=60)
254             if status is None:
255                 # Nuke if it did not stop nicely
256                 base_runner.Runner.terminate(runner)
257                 status = runner_join(runner)
258             else:
259                 base_runner.Runner.release(runner)
260
261             self.outputs.update(runner.get_output())
262             result.extend(runner.get_result())
263             print("Background task ended")
264         return result
265
266     def atexit_handler(self):
267         """handler for process termination"""
268         base_runner.Runner.terminate_all()
269
270         if self.contexts:
271             print("Undeploying all contexts")
272             for context in self.contexts[::-1]:
273                 context.undeploy()
274
275     def _parse_options(self, op):
276         if isinstance(op, dict):
277             return {k: self._parse_options(v) for k, v in op.items()}
278         elif isinstance(op, list):
279             return [self._parse_options(v) for v in op]
280         elif isinstance(op, str):
281             return self.outputs.get(op[1:]) if op.startswith('$') else op
282         else:
283             return op
284
285     def run_one_scenario(self, scenario_cfg, output_file):
286         """run one scenario using context"""
287         runner_cfg = scenario_cfg["runner"]
288         runner_cfg['output_filename'] = output_file
289
290         options = scenario_cfg.get('options', {})
291         scenario_cfg['options'] = self._parse_options(options)
292
293         # TODO support get multi hosts/vms info
294         context_cfg = {}
295         if "host" in scenario_cfg:
296             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
297
298         if "target" in scenario_cfg:
299             if is_ip_addr(scenario_cfg["target"]):
300                 context_cfg['target'] = {}
301                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
302             else:
303                 context_cfg['target'] = Context.get_server(
304                     scenario_cfg["target"])
305                 if self._is_same_heat_context(scenario_cfg["host"],
306                                               scenario_cfg["target"]):
307                     context_cfg["target"]["ipaddr"] = \
308                         context_cfg["target"]["private_ip"]
309                 else:
310                     context_cfg["target"]["ipaddr"] = \
311                         context_cfg["target"]["ip"]
312
313         if "targets" in scenario_cfg:
314             ip_list = []
315             for target in scenario_cfg["targets"]:
316                 if is_ip_addr(target):
317                     ip_list.append(target)
318                     context_cfg['target'] = {}
319                 else:
320                     context_cfg['target'] = Context.get_server(target)
321                     if self._is_same_heat_context(scenario_cfg["host"],
322                                                   target):
323                         ip_list.append(context_cfg["target"]["private_ip"])
324                     else:
325                         ip_list.append(context_cfg["target"]["ip"])
326             context_cfg['target']['ipaddr'] = ','.join(ip_list)
327
328         if "nodes" in scenario_cfg:
329             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
330             context_cfg["networks"] = get_networks_from_nodes(
331                 context_cfg["nodes"])
332         runner = base_runner.Runner.get(runner_cfg)
333
334         print("Starting runner of type '%s'" % runner_cfg["type"])
335         runner.run(scenario_cfg, context_cfg)
336
337         return runner
338
339     def _is_same_heat_context(self, host_attr, target_attr):
340         """check if two servers are in the same heat context
341         host_attr: either a name for a server created by yardstick or a dict
342         with attribute name mapping when using external heat templates
343         target_attr: either a name for a server created by yardstick or a dict
344         with attribute name mapping when using external heat templates
345         """
346         host = None
347         target = None
348         for context in self.contexts:
349             if context.__context_type__ != "Heat":
350                 continue
351
352             host = context._get_server(host_attr)
353             if host is None:
354                 continue
355
356             target = context._get_server(target_attr)
357             if target is None:
358                 return False
359
360             # Both host and target is not None, then they are in the
361             # same heat context.
362             return True
363
364         return False
365
366
367 class TaskParser(object):       # pragma: no cover
368     """Parser for task config files in yaml format"""
369
370     def __init__(self, path):
371         self.path = path
372
373     def _meet_constraint(self, task, cur_pod, cur_installer):
374         if "constraint" in task:
375             constraint = task.get('constraint', None)
376             if constraint is not None:
377                 tc_fit_pod = constraint.get('pod', None)
378                 tc_fit_installer = constraint.get('installer', None)
379                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
380                          cur_pod, cur_installer, constraint)
381                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
382                     return False
383                 if (cur_installer is None) or (tc_fit_installer and cur_installer
384                                                not in tc_fit_installer):
385                     return False
386         return True
387
388     def _get_task_para(self, task, cur_pod):
389         task_args = task.get('task_args', None)
390         if task_args is not None:
391             task_args = task_args.get(cur_pod, task_args.get('default'))
392         task_args_fnames = task.get('task_args_fnames', None)
393         if task_args_fnames is not None:
394             task_args_fnames = task_args_fnames.get(cur_pod, None)
395         return task_args, task_args_fnames
396
397     def parse_suite(self):
398         """parse the suite file and return a list of task config file paths
399            and lists of optional parameters if present"""
400         LOG.info("\nParsing suite file:%s", self.path)
401
402         try:
403             with open(self.path) as stream:
404                 cfg = yaml.load(stream)
405         except IOError as ioerror:
406             sys.exit(ioerror)
407
408         self._check_schema(cfg["schema"], "suite")
409         LOG.info("\nStarting scenario:%s", cfg["name"])
410
411         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
412         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
413                                       test_cases_dir)
414         if test_cases_dir[-1] != os.sep:
415             test_cases_dir += os.sep
416
417         cur_pod = os.environ.get('NODE_NAME', None)
418         cur_installer = os.environ.get('INSTALLER_TYPE', None)
419
420         valid_task_files = []
421         valid_task_args = []
422         valid_task_args_fnames = []
423
424         for task in cfg["test_cases"]:
425             # 1.check file_name
426             if "file_name" in task:
427                 task_fname = task.get('file_name', None)
428                 if task_fname is None:
429                     continue
430             else:
431                 continue
432             # 2.check constraint
433             if self._meet_constraint(task, cur_pod, cur_installer):
434                 valid_task_files.append(test_cases_dir + task_fname)
435             else:
436                 continue
437             # 3.fetch task parameters
438             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
439             valid_task_args.append(task_args)
440             valid_task_args_fnames.append(task_args_fnames)
441
442         return valid_task_files, valid_task_args, valid_task_args_fnames
443
444     def parse_task(self, task_id, task_args=None, task_args_file=None):
445         """parses the task file and return an context and scenario instances"""
446         print("Parsing task config:", self.path)
447
448         try:
449             kw = {}
450             if task_args_file:
451                 with open(task_args_file) as f:
452                     kw.update(parse_task_args("task_args_file", f.read()))
453             kw.update(parse_task_args("task_args", task_args))
454         except TypeError:
455             raise TypeError()
456
457         try:
458             with open(self.path) as f:
459                 try:
460                     input_task = f.read()
461                     rendered_task = TaskTemplate.render(input_task, **kw)
462                 except Exception as e:
463                     print("Failed to render template:\n%(task)s\n%(err)s\n"
464                           % {"task": input_task, "err": e})
465                     raise e
466                 print("Input task is:\n%s\n" % rendered_task)
467
468                 cfg = yaml.load(rendered_task)
469         except IOError as ioerror:
470             sys.exit(ioerror)
471
472         self._check_schema(cfg["schema"], "task")
473         meet_precondition = self._check_precondition(cfg)
474
475         # TODO: support one or many contexts? Many would simpler and precise
476         # TODO: support hybrid context type
477         if "context" in cfg:
478             context_cfgs = [cfg["context"]]
479         elif "contexts" in cfg:
480             context_cfgs = cfg["contexts"]
481         else:
482             context_cfgs = [{"type": "Dummy"}]
483
484         contexts = []
485         name_suffix = '-{}'.format(task_id[:8])
486         for cfg_attrs in context_cfgs:
487             try:
488                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
489                                                   name_suffix)
490             except KeyError:
491                 pass
492             # default to Heat context because we are testing OpenStack
493             context_type = cfg_attrs.get("type", "Heat")
494             context = Context.get(context_type)
495             context.init(cfg_attrs)
496             contexts.append(context)
497
498         run_in_parallel = cfg.get("run_in_parallel", False)
499
500         # add tc and task id for influxdb extended tags
501         for scenario in cfg["scenarios"]:
502             task_name = os.path.splitext(os.path.basename(self.path))[0]
503             scenario["tc"] = task_name
504             scenario["task_id"] = task_id
505             # embed task path into scenario so we can load other files
506             # relative to task path
507             scenario["task_path"] = os.path.dirname(self.path)
508
509             change_server_name(scenario, name_suffix)
510
511             try:
512                 for node in scenario['nodes']:
513                     scenario['nodes'][node] += name_suffix
514             except KeyError:
515                 pass
516
517         # TODO we need something better here, a class that represent the file
518         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
519
520     def _check_schema(self, cfg_schema, schema_type):
521         """Check if config file is using the correct schema type"""
522
523         if cfg_schema != "yardstick:" + schema_type + ":0.1":
524             sys.exit("error: file %s has unknown schema %s" % (self.path,
525                                                                cfg_schema))
526
527     def _check_precondition(self, cfg):
528         """Check if the environment meet the precondition"""
529
530         if "precondition" in cfg:
531             precondition = cfg["precondition"]
532             installer_type = precondition.get("installer_type", None)
533             deploy_scenarios = precondition.get("deploy_scenarios", None)
534             tc_fit_pods = precondition.get("pod_name", None)
535             installer_type_env = os.environ.get('INSTALL_TYPE', None)
536             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
537             pod_name_env = os.environ.get('NODE_NAME', None)
538
539             LOG.info("installer_type: %s, installer_type_env: %s",
540                      installer_type, installer_type_env)
541             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
542                      deploy_scenarios, deploy_scenario_env)
543             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
544                      tc_fit_pods, pod_name_env)
545             if installer_type and installer_type_env:
546                 if installer_type_env not in installer_type:
547                     return False
548             if deploy_scenarios and deploy_scenario_env:
549                 deploy_scenarios_list = deploy_scenarios.split(',')
550                 for deploy_scenario in deploy_scenarios_list:
551                     if deploy_scenario_env.startswith(deploy_scenario):
552                         return True
553                 return False
554             if tc_fit_pods and pod_name_env:
555                 if pod_name_env not in tc_fit_pods:
556                     return False
557         return True
558
559
560 def is_ip_addr(addr):
561     """check if string addr is an IP address"""
562     try:
563         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
564     except AttributeError:
565         pass
566
567     try:
568         ipaddress.ip_address(addr.encode('utf-8'))
569     except ValueError:
570         return False
571     else:
572         return True
573
574
575 def _is_background_scenario(scenario):
576     if "run_in_background" in scenario:
577         return scenario["run_in_background"]
578     else:
579         return False
580
581
582 def parse_nodes_with_context(scenario_cfg):
583     """parse the 'nodes' fields in scenario """
584     nodes = scenario_cfg["nodes"]
585     return {nodename: Context.get_server(node) for nodename, node in nodes.items()}
586
587
588 def get_networks_from_nodes(nodes):
589     """parse the 'nodes' fields in scenario """
590     networks = {}
591     for node in nodes.values():
592         if not node:
593             continue
594         for interface in node['interfaces'].values():
595             vld_id = interface.get('vld_id')
596             # mgmt network doesn't have vld_id
597             if not vld_id:
598                 continue
599             network = Context.get_network({"vld_id": vld_id})
600             if network:
601                 networks[network['name']] = network
602     return networks
603
604
605 def runner_join(runner):
606     """join (wait for) a runner, exit process at runner failure"""
607     status = runner.join()
608     base_runner.Runner.release(runner)
609     return status
610
611
612 def print_invalid_header(source_name, args):
613     print("Invalid %(source)s passed:\n\n %(args)s\n"
614           % {"source": source_name, "args": args})
615
616
617 def parse_task_args(src_name, args):
618     if isinstance(args, collections.Mapping):
619         return args
620
621     try:
622         kw = args and yaml.safe_load(args)
623         kw = {} if kw is None else kw
624     except yaml.parser.ParserError as e:
625         print_invalid_header(src_name, args)
626         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
627               % {"source": src_name, "err": e})
628         raise TypeError()
629
630     if not isinstance(kw, dict):
631         print_invalid_header(src_name, args)
632         print("%(src)s had to be dict, actually %(src_type)s\n"
633               % {"src": src_name, "src_type": type(kw)})
634         raise TypeError()
635     return kw
636
637
638 def check_environment():
639     auth_url = os.environ.get('OS_AUTH_URL', None)
640     if not auth_url:
641         try:
642             source_env(constants.OPENRC)
643         except IOError as e:
644             if e.errno != errno.EEXIST:
645                 raise
646             LOG.debug('OPENRC file not found')
647
648
649 def change_server_name(scenario, suffix):
650     try:
651         host = scenario['host']
652     except KeyError:
653         pass
654     else:
655         try:
656             host['name'] += suffix
657         except TypeError:
658             scenario['host'] += suffix
659
660     try:
661         target = scenario['target']
662     except KeyError:
663         pass
664     else:
665         try:
666             target['name'] += suffix
667         except TypeError:
668             scenario['target'] += suffix
669
670     try:
671         key = 'targets'
672         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
673     except KeyError:
674         pass