Merge "increase line length to 99"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.dispatcher.base import Base as DispatcherBase
28 from yardstick.common.task_template import TaskTemplate
29 from yardstick.common.utils import source_env
30 from yardstick.common import utils
31 from yardstick.common import constants
32
33 output_file_default = "/tmp/yardstick.out"
34 config_file = '/etc/yardstick/yardstick.conf'
35 test_cases_dir_default = "tests/opnfv/test_cases/"
36 LOG = logging.getLogger(__name__)
37
38
39 class Task(object):     # pragma: no cover
40     """Task commands.
41
42        Set of commands to manage benchmark tasks.
43     """
44
45     def __init__(self):
46         self.contexts = []
47         self.outputs = {}
48
49     def start(self, args, **kwargs):
50         """Start a benchmark scenario."""
51
52         atexit.register(self.atexit_handler)
53
54         self.task_id = kwargs.get('task_id', str(uuid.uuid4()))
55
56         check_environment()
57
58         output_config = utils.parse_ini_file(config_file)
59         self._init_output_config(output_config)
60         self._set_output_config(output_config, args.output_file)
61         LOG.debug('Output configuration is: %s', output_config)
62
63         if output_config['DEFAULT'].get('dispatcher') == 'file':
64             result = {'status': 0, 'result': {}}
65             utils.write_json_to_file(args.output_file, result)
66
67         total_start_time = time.time()
68         parser = TaskParser(args.inputfile[0])
69
70         if args.suite:
71             # 1.parse suite, return suite_params info
72             task_files, task_args, task_args_fnames = \
73                 parser.parse_suite()
74         else:
75             task_files = [parser.path]
76             task_args = [args.task_args]
77             task_args_fnames = [args.task_args_file]
78
79         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
80                  task_files, task_args, task_args_fnames)
81
82         if args.parse_only:
83             sys.exit(0)
84
85         testcases = {}
86         # parse task_files
87         for i in range(0, len(task_files)):
88             one_task_start_time = time.time()
89             parser.path = task_files[i]
90             scenarios, run_in_parallel, meet_precondition, contexts = \
91                 parser.parse_task(self.task_id, task_args[i],
92                                   task_args_fnames[i])
93
94             self.contexts.extend(contexts)
95
96             if not meet_precondition:
97                 LOG.info("meet_precondition is %s, please check envrionment",
98                          meet_precondition)
99                 continue
100
101             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
102             try:
103                 data = self._run(scenarios, run_in_parallel, args.output_file)
104             except KeyboardInterrupt:
105                 raise
106             except Exception:
107                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
108             else:
109                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
110
111             if args.keep_deploy:
112                 # keep deployment, forget about stack
113                 # (hide it for exit handler)
114                 self.contexts = []
115             else:
116                 for context in self.contexts[::-1]:
117                     context.undeploy()
118                 self.contexts = []
119             one_task_end_time = time.time()
120             LOG.info("task %s finished in %d secs", task_files[i],
121                      one_task_end_time - one_task_start_time)
122
123         result = self._get_format_result(testcases)
124
125         self._do_output(output_config, result)
126
127         total_end_time = time.time()
128         LOG.info("total finished in %d secs",
129                  total_end_time - total_start_time)
130
131         scenario = scenarios[0]
132         print("To generate report execute => yardstick report generate ",
133               scenario['task_id'], scenario['tc'])
134
135         print("Done, exiting")
136
137     def _init_output_config(self, output_config):
138         output_config.setdefault('DEFAULT', {})
139         output_config.setdefault('dispatcher_http', {})
140         output_config.setdefault('dispatcher_file', {})
141         output_config.setdefault('dispatcher_influxdb', {})
142         output_config.setdefault('nsb', {})
143
144     def _set_output_config(self, output_config, file_path):
145         try:
146             out_type = os.environ['DISPATCHER']
147         except KeyError:
148             output_config['DEFAULT'].setdefault('dispatcher', 'file')
149         else:
150             output_config['DEFAULT']['dispatcher'] = out_type
151
152         output_config['dispatcher_file']['file_path'] = file_path
153
154         try:
155             target = os.environ['TARGET']
156         except KeyError:
157             pass
158         else:
159             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
160             output_config[k]['target'] = target
161
162     def _get_format_result(self, testcases):
163         criteria = self._get_task_criteria(testcases)
164
165         info = {
166             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
167             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
168             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
169             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
170         }
171
172         result = {
173             'status': 1,
174             'result': {
175                 'criteria': criteria,
176                 'task_id': self.task_id,
177                 'info': info,
178                 'testcases': testcases
179             }
180         }
181
182         return result
183
184     def _get_task_criteria(self, testcases):
185         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
186         if criteria:
187             return 'FAIL'
188         else:
189             return 'PASS'
190
191     def _do_output(self, output_config, result):
192
193         dispatcher = DispatcherBase.get(output_config)
194         dispatcher.flush_result_data(result)
195
196     def _run(self, scenarios, run_in_parallel, output_file):
197         """Deploys context and calls runners"""
198         for context in self.contexts:
199             context.deploy()
200
201         background_runners = []
202
203         result = []
204         # Start all background scenarios
205         for scenario in filter(_is_background_scenario, scenarios):
206             scenario["runner"] = dict(type="Duration", duration=1000000000)
207             runner = self.run_one_scenario(scenario, output_file)
208             background_runners.append(runner)
209
210         runners = []
211         if run_in_parallel:
212             for scenario in scenarios:
213                 if not _is_background_scenario(scenario):
214                     runner = self.run_one_scenario(scenario, output_file)
215                     runners.append(runner)
216
217             # Wait for runners to finish
218             for runner in runners:
219                 status = runner_join(runner)
220                 if status != 0:
221                     raise RuntimeError
222                 self.outputs.update(runner.get_output())
223                 result.extend(runner.get_result())
224                 print("Runner ended, output in", output_file)
225         else:
226             # run serially
227             for scenario in scenarios:
228                 if not _is_background_scenario(scenario):
229                     runner = self.run_one_scenario(scenario, output_file)
230                     status = runner_join(runner)
231                     if status != 0:
232                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
233                         raise RuntimeError
234                     self.outputs.update(runner.get_output())
235                     result.extend(runner.get_result())
236                     print("Runner ended, output in", output_file)
237
238         # Abort background runners
239         for runner in background_runners:
240             runner.abort()
241
242         # Wait for background runners to finish
243         for runner in background_runners:
244             status = runner.join(timeout=60)
245             if status is None:
246                 # Nuke if it did not stop nicely
247                 base_runner.Runner.terminate(runner)
248                 status = runner_join(runner)
249                 self.outputs.update(runner.get_output())
250                 result.extend(runner.get_result())
251             else:
252                 base_runner.Runner.release(runner)
253             if status != 0:
254                 raise RuntimeError
255             print("Background task ended")
256
257         return result
258
259     def atexit_handler(self):
260         """handler for process termination"""
261         base_runner.Runner.terminate_all()
262
263         if self.contexts:
264             print("Undeploying all contexts")
265             for context in self.contexts[::-1]:
266                 context.undeploy()
267
268     def _parse_options(self, op):
269         if isinstance(op, dict):
270             return {k: self._parse_options(v) for k, v in op.items()}
271         elif isinstance(op, list):
272             return [self._parse_options(v) for v in op]
273         elif isinstance(op, str):
274             return self.outputs.get(op[1:]) if op.startswith('$') else op
275         else:
276             return op
277
278     def run_one_scenario(self, scenario_cfg, output_file):
279         """run one scenario using context"""
280         runner_cfg = scenario_cfg["runner"]
281         runner_cfg['output_filename'] = output_file
282
283         options = scenario_cfg.get('options', {})
284         scenario_cfg['options'] = self._parse_options(options)
285
286         # TODO support get multi hosts/vms info
287         context_cfg = {}
288         if "host" in scenario_cfg:
289             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
290
291         if "target" in scenario_cfg:
292             if is_ip_addr(scenario_cfg["target"]):
293                 context_cfg['target'] = {}
294                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
295             else:
296                 context_cfg['target'] = Context.get_server(
297                     scenario_cfg["target"])
298                 if self._is_same_heat_context(scenario_cfg["host"],
299                                               scenario_cfg["target"]):
300                     context_cfg["target"]["ipaddr"] = \
301                         context_cfg["target"]["private_ip"]
302                 else:
303                     context_cfg["target"]["ipaddr"] = \
304                         context_cfg["target"]["ip"]
305
306         if "targets" in scenario_cfg:
307             ip_list = []
308             for target in scenario_cfg["targets"]:
309                 if is_ip_addr(target):
310                     ip_list.append(target)
311                     context_cfg['target'] = {}
312                 else:
313                     context_cfg['target'] = Context.get_server(target)
314                     if self._is_same_heat_context(scenario_cfg["host"],
315                                                   target):
316                         ip_list.append(context_cfg["target"]["private_ip"])
317                     else:
318                         ip_list.append(context_cfg["target"]["ip"])
319             context_cfg['target']['ipaddr'] = ','.join(ip_list)
320
321         if "nodes" in scenario_cfg:
322             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
323         runner = base_runner.Runner.get(runner_cfg)
324
325         print("Starting runner of type '%s'" % runner_cfg["type"])
326         runner.run(scenario_cfg, context_cfg)
327
328         return runner
329
330     def _is_same_heat_context(self, host_attr, target_attr):
331         """check if two servers are in the same heat context
332         host_attr: either a name for a server created by yardstick or a dict
333         with attribute name mapping when using external heat templates
334         target_attr: either a name for a server created by yardstick or a dict
335         with attribute name mapping when using external heat templates
336         """
337         host = None
338         target = None
339         for context in self.contexts:
340             if context.__context_type__ != "Heat":
341                 continue
342
343             host = context._get_server(host_attr)
344             if host is None:
345                 continue
346
347             target = context._get_server(target_attr)
348             if target is None:
349                 return False
350
351             # Both host and target is not None, then they are in the
352             # same heat context.
353             return True
354
355         return False
356
357
358 class TaskParser(object):       # pragma: no cover
359     """Parser for task config files in yaml format"""
360
361     def __init__(self, path):
362         self.path = path
363
364     def _meet_constraint(self, task, cur_pod, cur_installer):
365         if "constraint" in task:
366             constraint = task.get('constraint', None)
367             if constraint is not None:
368                 tc_fit_pod = constraint.get('pod', None)
369                 tc_fit_installer = constraint.get('installer', None)
370                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
371                          cur_pod, cur_installer, constraint)
372                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
373                     return False
374                 if cur_installer and tc_fit_installer and \
375                         cur_installer not in tc_fit_installer:
376                     return False
377         return True
378
379     def _get_task_para(self, task, cur_pod):
380         task_args = task.get('task_args', None)
381         if task_args is not None:
382             task_args = task_args.get(cur_pod, None)
383         task_args_fnames = task.get('task_args_fnames', None)
384         if task_args_fnames is not None:
385             task_args_fnames = task_args_fnames.get(cur_pod, None)
386         return task_args, task_args_fnames
387
388     def parse_suite(self):
389         """parse the suite file and return a list of task config file paths
390            and lists of optional parameters if present"""
391         LOG.info("\nParsing suite file:%s", self.path)
392
393         try:
394             with open(self.path) as stream:
395                 cfg = yaml.load(stream)
396         except IOError as ioerror:
397             sys.exit(ioerror)
398
399         self._check_schema(cfg["schema"], "suite")
400         LOG.info("\nStarting scenario:%s", cfg["name"])
401
402         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
403         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
404                                       test_cases_dir)
405         if test_cases_dir[-1] != os.sep:
406             test_cases_dir += os.sep
407
408         cur_pod = os.environ.get('NODE_NAME', None)
409         cur_installer = os.environ.get('INSTALLER_TYPE', None)
410
411         valid_task_files = []
412         valid_task_args = []
413         valid_task_args_fnames = []
414
415         for task in cfg["test_cases"]:
416             # 1.check file_name
417             if "file_name" in task:
418                 task_fname = task.get('file_name', None)
419                 if task_fname is None:
420                     continue
421             else:
422                 continue
423             # 2.check constraint
424             if self._meet_constraint(task, cur_pod, cur_installer):
425                 valid_task_files.append(test_cases_dir + task_fname)
426             else:
427                 continue
428             # 3.fetch task parameters
429             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
430             valid_task_args.append(task_args)
431             valid_task_args_fnames.append(task_args_fnames)
432
433         return valid_task_files, valid_task_args, valid_task_args_fnames
434
435     def parse_task(self, task_id, task_args=None, task_args_file=None):
436         """parses the task file and return an context and scenario instances"""
437         print("Parsing task config:", self.path)
438
439         try:
440             kw = {}
441             if task_args_file:
442                 with open(task_args_file) as f:
443                     kw.update(parse_task_args("task_args_file", f.read()))
444             kw.update(parse_task_args("task_args", task_args))
445         except TypeError:
446             raise TypeError()
447
448         try:
449             with open(self.path) as f:
450                 try:
451                     input_task = f.read()
452                     rendered_task = TaskTemplate.render(input_task, **kw)
453                 except Exception as e:
454                     print("Failed to render template:\n%(task)s\n%(err)s\n"
455                           % {"task": input_task, "err": e})
456                     raise e
457                 print("Input task is:\n%s\n" % rendered_task)
458
459                 cfg = yaml.load(rendered_task)
460         except IOError as ioerror:
461             sys.exit(ioerror)
462
463         self._check_schema(cfg["schema"], "task")
464         meet_precondition = self._check_precondition(cfg)
465
466         # TODO: support one or many contexts? Many would simpler and precise
467         # TODO: support hybrid context type
468         if "context" in cfg:
469             context_cfgs = [cfg["context"]]
470         elif "contexts" in cfg:
471             context_cfgs = cfg["contexts"]
472         else:
473             context_cfgs = [{"type": "Dummy"}]
474
475         contexts = []
476         name_suffix = '-{}'.format(task_id[:8])
477         for cfg_attrs in context_cfgs:
478             try:
479                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
480                                                   name_suffix)
481             except KeyError:
482                 pass
483             # default to Heat context because we are testing OpenStack
484             context_type = cfg_attrs.get("type", "Heat")
485             context = Context.get(context_type)
486             context.init(cfg_attrs)
487             contexts.append(context)
488
489         run_in_parallel = cfg.get("run_in_parallel", False)
490
491         # add tc and task id for influxdb extended tags
492         for scenario in cfg["scenarios"]:
493             task_name = os.path.splitext(os.path.basename(self.path))[0]
494             scenario["tc"] = task_name
495             scenario["task_id"] = task_id
496             # embed task path into scenario so we can load other files
497             # relative to task path
498             scenario["task_path"] = os.path.dirname(self.path)
499
500             change_server_name(scenario, name_suffix)
501
502             try:
503                 for node in scenario['nodes']:
504                     scenario['nodes'][node] += name_suffix
505             except KeyError:
506                 pass
507
508         # TODO we need something better here, a class that represent the file
509         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
510
511     def _check_schema(self, cfg_schema, schema_type):
512         """Check if config file is using the correct schema type"""
513
514         if cfg_schema != "yardstick:" + schema_type + ":0.1":
515             sys.exit("error: file %s has unknown schema %s" % (self.path,
516                                                                cfg_schema))
517
518     def _check_precondition(self, cfg):
519         """Check if the envrionment meet the preconditon"""
520
521         if "precondition" in cfg:
522             precondition = cfg["precondition"]
523             installer_type = precondition.get("installer_type", None)
524             deploy_scenarios = precondition.get("deploy_scenarios", None)
525             tc_fit_pods = precondition.get("pod_name", None)
526             installer_type_env = os.environ.get('INSTALL_TYPE', None)
527             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
528             pod_name_env = os.environ.get('NODE_NAME', None)
529
530             LOG.info("installer_type: %s, installer_type_env: %s",
531                      installer_type, installer_type_env)
532             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
533                      deploy_scenarios, deploy_scenario_env)
534             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
535                      tc_fit_pods, pod_name_env)
536             if installer_type and installer_type_env:
537                 if installer_type_env not in installer_type:
538                     return False
539             if deploy_scenarios and deploy_scenario_env:
540                 deploy_scenarios_list = deploy_scenarios.split(',')
541                 for deploy_scenario in deploy_scenarios_list:
542                     if deploy_scenario_env.startswith(deploy_scenario):
543                         return True
544                 return False
545             if tc_fit_pods and pod_name_env:
546                 if pod_name_env not in tc_fit_pods:
547                     return False
548         return True
549
550
551 def is_ip_addr(addr):
552     """check if string addr is an IP address"""
553     try:
554         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
555     except AttributeError:
556         pass
557
558     try:
559         ipaddress.ip_address(addr.encode('utf-8'))
560     except ValueError:
561         return False
562     else:
563         return True
564
565
566 def _is_background_scenario(scenario):
567     if "run_in_background" in scenario:
568         return scenario["run_in_background"]
569     else:
570         return False
571
572
573 def parse_nodes_with_context(scenario_cfg):
574     """paras the 'nodes' fields in scenario """
575     nodes = scenario_cfg["nodes"]
576
577     nodes_cfg = {}
578     for nodename in nodes:
579         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
580
581     return nodes_cfg
582
583
584 def runner_join(runner):
585     """join (wait for) a runner, exit process at runner failure"""
586     status = runner.join()
587     base_runner.Runner.release(runner)
588     return status
589
590
591 def print_invalid_header(source_name, args):
592     print("Invalid %(source)s passed:\n\n %(args)s\n"
593           % {"source": source_name, "args": args})
594
595
596 def parse_task_args(src_name, args):
597     try:
598         kw = args and yaml.safe_load(args)
599         kw = {} if kw is None else kw
600     except yaml.parser.ParserError as e:
601         print_invalid_header(src_name, args)
602         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
603               % {"source": src_name, "err": e})
604         raise TypeError()
605
606     if not isinstance(kw, dict):
607         print_invalid_header(src_name, args)
608         print("%(src)s had to be dict, actually %(src_type)s\n"
609               % {"src": src_name, "src_type": type(kw)})
610         raise TypeError()
611     return kw
612
613
614 def check_environment():
615     auth_url = os.environ.get('OS_AUTH_URL', None)
616     if not auth_url:
617         try:
618             source_env(constants.OPENRC)
619         except IOError as e:
620             if e.errno != errno.EEXIST:
621                 raise
622             LOG.debug('OPENRC file not found')
623
624
625 def change_server_name(scenario, suffix):
626     try:
627         host = scenario['host']
628     except KeyError:
629         pass
630     else:
631         try:
632             host['name'] += suffix
633         except TypeError:
634             scenario['host'] += suffix
635
636     try:
637         target = scenario['target']
638     except KeyError:
639         pass
640     else:
641         try:
642             target['name'] += suffix
643         except TypeError:
644             scenario['target'] += suffix
645
646     try:
647         key = 'targets'
648         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
649     except KeyError:
650         pass