Merge "Open storperf testcase to huawei-pod2"
[yardstick.git] / yardstick / benchmark / core / task.py
##############################################################################
# Copyright (c) 2015 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

""" Handler for yardstick command 'task' """

from __future__ import absolute_import
from __future__ import print_function
import sys
import os
import yaml
import atexit
import ipaddress
import time
import logging
import uuid
import errno
import collections

import six
from six.moves import filter

from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.runners import base as base_runner
from yardstick.dispatcher.base import Base as DispatcherBase
from yardstick.common.task_template import TaskTemplate
from yardstick.common.utils import source_env
from yardstick.common import utils
from yardstick.common import constants

output_file_default = "/tmp/yardstick.out"
config_file = '/etc/yardstick/yardstick.conf'
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)


class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        self.contexts = []
        self.outputs = {}

    def start(self, args, **kwargs):
        """Start a benchmark scenario."""

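        # Register the exit handler up front so contexts get undeployed and
        # runners terminated even if the task exits abnormally.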
        atexit.register(self.atexit_handler)

        task_id = args.task_id
        self.task_id = task_id if task_id else str(uuid.uuid4())

        check_environment()

        output_config = utils.parse_ini_file(config_file)
        self._init_output_config(output_config)
        self._set_output_config(output_config, args.output_file)
        LOG.debug('Output configuration is: %s', output_config)

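        # When dispatching to a file, seed the output file with an empty
        # result so it always exists and holds valid JSON, even if the task
        # aborts before the first scenario completes.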
        if output_config['DEFAULT'].get('dispatcher') == 'file':
            result = {'status': 0, 'result': {}}
            utils.write_json_to_file(args.output_file, result)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1. parse the suite file and get per-task parameters
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        testcases = {}
        # parse task_files
        for i in range(len(task_files)):
            one_task_start_time = time.time()
            parser.path = task_files[i]
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, task_args[i],
                                  task_args_fnames[i])

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check environment",
                         meet_precondition)
                continue

            case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
            try:
                data = self._run(scenarios, run_in_parallel, args.output_file)
            except KeyboardInterrupt:
                raise
            except Exception:
                LOG.exception('testcase %s failed', case_name)
                testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
            else:
                testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        result = self._get_format_result(testcases)

        self._do_output(output_config, result)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        scenario = scenarios[0]
        print("To generate report execute => yardstick report generate ",
              scenario['task_id'], scenario['tc'])

        print("Done, exiting")
        return result

    def _init_output_config(self, output_config):
        output_config.setdefault('DEFAULT', {})
        output_config.setdefault('dispatcher_http', {})
        output_config.setdefault('dispatcher_file', {})
        output_config.setdefault('dispatcher_influxdb', {})
        output_config.setdefault('nsb', {})

    def _set_output_config(self, output_config, file_path):
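        """Apply DISPATCHER and TARGET environment-variable overrides on top
        of the ini-file dispatcher settings."""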
        try:
            out_type = os.environ['DISPATCHER']
        except KeyError:
            output_config['DEFAULT'].setdefault('dispatcher', 'file')
        else:
            output_config['DEFAULT']['dispatcher'] = out_type

        output_config['dispatcher_file']['file_path'] = file_path

        try:
            target = os.environ['TARGET']
        except KeyError:
            pass
        else:
            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
            output_config[k]['target'] = target

    def _get_format_result(self, testcases):
        criteria = self._get_task_criteria(testcases)

        info = {
            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
        }

        result = {
            'status': 1,
            'result': {
                'criteria': criteria,
                'task_id': self.task_id,
                'info': info,
                'testcases': testcases
            }
        }

        return result

    def _get_task_criteria(self, testcases):
        failed = any(t.get('criteria') != 'PASS' for t in testcases.values())
        return 'FAIL' if failed else 'PASS'

    def _do_output(self, output_config, result):
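        """Push the formatted result through the configured dispatcher."""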
        dispatcher = DispatcherBase.get(output_config)
        dispatcher.flush_result_data(result)

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys contexts and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        result = []
        # Start all background scenarios
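        # Background scenarios get an effectively unbounded Duration runner;
        # they keep running until they are aborted below, once the foreground
        # scenarios have finished.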
        for scenario in filter(_is_background_scenario, scenarios):
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                status = runner_join(runner)
                if status != 0:
                    raise RuntimeError('scenario runner failed')
                self.outputs.update(runner.get_output())
                result.extend(runner.get_result())
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    status = runner_join(runner)
                    if status != 0:
                        LOG.error('Scenario: %s ERROR', scenario.get('type'))
                        raise RuntimeError('scenario runner failed')
                    self.outputs.update(runner.get_output())
                    result.extend(runner.get_result())
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            status = runner.join(timeout=60)
            if status is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                status = runner_join(runner)
            else:
                base_runner.Runner.release(runner)

            self.outputs.update(runner.get_output())
            result.extend(runner.get_result())
            print("Background task ended")
        return result

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
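        """Resolve '$'-prefixed strings in scenario options to the outputs
        of previously run scenarios, recursing into dicts and lists.

        For example, with self.outputs == {'rtt': 0.3}, the option value
        '$rtt' resolves to 0.3.
        """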
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, six.string_types):
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context"""
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO: support getting info for multiple hosts/VMs
        context_cfg = {}
290         if "host" in scenario_cfg:
291             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
292
293         if "target" in scenario_cfg:
294             if is_ip_addr(scenario_cfg["target"]):
295                 context_cfg['target'] = {}
296                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
297             else:
298                 context_cfg['target'] = Context.get_server(
299                     scenario_cfg["target"])
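                # use the private address when host and target share a heat
                # stack, otherwise fall back to the (public) ip attribute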
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

308         if "targets" in scenario_cfg:
309             ip_list = []
310             for target in scenario_cfg["targets"]:
311                 if is_ip_addr(target):
312                     ip_list.append(target)
313                     context_cfg['target'] = {}
314                 else:
315                     context_cfg['target'] = Context.get_server(target)
316                     if self._is_same_heat_context(scenario_cfg["host"],
317                                                   target):
318                         ip_list.append(context_cfg["target"]["private_ip"])
319                     else:
320                         ip_list.append(context_cfg["target"]["ip"])
321             context_cfg['target']['ipaddr'] = ','.join(ip_list)
322
323         if "nodes" in scenario_cfg:
324             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
325             context_cfg["networks"] = get_networks_from_nodes(
326                 context_cfg["nodes"])
327         runner = base_runner.Runner.get(runner_cfg)
328
329         print("Starting runner of type '%s'" % runner_cfg["type"])
330         runner.run(scenario_cfg, context_cfg)
331
332         return runner
333
    def _is_same_heat_context(self, host_attr, target_attr):
        """Check if two servers are in the same heat context.

        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        host = None
        target = None
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target are found in this context, so they are
            # in the same heat context.
            return True

        return False


class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
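        """Check whether a suite test case fits the current pod and installer.

        A test case may carry a 'constraint' dict with optional 'pod' and
        'installer' entries; it is skipped unless the current environment
        matches them.
        """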
369         if "constraint" in task:
370             constraint = task.get('constraint', None)
371             if constraint is not None:
372                 tc_fit_pod = constraint.get('pod', None)
373                 tc_fit_installer = constraint.get('installer', None)
374                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
375                          cur_pod, cur_installer, constraint)
376                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
377                     return False
378                 if (cur_installer is None) or (tc_fit_installer and cur_installer
379                                                not in tc_fit_installer):
380                     return False
381         return True
382
    def _get_task_para(self, task, cur_pod):
        task_args = task.get('task_args')
        if task_args is not None:
            task_args = task_args.get(cur_pod, task_args.get('default'))
        task_args_fnames = task.get('task_args_fnames')
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                cfg = yaml.safe_load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1. check file_name
            task_fname = task.get('file_name')
            if task_fname is None:
                continue
            # 2. check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3. fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parse the task file and return context and scenario instances"""
        print("Parsing task config:", self.path)

        kw = {}
        if task_args_file:
            with open(task_args_file) as f:
                kw.update(parse_task_args("task_args_file", f.read()))
        kw.update(parse_task_args("task_args", task_args))

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise
                print("Input task is:\n%s\n" % rendered_task)

                cfg = yaml.safe_load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would be simpler and more
        # precise
        # TODO: support hybrid context types
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
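        # Suffix context (and, below, server) names with the first eight
        # characters of the task id so concurrent runs get unique resource
        # names.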
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed the task path into the scenario so we can load other
            # files relative to it
            scenario["task_path"] = os.path.dirname(self.path)

            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO: we need something better here, a class that represents the
        # task file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALLER_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True


def is_ip_addr(addr):
    """check if string addr is an IP address"""
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # ip_address() needs a text string; six.text_type works on both
        # Python 2 and 3, whereas encoding to bytes breaks on Python 3
        ipaddress.ip_address(six.text_type(addr))
    except ValueError:
        return False
    else:
        return True


def _is_background_scenario(scenario):
    return scenario.get("run_in_background", False)


def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' fields in the scenario"""
    nodes = scenario_cfg["nodes"]
    return {nodename: Context.get_server(node)
            for nodename, node in nodes.items()}


def get_networks_from_nodes(nodes):
    """return the networks used by the given nodes, keyed by network name"""
    networks = {}
    for node in nodes.values():
        if not node:
            continue
        for interface in node['interfaces'].values():
            vld_id = interface.get('vld_id')
            # mgmt network doesn't have vld_id
            if not vld_id:
                continue
            network = Context.get_network({"vld_id": vld_id})
            if network:
                networks[network['name']] = network
    return networks


def runner_join(runner):
    """join (wait for) a runner, then release it and return its exit status"""
    status = runner.join()
    base_runner.Runner.release(runner)
    return status


def print_invalid_header(source_name, args):
    print("Invalid %(source)s passed:\n\n %(args)s\n"
          % {"source": source_name, "args": args})


def parse_task_args(src_name, args):
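    """Parse YAML-formatted task arguments into a dict.

    Raises TypeError (after printing details) when the input is not valid
    YAML or does not parse to a mapping.
    """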
    if isinstance(args, collections.Mapping):
        return args

    try:
        kw = args and yaml.safe_load(args)
        kw = {} if kw is None else kw
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s has to be a dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw


def check_environment():
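    """Source the openrc file when no OpenStack credentials are set yet."""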
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENRC)
        except IOError as e:
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')


def change_server_name(scenario, suffix):
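    """Append suffix to the server names referenced by a scenario.

    'host' and 'target' may each be a plain server name or a dict with a
    'name' key (the external heat template case); 'targets' is a list of
    server names.
    """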
    try:
        host = scenario['host']
    except KeyError:
        pass
    else:
        try:
            host['name'] += suffix
        except TypeError:
            scenario['host'] += suffix

    try:
        target = scenario['target']
    except KeyError:
        pass
    else:
        try:
            target['name'] += suffix
        except TypeError:
            scenario['target'] += suffix

    try:
        key = 'targets'
        scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
    except KeyError:
        pass