yardstick env influxdb/grafana cmd support centos
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 import collections
24
25 from six.moves import filter
26
27 from yardstick.benchmark.contexts.base import Context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common.task_template import TaskTemplate
31 from yardstick.common.utils import source_env
32 from yardstick.common import utils
33 from yardstick.common import constants
34
35 output_file_default = "/tmp/yardstick.out"
36 config_file = '/etc/yardstick/yardstick.conf'
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def start(self, args, **kwargs):
52         """Start a benchmark scenario."""
53
54         atexit.register(self.atexit_handler)
55
56         task_id = getattr(args, 'task_id')
57         self.task_id = task_id if task_id else str(uuid.uuid4())
58
59         check_environment()
60
61         try:
62             output_config = utils.parse_ini_file(config_file)
63         except Exception:
64             # all error will be ignore, the default value is {}
65             output_config = {}
66
67         self._init_output_config(output_config)
68         self._set_output_config(output_config, args.output_file)
69         LOG.debug('Output configuration is: %s', output_config)
70
71         if output_config['DEFAULT'].get('dispatcher') == 'file':
72             result = {'status': 0, 'result': {}}
73             utils.write_json_to_file(args.output_file, result)
74
75         total_start_time = time.time()
76         parser = TaskParser(args.inputfile[0])
77
78         if args.suite:
79             # 1.parse suite, return suite_params info
80             task_files, task_args, task_args_fnames = \
81                 parser.parse_suite()
82         else:
83             task_files = [parser.path]
84             task_args = [args.task_args]
85             task_args_fnames = [args.task_args_file]
86
87         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
88                  task_files, task_args, task_args_fnames)
89
90         if args.parse_only:
91             sys.exit(0)
92
93         testcases = {}
94         # parse task_files
95         for i in range(0, len(task_files)):
96             one_task_start_time = time.time()
97             parser.path = task_files[i]
98             scenarios, run_in_parallel, meet_precondition, contexts = \
99                 parser.parse_task(self.task_id, task_args[i],
100                                   task_args_fnames[i])
101
102             self.contexts.extend(contexts)
103
104             if not meet_precondition:
105                 LOG.info("meet_precondition is %s, please check envrionment",
106                          meet_precondition)
107                 continue
108
109             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
110             try:
111                 data = self._run(scenarios, run_in_parallel, args.output_file)
112             except KeyboardInterrupt:
113                 raise
114             except Exception:
115                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
116             else:
117                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
118
119             if args.keep_deploy:
120                 # keep deployment, forget about stack
121                 # (hide it for exit handler)
122                 self.contexts = []
123             else:
124                 for context in self.contexts[::-1]:
125                     context.undeploy()
126                 self.contexts = []
127             one_task_end_time = time.time()
128             LOG.info("task %s finished in %d secs", task_files[i],
129                      one_task_end_time - one_task_start_time)
130
131         result = self._get_format_result(testcases)
132
133         self._do_output(output_config, result)
134
135         total_end_time = time.time()
136         LOG.info("total finished in %d secs",
137                  total_end_time - total_start_time)
138
139         scenario = scenarios[0]
140         print("To generate report execute => yardstick report generate ",
141               scenario['task_id'], scenario['tc'])
142
143         print("Done, exiting")
144         return result
145
146     def _init_output_config(self, output_config):
147         output_config.setdefault('DEFAULT', {})
148         output_config.setdefault('dispatcher_http', {})
149         output_config.setdefault('dispatcher_file', {})
150         output_config.setdefault('dispatcher_influxdb', {})
151         output_config.setdefault('nsb', {})
152
153     def _set_output_config(self, output_config, file_path):
154         try:
155             out_type = os.environ['DISPATCHER']
156         except KeyError:
157             output_config['DEFAULT'].setdefault('dispatcher', 'file')
158         else:
159             output_config['DEFAULT']['dispatcher'] = out_type
160
161         output_config['dispatcher_file']['file_path'] = file_path
162
163         try:
164             target = os.environ['TARGET']
165         except KeyError:
166             pass
167         else:
168             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
169             output_config[k]['target'] = target
170
171     def _get_format_result(self, testcases):
172         criteria = self._get_task_criteria(testcases)
173
174         info = {
175             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
176             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
177             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
178             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
179         }
180
181         result = {
182             'status': 1,
183             'result': {
184                 'criteria': criteria,
185                 'task_id': self.task_id,
186                 'info': info,
187                 'testcases': testcases
188             }
189         }
190
191         return result
192
193     def _get_task_criteria(self, testcases):
194         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
195         if criteria:
196             return 'FAIL'
197         else:
198             return 'PASS'
199
200     def _do_output(self, output_config, result):
201
202         dispatcher = DispatcherBase.get(output_config)
203         dispatcher.flush_result_data(result)
204
205     def _run(self, scenarios, run_in_parallel, output_file):
206         """Deploys context and calls runners"""
207         for context in self.contexts:
208             context.deploy()
209
210         background_runners = []
211
212         result = []
213         # Start all background scenarios
214         for scenario in filter(_is_background_scenario, scenarios):
215             scenario["runner"] = dict(type="Duration", duration=1000000000)
216             runner = self.run_one_scenario(scenario, output_file)
217             background_runners.append(runner)
218
219         runners = []
220         if run_in_parallel:
221             for scenario in scenarios:
222                 if not _is_background_scenario(scenario):
223                     runner = self.run_one_scenario(scenario, output_file)
224                     runners.append(runner)
225
226             # Wait for runners to finish
227             for runner in runners:
228                 status = runner_join(runner)
229                 if status != 0:
230                     raise RuntimeError
231                 self.outputs.update(runner.get_output())
232                 result.extend(runner.get_result())
233                 print("Runner ended, output in", output_file)
234         else:
235             # run serially
236             for scenario in scenarios:
237                 if not _is_background_scenario(scenario):
238                     runner = self.run_one_scenario(scenario, output_file)
239                     status = runner_join(runner)
240                     if status != 0:
241                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
242                         raise RuntimeError
243                     self.outputs.update(runner.get_output())
244                     result.extend(runner.get_result())
245                     print("Runner ended, output in", output_file)
246
247         # Abort background runners
248         for runner in background_runners:
249             runner.abort()
250
251         # Wait for background runners to finish
252         for runner in background_runners:
253             status = runner.join(timeout=60)
254             if status is None:
255                 # Nuke if it did not stop nicely
256                 base_runner.Runner.terminate(runner)
257                 status = runner_join(runner)
258             else:
259                 base_runner.Runner.release(runner)
260
261             self.outputs.update(runner.get_output())
262             result.extend(runner.get_result())
263             print("Background task ended")
264         return result
265
266     def atexit_handler(self):
267         """handler for process termination"""
268         base_runner.Runner.terminate_all()
269
270         if self.contexts:
271             print("Undeploying all contexts")
272             for context in self.contexts[::-1]:
273                 context.undeploy()
274
275     def _parse_options(self, op):
276         if isinstance(op, dict):
277             return {k: self._parse_options(v) for k, v in op.items()}
278         elif isinstance(op, list):
279             return [self._parse_options(v) for v in op]
280         elif isinstance(op, str):
281             return self.outputs.get(op[1:]) if op.startswith('$') else op
282         else:
283             return op
284
285     def run_one_scenario(self, scenario_cfg, output_file):
286         """run one scenario using context"""
287         runner_cfg = scenario_cfg["runner"]
288         runner_cfg['output_filename'] = output_file
289
290         options = scenario_cfg.get('options', {})
291         scenario_cfg['options'] = self._parse_options(options)
292
293         # TODO support get multi hosts/vms info
294         context_cfg = {}
295         if "host" in scenario_cfg:
296             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
297
298         if "target" in scenario_cfg:
299             if is_ip_addr(scenario_cfg["target"]):
300                 context_cfg['target'] = {}
301                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
302             else:
303                 context_cfg['target'] = Context.get_server(
304                     scenario_cfg["target"])
305                 if self._is_same_heat_context(scenario_cfg["host"],
306                                               scenario_cfg["target"]):
307                     context_cfg["target"]["ipaddr"] = \
308                         context_cfg["target"]["private_ip"]
309                 else:
310                     context_cfg["target"]["ipaddr"] = \
311                         context_cfg["target"]["ip"]
312
313         if "targets" in scenario_cfg:
314             ip_list = []
315             for target in scenario_cfg["targets"]:
316                 if is_ip_addr(target):
317                     ip_list.append(target)
318                     context_cfg['target'] = {}
319                 else:
320                     context_cfg['target'] = Context.get_server(target)
321                     if self._is_same_heat_context(scenario_cfg["host"],
322                                                   target):
323                         ip_list.append(context_cfg["target"]["private_ip"])
324                     else:
325                         ip_list.append(context_cfg["target"]["ip"])
326             context_cfg['target']['ipaddr'] = ','.join(ip_list)
327
328         if "nodes" in scenario_cfg:
329             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
330         runner = base_runner.Runner.get(runner_cfg)
331
332         print("Starting runner of type '%s'" % runner_cfg["type"])
333         runner.run(scenario_cfg, context_cfg)
334
335         return runner
336
337     def _is_same_heat_context(self, host_attr, target_attr):
338         """check if two servers are in the same heat context
339         host_attr: either a name for a server created by yardstick or a dict
340         with attribute name mapping when using external heat templates
341         target_attr: either a name for a server created by yardstick or a dict
342         with attribute name mapping when using external heat templates
343         """
344         host = None
345         target = None
346         for context in self.contexts:
347             if context.__context_type__ != "Heat":
348                 continue
349
350             host = context._get_server(host_attr)
351             if host is None:
352                 continue
353
354             target = context._get_server(target_attr)
355             if target is None:
356                 return False
357
358             # Both host and target is not None, then they are in the
359             # same heat context.
360             return True
361
362         return False
363
364
365 class TaskParser(object):       # pragma: no cover
366     """Parser for task config files in yaml format"""
367
368     def __init__(self, path):
369         self.path = path
370
371     def _meet_constraint(self, task, cur_pod, cur_installer):
372         if "constraint" in task:
373             constraint = task.get('constraint', None)
374             if constraint is not None:
375                 tc_fit_pod = constraint.get('pod', None)
376                 tc_fit_installer = constraint.get('installer', None)
377                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
378                          cur_pod, cur_installer, constraint)
379                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
380                     return False
381                 if (cur_installer is None) or (tc_fit_installer and cur_installer
382                                                not in tc_fit_installer):
383                     return False
384         return True
385
386     def _get_task_para(self, task, cur_pod):
387         task_args = task.get('task_args', None)
388         if task_args is not None:
389             task_args = task_args.get(cur_pod, task_args.get('default'))
390         task_args_fnames = task.get('task_args_fnames', None)
391         if task_args_fnames is not None:
392             task_args_fnames = task_args_fnames.get(cur_pod, None)
393         return task_args, task_args_fnames
394
395     def parse_suite(self):
396         """parse the suite file and return a list of task config file paths
397            and lists of optional parameters if present"""
398         LOG.info("\nParsing suite file:%s", self.path)
399
400         try:
401             with open(self.path) as stream:
402                 cfg = yaml.load(stream)
403         except IOError as ioerror:
404             sys.exit(ioerror)
405
406         self._check_schema(cfg["schema"], "suite")
407         LOG.info("\nStarting scenario:%s", cfg["name"])
408
409         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
410         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
411                                       test_cases_dir)
412         if test_cases_dir[-1] != os.sep:
413             test_cases_dir += os.sep
414
415         cur_pod = os.environ.get('NODE_NAME', None)
416         cur_installer = os.environ.get('INSTALLER_TYPE', None)
417
418         valid_task_files = []
419         valid_task_args = []
420         valid_task_args_fnames = []
421
422         for task in cfg["test_cases"]:
423             # 1.check file_name
424             if "file_name" in task:
425                 task_fname = task.get('file_name', None)
426                 if task_fname is None:
427                     continue
428             else:
429                 continue
430             # 2.check constraint
431             if self._meet_constraint(task, cur_pod, cur_installer):
432                 valid_task_files.append(test_cases_dir + task_fname)
433             else:
434                 continue
435             # 3.fetch task parameters
436             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
437             valid_task_args.append(task_args)
438             valid_task_args_fnames.append(task_args_fnames)
439
440         return valid_task_files, valid_task_args, valid_task_args_fnames
441
442     def parse_task(self, task_id, task_args=None, task_args_file=None):
443         """parses the task file and return an context and scenario instances"""
444         print("Parsing task config:", self.path)
445
446         try:
447             kw = {}
448             if task_args_file:
449                 with open(task_args_file) as f:
450                     kw.update(parse_task_args("task_args_file", f.read()))
451             kw.update(parse_task_args("task_args", task_args))
452         except TypeError:
453             raise TypeError()
454
455         try:
456             with open(self.path) as f:
457                 try:
458                     input_task = f.read()
459                     rendered_task = TaskTemplate.render(input_task, **kw)
460                 except Exception as e:
461                     print("Failed to render template:\n%(task)s\n%(err)s\n"
462                           % {"task": input_task, "err": e})
463                     raise e
464                 print("Input task is:\n%s\n" % rendered_task)
465
466                 cfg = yaml.load(rendered_task)
467         except IOError as ioerror:
468             sys.exit(ioerror)
469
470         self._check_schema(cfg["schema"], "task")
471         meet_precondition = self._check_precondition(cfg)
472
473         # TODO: support one or many contexts? Many would simpler and precise
474         # TODO: support hybrid context type
475         if "context" in cfg:
476             context_cfgs = [cfg["context"]]
477         elif "contexts" in cfg:
478             context_cfgs = cfg["contexts"]
479         else:
480             context_cfgs = [{"type": "Dummy"}]
481
482         contexts = []
483         name_suffix = '-{}'.format(task_id[:8])
484         for cfg_attrs in context_cfgs:
485             try:
486                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
487                                                   name_suffix)
488             except KeyError:
489                 pass
490             # default to Heat context because we are testing OpenStack
491             context_type = cfg_attrs.get("type", "Heat")
492             context = Context.get(context_type)
493             context.init(cfg_attrs)
494             contexts.append(context)
495
496         run_in_parallel = cfg.get("run_in_parallel", False)
497
498         # add tc and task id for influxdb extended tags
499         for scenario in cfg["scenarios"]:
500             task_name = os.path.splitext(os.path.basename(self.path))[0]
501             scenario["tc"] = task_name
502             scenario["task_id"] = task_id
503             # embed task path into scenario so we can load other files
504             # relative to task path
505             scenario["task_path"] = os.path.dirname(self.path)
506
507             change_server_name(scenario, name_suffix)
508
509             try:
510                 for node in scenario['nodes']:
511                     scenario['nodes'][node] += name_suffix
512             except KeyError:
513                 pass
514
515         # TODO we need something better here, a class that represent the file
516         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
517
518     def _check_schema(self, cfg_schema, schema_type):
519         """Check if config file is using the correct schema type"""
520
521         if cfg_schema != "yardstick:" + schema_type + ":0.1":
522             sys.exit("error: file %s has unknown schema %s" % (self.path,
523                                                                cfg_schema))
524
525     def _check_precondition(self, cfg):
526         """Check if the envrionment meet the preconditon"""
527
528         if "precondition" in cfg:
529             precondition = cfg["precondition"]
530             installer_type = precondition.get("installer_type", None)
531             deploy_scenarios = precondition.get("deploy_scenarios", None)
532             tc_fit_pods = precondition.get("pod_name", None)
533             installer_type_env = os.environ.get('INSTALL_TYPE', None)
534             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
535             pod_name_env = os.environ.get('NODE_NAME', None)
536
537             LOG.info("installer_type: %s, installer_type_env: %s",
538                      installer_type, installer_type_env)
539             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
540                      deploy_scenarios, deploy_scenario_env)
541             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
542                      tc_fit_pods, pod_name_env)
543             if installer_type and installer_type_env:
544                 if installer_type_env not in installer_type:
545                     return False
546             if deploy_scenarios and deploy_scenario_env:
547                 deploy_scenarios_list = deploy_scenarios.split(',')
548                 for deploy_scenario in deploy_scenarios_list:
549                     if deploy_scenario_env.startswith(deploy_scenario):
550                         return True
551                 return False
552             if tc_fit_pods and pod_name_env:
553                 if pod_name_env not in tc_fit_pods:
554                     return False
555         return True
556
557
558 def is_ip_addr(addr):
559     """check if string addr is an IP address"""
560     try:
561         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
562     except AttributeError:
563         pass
564
565     try:
566         ipaddress.ip_address(addr.encode('utf-8'))
567     except ValueError:
568         return False
569     else:
570         return True
571
572
573 def _is_background_scenario(scenario):
574     if "run_in_background" in scenario:
575         return scenario["run_in_background"]
576     else:
577         return False
578
579
580 def parse_nodes_with_context(scenario_cfg):
581     """paras the 'nodes' fields in scenario """
582     nodes = scenario_cfg["nodes"]
583
584     nodes_cfg = {}
585     for nodename in nodes:
586         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
587
588     return nodes_cfg
589
590
591 def runner_join(runner):
592     """join (wait for) a runner, exit process at runner failure"""
593     status = runner.join()
594     base_runner.Runner.release(runner)
595     return status
596
597
598 def print_invalid_header(source_name, args):
599     print("Invalid %(source)s passed:\n\n %(args)s\n"
600           % {"source": source_name, "args": args})
601
602
603 def parse_task_args(src_name, args):
604     if isinstance(args, collections.Mapping):
605         return args
606
607     try:
608         kw = args and yaml.safe_load(args)
609         kw = {} if kw is None else kw
610     except yaml.parser.ParserError as e:
611         print_invalid_header(src_name, args)
612         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
613               % {"source": src_name, "err": e})
614         raise TypeError()
615
616     if not isinstance(kw, dict):
617         print_invalid_header(src_name, args)
618         print("%(src)s had to be dict, actually %(src_type)s\n"
619               % {"src": src_name, "src_type": type(kw)})
620         raise TypeError()
621     return kw
622
623
624 def check_environment():
625     auth_url = os.environ.get('OS_AUTH_URL', None)
626     if not auth_url:
627         try:
628             source_env(constants.OPENRC)
629         except IOError as e:
630             if e.errno != errno.EEXIST:
631                 raise
632             LOG.debug('OPENRC file not found')
633
634
635 def change_server_name(scenario, suffix):
636     try:
637         host = scenario['host']
638     except KeyError:
639         pass
640     else:
641         try:
642             host['name'] += suffix
643         except TypeError:
644             scenario['host'] += suffix
645
646     try:
647         target = scenario['target']
648     except KeyError:
649         pass
650     else:
651         try:
652             target['name'] += suffix
653         except TypeError:
654             scenario['target'] += suffix
655
656     try:
657         key = 'targets'
658         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
659     except KeyError:
660         pass