Yardstick API architecture improvement
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 import collections
24
25 from six.moves import filter
26
27 from yardstick.benchmark.contexts.base import Context
28 from yardstick.benchmark.runners import base as base_runner
29 from yardstick.dispatcher.base import Base as DispatcherBase
30 from yardstick.common.task_template import TaskTemplate
31 from yardstick.common.utils import source_env
32 from yardstick.common import utils
33 from yardstick.common import constants
34
35 output_file_default = "/tmp/yardstick.out"
36 config_file = '/etc/yardstick/yardstick.conf'
37 test_cases_dir_default = "tests/opnfv/test_cases/"
38 LOG = logging.getLogger(__name__)
39
40
41 class Task(object):     # pragma: no cover
42     """Task commands.
43
44        Set of commands to manage benchmark tasks.
45     """
46
47     def __init__(self):
48         self.contexts = []
49         self.outputs = {}
50
51     def start(self, args, **kwargs):
52         """Start a benchmark scenario."""
53
54         atexit.register(self.atexit_handler)
55
56         task_id = getattr(args, 'task_id')
57         self.task_id = task_id if task_id else str(uuid.uuid4())
58
59         check_environment()
60
61         output_config = utils.parse_ini_file(config_file)
62         self._init_output_config(output_config)
63         self._set_output_config(output_config, args.output_file)
64         LOG.debug('Output configuration is: %s', output_config)
65
66         if output_config['DEFAULT'].get('dispatcher') == 'file':
67             result = {'status': 0, 'result': {}}
68             utils.write_json_to_file(args.output_file, result)
69
70         total_start_time = time.time()
71         parser = TaskParser(args.inputfile[0])
72
73         if args.suite:
74             # 1.parse suite, return suite_params info
75             task_files, task_args, task_args_fnames = \
76                 parser.parse_suite()
77         else:
78             task_files = [parser.path]
79             task_args = [args.task_args]
80             task_args_fnames = [args.task_args_file]
81
82         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
83                  task_files, task_args, task_args_fnames)
84
85         if args.parse_only:
86             sys.exit(0)
87
88         testcases = {}
89         # parse task_files
90         for i in range(0, len(task_files)):
91             one_task_start_time = time.time()
92             parser.path = task_files[i]
93             scenarios, run_in_parallel, meet_precondition, contexts = \
94                 parser.parse_task(self.task_id, task_args[i],
95                                   task_args_fnames[i])
96
97             self.contexts.extend(contexts)
98
99             if not meet_precondition:
100                 LOG.info("meet_precondition is %s, please check envrionment",
101                          meet_precondition)
102                 continue
103
104             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
105             try:
106                 data = self._run(scenarios, run_in_parallel, args.output_file)
107             except KeyboardInterrupt:
108                 raise
109             except Exception:
110                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
111             else:
112                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
113
114             if args.keep_deploy:
115                 # keep deployment, forget about stack
116                 # (hide it for exit handler)
117                 self.contexts = []
118             else:
119                 for context in self.contexts[::-1]:
120                     context.undeploy()
121                 self.contexts = []
122             one_task_end_time = time.time()
123             LOG.info("task %s finished in %d secs", task_files[i],
124                      one_task_end_time - one_task_start_time)
125
126         result = self._get_format_result(testcases)
127
128         self._do_output(output_config, result)
129
130         total_end_time = time.time()
131         LOG.info("total finished in %d secs",
132                  total_end_time - total_start_time)
133
134         scenario = scenarios[0]
135         print("To generate report execute => yardstick report generate ",
136               scenario['task_id'], scenario['tc'])
137
138         print("Done, exiting")
139         return result
140
141     def _init_output_config(self, output_config):
142         output_config.setdefault('DEFAULT', {})
143         output_config.setdefault('dispatcher_http', {})
144         output_config.setdefault('dispatcher_file', {})
145         output_config.setdefault('dispatcher_influxdb', {})
146         output_config.setdefault('nsb', {})
147
148     def _set_output_config(self, output_config, file_path):
149         try:
150             out_type = os.environ['DISPATCHER']
151         except KeyError:
152             output_config['DEFAULT'].setdefault('dispatcher', 'file')
153         else:
154             output_config['DEFAULT']['dispatcher'] = out_type
155
156         output_config['dispatcher_file']['file_path'] = file_path
157
158         try:
159             target = os.environ['TARGET']
160         except KeyError:
161             pass
162         else:
163             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
164             output_config[k]['target'] = target
165
166     def _get_format_result(self, testcases):
167         criteria = self._get_task_criteria(testcases)
168
169         info = {
170             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
171             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
172             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
173             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
174         }
175
176         result = {
177             'status': 1,
178             'result': {
179                 'criteria': criteria,
180                 'task_id': self.task_id,
181                 'info': info,
182                 'testcases': testcases
183             }
184         }
185
186         return result
187
188     def _get_task_criteria(self, testcases):
189         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
190         if criteria:
191             return 'FAIL'
192         else:
193             return 'PASS'
194
195     def _do_output(self, output_config, result):
196
197         dispatcher = DispatcherBase.get(output_config)
198         dispatcher.flush_result_data(result)
199
200     def _run(self, scenarios, run_in_parallel, output_file):
201         """Deploys context and calls runners"""
202         for context in self.contexts:
203             context.deploy()
204
205         background_runners = []
206
207         result = []
208         # Start all background scenarios
209         for scenario in filter(_is_background_scenario, scenarios):
210             scenario["runner"] = dict(type="Duration", duration=1000000000)
211             runner = self.run_one_scenario(scenario, output_file)
212             background_runners.append(runner)
213
214         runners = []
215         if run_in_parallel:
216             for scenario in scenarios:
217                 if not _is_background_scenario(scenario):
218                     runner = self.run_one_scenario(scenario, output_file)
219                     runners.append(runner)
220
221             # Wait for runners to finish
222             for runner in runners:
223                 status = runner_join(runner)
224                 if status != 0:
225                     raise RuntimeError
226                 self.outputs.update(runner.get_output())
227                 result.extend(runner.get_result())
228                 print("Runner ended, output in", output_file)
229         else:
230             # run serially
231             for scenario in scenarios:
232                 if not _is_background_scenario(scenario):
233                     runner = self.run_one_scenario(scenario, output_file)
234                     status = runner_join(runner)
235                     if status != 0:
236                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
237                         raise RuntimeError
238                     self.outputs.update(runner.get_output())
239                     result.extend(runner.get_result())
240                     print("Runner ended, output in", output_file)
241
242         # Abort background runners
243         for runner in background_runners:
244             runner.abort()
245
246         # Wait for background runners to finish
247         for runner in background_runners:
248             status = runner.join(timeout=60)
249             if status is None:
250                 # Nuke if it did not stop nicely
251                 base_runner.Runner.terminate(runner)
252                 status = runner_join(runner)
253             else:
254                 base_runner.Runner.release(runner)
255
256             self.outputs.update(runner.get_output())
257             result.extend(runner.get_result())
258             print("Background task ended")
259         return result
260
261     def atexit_handler(self):
262         """handler for process termination"""
263         base_runner.Runner.terminate_all()
264
265         if self.contexts:
266             print("Undeploying all contexts")
267             for context in self.contexts[::-1]:
268                 context.undeploy()
269
270     def _parse_options(self, op):
271         if isinstance(op, dict):
272             return {k: self._parse_options(v) for k, v in op.items()}
273         elif isinstance(op, list):
274             return [self._parse_options(v) for v in op]
275         elif isinstance(op, str):
276             return self.outputs.get(op[1:]) if op.startswith('$') else op
277         else:
278             return op
279
280     def run_one_scenario(self, scenario_cfg, output_file):
281         """run one scenario using context"""
282         runner_cfg = scenario_cfg["runner"]
283         runner_cfg['output_filename'] = output_file
284
285         options = scenario_cfg.get('options', {})
286         scenario_cfg['options'] = self._parse_options(options)
287
288         # TODO support get multi hosts/vms info
289         context_cfg = {}
290         if "host" in scenario_cfg:
291             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
292
293         if "target" in scenario_cfg:
294             if is_ip_addr(scenario_cfg["target"]):
295                 context_cfg['target'] = {}
296                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
297             else:
298                 context_cfg['target'] = Context.get_server(
299                     scenario_cfg["target"])
300                 if self._is_same_heat_context(scenario_cfg["host"],
301                                               scenario_cfg["target"]):
302                     context_cfg["target"]["ipaddr"] = \
303                         context_cfg["target"]["private_ip"]
304                 else:
305                     context_cfg["target"]["ipaddr"] = \
306                         context_cfg["target"]["ip"]
307
308         if "targets" in scenario_cfg:
309             ip_list = []
310             for target in scenario_cfg["targets"]:
311                 if is_ip_addr(target):
312                     ip_list.append(target)
313                     context_cfg['target'] = {}
314                 else:
315                     context_cfg['target'] = Context.get_server(target)
316                     if self._is_same_heat_context(scenario_cfg["host"],
317                                                   target):
318                         ip_list.append(context_cfg["target"]["private_ip"])
319                     else:
320                         ip_list.append(context_cfg["target"]["ip"])
321             context_cfg['target']['ipaddr'] = ','.join(ip_list)
322
323         if "nodes" in scenario_cfg:
324             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
325         runner = base_runner.Runner.get(runner_cfg)
326
327         print("Starting runner of type '%s'" % runner_cfg["type"])
328         runner.run(scenario_cfg, context_cfg)
329
330         return runner
331
332     def _is_same_heat_context(self, host_attr, target_attr):
333         """check if two servers are in the same heat context
334         host_attr: either a name for a server created by yardstick or a dict
335         with attribute name mapping when using external heat templates
336         target_attr: either a name for a server created by yardstick or a dict
337         with attribute name mapping when using external heat templates
338         """
339         host = None
340         target = None
341         for context in self.contexts:
342             if context.__context_type__ != "Heat":
343                 continue
344
345             host = context._get_server(host_attr)
346             if host is None:
347                 continue
348
349             target = context._get_server(target_attr)
350             if target is None:
351                 return False
352
353             # Both host and target is not None, then they are in the
354             # same heat context.
355             return True
356
357         return False
358
359
360 class TaskParser(object):       # pragma: no cover
361     """Parser for task config files in yaml format"""
362
363     def __init__(self, path):
364         self.path = path
365
366     def _meet_constraint(self, task, cur_pod, cur_installer):
367         if "constraint" in task:
368             constraint = task.get('constraint', None)
369             if constraint is not None:
370                 tc_fit_pod = constraint.get('pod', None)
371                 tc_fit_installer = constraint.get('installer', None)
372                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
373                          cur_pod, cur_installer, constraint)
374                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
375                     return False
376                 if (cur_installer is None) or (tc_fit_installer and cur_installer
377                                                not in tc_fit_installer):
378                     return False
379         return True
380
381     def _get_task_para(self, task, cur_pod):
382         task_args = task.get('task_args', None)
383         if task_args is not None:
384             task_args = task_args.get(cur_pod, task_args.get('default'))
385         task_args_fnames = task.get('task_args_fnames', None)
386         if task_args_fnames is not None:
387             task_args_fnames = task_args_fnames.get(cur_pod, None)
388         return task_args, task_args_fnames
389
390     def parse_suite(self):
391         """parse the suite file and return a list of task config file paths
392            and lists of optional parameters if present"""
393         LOG.info("\nParsing suite file:%s", self.path)
394
395         try:
396             with open(self.path) as stream:
397                 cfg = yaml.load(stream)
398         except IOError as ioerror:
399             sys.exit(ioerror)
400
401         self._check_schema(cfg["schema"], "suite")
402         LOG.info("\nStarting scenario:%s", cfg["name"])
403
404         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
405         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
406                                       test_cases_dir)
407         if test_cases_dir[-1] != os.sep:
408             test_cases_dir += os.sep
409
410         cur_pod = os.environ.get('NODE_NAME', None)
411         cur_installer = os.environ.get('INSTALLER_TYPE', None)
412
413         valid_task_files = []
414         valid_task_args = []
415         valid_task_args_fnames = []
416
417         for task in cfg["test_cases"]:
418             # 1.check file_name
419             if "file_name" in task:
420                 task_fname = task.get('file_name', None)
421                 if task_fname is None:
422                     continue
423             else:
424                 continue
425             # 2.check constraint
426             if self._meet_constraint(task, cur_pod, cur_installer):
427                 valid_task_files.append(test_cases_dir + task_fname)
428             else:
429                 continue
430             # 3.fetch task parameters
431             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
432             valid_task_args.append(task_args)
433             valid_task_args_fnames.append(task_args_fnames)
434
435         return valid_task_files, valid_task_args, valid_task_args_fnames
436
437     def parse_task(self, task_id, task_args=None, task_args_file=None):
438         """parses the task file and return an context and scenario instances"""
439         print("Parsing task config:", self.path)
440
441         try:
442             kw = {}
443             if task_args_file:
444                 with open(task_args_file) as f:
445                     kw.update(parse_task_args("task_args_file", f.read()))
446             kw.update(parse_task_args("task_args", task_args))
447         except TypeError:
448             raise TypeError()
449
450         try:
451             with open(self.path) as f:
452                 try:
453                     input_task = f.read()
454                     rendered_task = TaskTemplate.render(input_task, **kw)
455                 except Exception as e:
456                     print("Failed to render template:\n%(task)s\n%(err)s\n"
457                           % {"task": input_task, "err": e})
458                     raise e
459                 print("Input task is:\n%s\n" % rendered_task)
460
461                 cfg = yaml.load(rendered_task)
462         except IOError as ioerror:
463             sys.exit(ioerror)
464
465         self._check_schema(cfg["schema"], "task")
466         meet_precondition = self._check_precondition(cfg)
467
468         # TODO: support one or many contexts? Many would simpler and precise
469         # TODO: support hybrid context type
470         if "context" in cfg:
471             context_cfgs = [cfg["context"]]
472         elif "contexts" in cfg:
473             context_cfgs = cfg["contexts"]
474         else:
475             context_cfgs = [{"type": "Dummy"}]
476
477         contexts = []
478         name_suffix = '-{}'.format(task_id[:8])
479         for cfg_attrs in context_cfgs:
480             try:
481                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
482                                                   name_suffix)
483             except KeyError:
484                 pass
485             # default to Heat context because we are testing OpenStack
486             context_type = cfg_attrs.get("type", "Heat")
487             context = Context.get(context_type)
488             context.init(cfg_attrs)
489             contexts.append(context)
490
491         run_in_parallel = cfg.get("run_in_parallel", False)
492
493         # add tc and task id for influxdb extended tags
494         for scenario in cfg["scenarios"]:
495             task_name = os.path.splitext(os.path.basename(self.path))[0]
496             scenario["tc"] = task_name
497             scenario["task_id"] = task_id
498             # embed task path into scenario so we can load other files
499             # relative to task path
500             scenario["task_path"] = os.path.dirname(self.path)
501
502             change_server_name(scenario, name_suffix)
503
504             try:
505                 for node in scenario['nodes']:
506                     scenario['nodes'][node] += name_suffix
507             except KeyError:
508                 pass
509
510         # TODO we need something better here, a class that represent the file
511         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
512
513     def _check_schema(self, cfg_schema, schema_type):
514         """Check if config file is using the correct schema type"""
515
516         if cfg_schema != "yardstick:" + schema_type + ":0.1":
517             sys.exit("error: file %s has unknown schema %s" % (self.path,
518                                                                cfg_schema))
519
520     def _check_precondition(self, cfg):
521         """Check if the envrionment meet the preconditon"""
522
523         if "precondition" in cfg:
524             precondition = cfg["precondition"]
525             installer_type = precondition.get("installer_type", None)
526             deploy_scenarios = precondition.get("deploy_scenarios", None)
527             tc_fit_pods = precondition.get("pod_name", None)
528             installer_type_env = os.environ.get('INSTALL_TYPE', None)
529             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
530             pod_name_env = os.environ.get('NODE_NAME', None)
531
532             LOG.info("installer_type: %s, installer_type_env: %s",
533                      installer_type, installer_type_env)
534             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
535                      deploy_scenarios, deploy_scenario_env)
536             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
537                      tc_fit_pods, pod_name_env)
538             if installer_type and installer_type_env:
539                 if installer_type_env not in installer_type:
540                     return False
541             if deploy_scenarios and deploy_scenario_env:
542                 deploy_scenarios_list = deploy_scenarios.split(',')
543                 for deploy_scenario in deploy_scenarios_list:
544                     if deploy_scenario_env.startswith(deploy_scenario):
545                         return True
546                 return False
547             if tc_fit_pods and pod_name_env:
548                 if pod_name_env not in tc_fit_pods:
549                     return False
550         return True
551
552
553 def is_ip_addr(addr):
554     """check if string addr is an IP address"""
555     try:
556         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
557     except AttributeError:
558         pass
559
560     try:
561         ipaddress.ip_address(addr.encode('utf-8'))
562     except ValueError:
563         return False
564     else:
565         return True
566
567
568 def _is_background_scenario(scenario):
569     if "run_in_background" in scenario:
570         return scenario["run_in_background"]
571     else:
572         return False
573
574
575 def parse_nodes_with_context(scenario_cfg):
576     """paras the 'nodes' fields in scenario """
577     nodes = scenario_cfg["nodes"]
578
579     nodes_cfg = {}
580     for nodename in nodes:
581         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
582
583     return nodes_cfg
584
585
586 def runner_join(runner):
587     """join (wait for) a runner, exit process at runner failure"""
588     status = runner.join()
589     base_runner.Runner.release(runner)
590     return status
591
592
593 def print_invalid_header(source_name, args):
594     print("Invalid %(source)s passed:\n\n %(args)s\n"
595           % {"source": source_name, "args": args})
596
597
598 def parse_task_args(src_name, args):
599     if isinstance(args, collections.Mapping):
600         return args
601
602     try:
603         kw = args and yaml.safe_load(args)
604         kw = {} if kw is None else kw
605     except yaml.parser.ParserError as e:
606         print_invalid_header(src_name, args)
607         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
608               % {"source": src_name, "err": e})
609         raise TypeError()
610
611     if not isinstance(kw, dict):
612         print_invalid_header(src_name, args)
613         print("%(src)s had to be dict, actually %(src_type)s\n"
614               % {"src": src_name, "src_type": type(kw)})
615         raise TypeError()
616     return kw
617
618
619 def check_environment():
620     auth_url = os.environ.get('OS_AUTH_URL', None)
621     if not auth_url:
622         try:
623             source_env(constants.OPENRC)
624         except IOError as e:
625             if e.errno != errno.EEXIST:
626                 raise
627             LOG.debug('OPENRC file not found')
628
629
630 def change_server_name(scenario, suffix):
631     try:
632         host = scenario['host']
633     except KeyError:
634         pass
635     else:
636         try:
637             host['name'] += suffix
638         except TypeError:
639             scenario['host'] += suffix
640
641     try:
642         target = scenario['target']
643     except KeyError:
644         pass
645     else:
646         try:
647             target['name'] += suffix
648         except TypeError:
649             scenario['target'] += suffix
650
651     try:
652         key = 'targets'
653         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
654     except KeyError:
655         pass