Merge "Introduced timeout to post method of HttpClient"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import collections
25
26 from six.moves import filter
27 from jinja2 import Environment
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.common.yaml_loader import yaml_load
32 from yardstick.dispatcher.base import Base as DispatcherBase
33 from yardstick.common.task_template import TaskTemplate
34 from yardstick.common import utils
35 from yardstick.common import constants
36 from yardstick.common.html_template import report_template
37
38 output_file_default = "/tmp/yardstick.out"
39 config_file = '/etc/yardstick/yardstick.conf'
40 test_cases_dir_default = "tests/opnfv/test_cases/"
41 LOG = logging.getLogger(__name__)
42 JOIN_TIMEOUT = 60
43
44
45 class Task(object):     # pragma: no cover
46     """Task commands.
47
48        Set of commands to manage benchmark tasks.
49     """
50
51     def __init__(self):
52         self.contexts = []
53         self.outputs = {}
54
55     def _set_dispatchers(self, output_config):
56         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
57                                                            'file')
58         out_types = [s.strip() for s in dispatchers.split(',')]
59         output_config['DEFAULT']['dispatcher'] = out_types
60
61     def start(self, args, **kwargs):
62         """Start a benchmark scenario."""
63
64         atexit.register(self.atexit_handler)
65
66         task_id = getattr(args, 'task_id')
67         self.task_id = task_id if task_id else str(uuid.uuid4())
68
69         self._set_log()
70
71         try:
72             output_config = utils.parse_ini_file(config_file)
73         except Exception:
74             # all error will be ignore, the default value is {}
75             output_config = {}
76
77         self._init_output_config(output_config)
78         self._set_output_config(output_config, args.output_file)
79         LOG.debug('Output configuration is: %s', output_config)
80
81         self._set_dispatchers(output_config)
82
83         # update dispatcher list
84         if 'file' in output_config['DEFAULT']['dispatcher']:
85             result = {'status': 0, 'result': {}}
86             utils.write_json_to_file(args.output_file, result)
87
88         total_start_time = time.time()
89         parser = TaskParser(args.inputfile[0])
90
91         if args.suite:
92             # 1.parse suite, return suite_params info
93             task_files, task_args, task_args_fnames = \
94                 parser.parse_suite()
95         else:
96             task_files = [parser.path]
97             task_args = [args.task_args]
98             task_args_fnames = [args.task_args_file]
99
100         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
101                   task_files, task_args, task_args_fnames)
102
103         if args.parse_only:
104             sys.exit(0)
105
106         testcases = {}
107         # parse task_files
108         for i in range(0, len(task_files)):
109             one_task_start_time = time.time()
110             parser.path = task_files[i]
111             scenarios, run_in_parallel, meet_precondition, contexts = \
112                 parser.parse_task(self.task_id, task_args[i],
113                                   task_args_fnames[i])
114
115             self.contexts.extend(contexts)
116
117             if not meet_precondition:
118                 LOG.info("meet_precondition is %s, please check envrionment",
119                          meet_precondition)
120                 continue
121
122             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
123             try:
124                 data = self._run(scenarios, run_in_parallel, args.output_file)
125             except KeyboardInterrupt:
126                 raise
127             except Exception:
128                 LOG.exception("Running test case %s failed!", case_name)
129                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
130             else:
131                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
132
133             if args.keep_deploy:
134                 # keep deployment, forget about stack
135                 # (hide it for exit handler)
136                 self.contexts = []
137             else:
138                 for context in self.contexts[::-1]:
139                     context.undeploy()
140                 self.contexts = []
141             one_task_end_time = time.time()
142             LOG.info("Task %s finished in %d secs", task_files[i],
143                      one_task_end_time - one_task_start_time)
144
145         result = self._get_format_result(testcases)
146
147         self._do_output(output_config, result)
148         self._generate_reporting(result)
149
150         total_end_time = time.time()
151         LOG.info("Total finished in %d secs",
152                  total_end_time - total_start_time)
153
154         scenario = scenarios[0]
155         LOG.info("To generate report, execute command "
156                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
157         LOG.info("Task ALL DONE, exiting")
158         return result
159
160     def _generate_reporting(self, result):
161         env = Environment()
162         with open(constants.REPORTING_FILE, 'w') as f:
163             f.write(env.from_string(report_template).render(result))
164
165         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
166
167     def _set_log(self):
168         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
169         log_formatter = logging.Formatter(log_format)
170
171         utils.makedirs(constants.TASK_LOG_DIR)
172         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
173         log_handler = logging.FileHandler(log_path)
174         log_handler.setFormatter(log_formatter)
175         log_handler.setLevel(logging.DEBUG)
176
177         logging.root.addHandler(log_handler)
178
179     def _init_output_config(self, output_config):
180         output_config.setdefault('DEFAULT', {})
181         output_config.setdefault('dispatcher_http', {})
182         output_config.setdefault('dispatcher_file', {})
183         output_config.setdefault('dispatcher_influxdb', {})
184         output_config.setdefault('nsb', {})
185
186     def _set_output_config(self, output_config, file_path):
187         try:
188             out_type = os.environ['DISPATCHER']
189         except KeyError:
190             output_config['DEFAULT'].setdefault('dispatcher', 'file')
191         else:
192             output_config['DEFAULT']['dispatcher'] = out_type
193
194         output_config['dispatcher_file']['file_path'] = file_path
195
196         try:
197             target = os.environ['TARGET']
198         except KeyError:
199             pass
200         else:
201             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
202             output_config[k]['target'] = target
203
204     def _get_format_result(self, testcases):
205         criteria = self._get_task_criteria(testcases)
206
207         info = {
208             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
209             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
210             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
211             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
212         }
213
214         result = {
215             'status': 1,
216             'result': {
217                 'criteria': criteria,
218                 'task_id': self.task_id,
219                 'info': info,
220                 'testcases': testcases
221             }
222         }
223
224         return result
225
226     def _get_task_criteria(self, testcases):
227         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
228         if criteria:
229             return 'FAIL'
230         else:
231             return 'PASS'
232
233     def _do_output(self, output_config, result):
234         dispatchers = DispatcherBase.get(output_config)
235
236         for dispatcher in dispatchers:
237             dispatcher.flush_result_data(result)
238
239     def _run(self, scenarios, run_in_parallel, output_file):
240         """Deploys context and calls runners"""
241         for context in self.contexts:
242             context.deploy()
243
244         background_runners = []
245
246         result = []
247         # Start all background scenarios
248         for scenario in filter(_is_background_scenario, scenarios):
249             scenario["runner"] = dict(type="Duration", duration=1000000000)
250             runner = self.run_one_scenario(scenario, output_file)
251             background_runners.append(runner)
252
253         runners = []
254         if run_in_parallel:
255             for scenario in scenarios:
256                 if not _is_background_scenario(scenario):
257                     runner = self.run_one_scenario(scenario, output_file)
258                     runners.append(runner)
259
260             # Wait for runners to finish
261             for runner in runners:
262                 status = runner_join(runner)
263                 if status != 0:
264                     raise RuntimeError
265                 self.outputs.update(runner.get_output())
266                 result.extend(runner.get_result())
267                 LOG.info("Runner ended, output in %s", output_file)
268         else:
269             # run serially
270             for scenario in scenarios:
271                 if not _is_background_scenario(scenario):
272                     runner = self.run_one_scenario(scenario, output_file)
273                     status = runner_join(runner)
274                     if status != 0:
275                         LOG.error('Scenario: %s ERROR', scenario.get('type'))
276                         raise RuntimeError
277                     self.outputs.update(runner.get_output())
278                     result.extend(runner.get_result())
279                     LOG.info("Runner ended, output in %s", output_file)
280
281         # Abort background runners
282         for runner in background_runners:
283             runner.abort()
284
285         # Wait for background runners to finish
286         for runner in background_runners:
287             status = runner.join(JOIN_TIMEOUT)
288             if status is None:
289                 # Nuke if it did not stop nicely
290                 base_runner.Runner.terminate(runner)
291                 runner.join(JOIN_TIMEOUT)
292             base_runner.Runner.release(runner)
293
294             self.outputs.update(runner.get_output())
295             result.extend(runner.get_result())
296             print("Background task ended")
297         return result
298
299     def atexit_handler(self):
300         """handler for process termination"""
301         base_runner.Runner.terminate_all()
302
303         if self.contexts:
304             LOG.info("Undeploying all contexts")
305             for context in self.contexts[::-1]:
306                 context.undeploy()
307
308     def _parse_options(self, op):
309         if isinstance(op, dict):
310             return {k: self._parse_options(v) for k, v in op.items()}
311         elif isinstance(op, list):
312             return [self._parse_options(v) for v in op]
313         elif isinstance(op, str):
314             return self.outputs.get(op[1:]) if op.startswith('$') else op
315         else:
316             return op
317
318     def run_one_scenario(self, scenario_cfg, output_file):
319         """run one scenario using context"""
320         runner_cfg = scenario_cfg["runner"]
321         runner_cfg['output_filename'] = output_file
322
323         options = scenario_cfg.get('options', {})
324         scenario_cfg['options'] = self._parse_options(options)
325
326         # TODO support get multi hosts/vms info
327         context_cfg = {}
328         if "host" in scenario_cfg:
329             context_cfg['host'] = Context.get_server(scenario_cfg["host"])
330
331         if "target" in scenario_cfg:
332             if is_ip_addr(scenario_cfg["target"]):
333                 context_cfg['target'] = {}
334                 context_cfg['target']["ipaddr"] = scenario_cfg["target"]
335             else:
336                 context_cfg['target'] = Context.get_server(
337                     scenario_cfg["target"])
338                 if self._is_same_heat_context(scenario_cfg["host"],
339                                               scenario_cfg["target"]):
340                     context_cfg["target"]["ipaddr"] = \
341                         context_cfg["target"]["private_ip"]
342                 else:
343                     context_cfg["target"]["ipaddr"] = \
344                         context_cfg["target"]["ip"]
345
346         if "targets" in scenario_cfg:
347             ip_list = []
348             for target in scenario_cfg["targets"]:
349                 if is_ip_addr(target):
350                     ip_list.append(target)
351                     context_cfg['target'] = {}
352                 else:
353                     context_cfg['target'] = Context.get_server(target)
354                     if self._is_same_heat_context(scenario_cfg["host"],
355                                                   target):
356                         ip_list.append(context_cfg["target"]["private_ip"])
357                     else:
358                         ip_list.append(context_cfg["target"]["ip"])
359             context_cfg['target']['ipaddr'] = ','.join(ip_list)
360
361         if "nodes" in scenario_cfg:
362             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
363             context_cfg["networks"] = get_networks_from_nodes(
364                 context_cfg["nodes"])
365
366         runner = base_runner.Runner.get(runner_cfg)
367
368         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
369         runner.run(scenario_cfg, context_cfg)
370
371         return runner
372
373     def _is_same_heat_context(self, host_attr, target_attr):
374         """check if two servers are in the same heat context
375         host_attr: either a name for a server created by yardstick or a dict
376         with attribute name mapping when using external heat templates
377         target_attr: either a name for a server created by yardstick or a dict
378         with attribute name mapping when using external heat templates
379         """
380         for context in self.contexts:
381             if context.__context_type__ != "Heat":
382                 continue
383
384             host = context._get_server(host_attr)
385             if host is None:
386                 continue
387
388             target = context._get_server(target_attr)
389             if target is None:
390                 return False
391
392             # Both host and target is not None, then they are in the
393             # same heat context.
394             return True
395
396         return False
397
398
399 class TaskParser(object):       # pragma: no cover
400     """Parser for task config files in yaml format"""
401
402     def __init__(self, path):
403         self.path = path
404
405     def _meet_constraint(self, task, cur_pod, cur_installer):
406         if "constraint" in task:
407             constraint = task.get('constraint', None)
408             if constraint is not None:
409                 tc_fit_pod = constraint.get('pod', None)
410                 tc_fit_installer = constraint.get('installer', None)
411                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
412                          cur_pod, cur_installer, constraint)
413                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
414                     return False
415                 if (cur_installer is None) or (tc_fit_installer and cur_installer
416                                                not in tc_fit_installer):
417                     return False
418         return True
419
420     def _get_task_para(self, task, cur_pod):
421         task_args = task.get('task_args', None)
422         if task_args is not None:
423             task_args = task_args.get(cur_pod, task_args.get('default'))
424         task_args_fnames = task.get('task_args_fnames', None)
425         if task_args_fnames is not None:
426             task_args_fnames = task_args_fnames.get(cur_pod, None)
427         return task_args, task_args_fnames
428
429     def parse_suite(self):
430         """parse the suite file and return a list of task config file paths
431            and lists of optional parameters if present"""
432         LOG.info("\nParsing suite file:%s", self.path)
433
434         try:
435             with open(self.path) as stream:
436                 cfg = yaml_load(stream)
437         except IOError as ioerror:
438             sys.exit(ioerror)
439
440         self._check_schema(cfg["schema"], "suite")
441         LOG.info("\nStarting scenario:%s", cfg["name"])
442
443         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
444         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
445                                       test_cases_dir)
446         if test_cases_dir[-1] != os.sep:
447             test_cases_dir += os.sep
448
449         cur_pod = os.environ.get('NODE_NAME', None)
450         cur_installer = os.environ.get('INSTALLER_TYPE', None)
451
452         valid_task_files = []
453         valid_task_args = []
454         valid_task_args_fnames = []
455
456         for task in cfg["test_cases"]:
457             # 1.check file_name
458             if "file_name" in task:
459                 task_fname = task.get('file_name', None)
460                 if task_fname is None:
461                     continue
462             else:
463                 continue
464             # 2.check constraint
465             if self._meet_constraint(task, cur_pod, cur_installer):
466                 valid_task_files.append(test_cases_dir + task_fname)
467             else:
468                 continue
469             # 3.fetch task parameters
470             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
471             valid_task_args.append(task_args)
472             valid_task_args_fnames.append(task_args_fnames)
473
474         return valid_task_files, valid_task_args, valid_task_args_fnames
475
476     def parse_task(self, task_id, task_args=None, task_args_file=None):
477         """parses the task file and return an context and scenario instances"""
478         LOG.info("Parsing task config: %s", self.path)
479
480         try:
481             kw = {}
482             if task_args_file:
483                 with open(task_args_file) as f:
484                     kw.update(parse_task_args("task_args_file", f.read()))
485             kw.update(parse_task_args("task_args", task_args))
486         except TypeError:
487             raise TypeError()
488
489         try:
490             with open(self.path) as f:
491                 try:
492                     input_task = f.read()
493                     rendered_task = TaskTemplate.render(input_task, **kw)
494                 except Exception as e:
495                     LOG.exception('Failed to render template:\n%s\n', input_task)
496                     raise e
497                 LOG.debug("Input task is:\n%s\n", rendered_task)
498
499                 cfg = yaml_load(rendered_task)
500         except IOError as ioerror:
501             sys.exit(ioerror)
502
503         self._check_schema(cfg["schema"], "task")
504         meet_precondition = self._check_precondition(cfg)
505
506         # TODO: support one or many contexts? Many would simpler and precise
507         # TODO: support hybrid context type
508         if "context" in cfg:
509             context_cfgs = [cfg["context"]]
510         elif "contexts" in cfg:
511             context_cfgs = cfg["contexts"]
512         else:
513             context_cfgs = [{"type": "Dummy"}]
514
515         contexts = []
516         name_suffix = '-{}'.format(task_id[:8])
517         for cfg_attrs in context_cfgs:
518             try:
519                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
520                                                   name_suffix)
521             except KeyError:
522                 pass
523             # default to Heat context because we are testing OpenStack
524             context_type = cfg_attrs.get("type", "Heat")
525             context = Context.get(context_type)
526             context.init(cfg_attrs)
527             contexts.append(context)
528
529         run_in_parallel = cfg.get("run_in_parallel", False)
530
531         # add tc and task id for influxdb extended tags
532         for scenario in cfg["scenarios"]:
533             task_name = os.path.splitext(os.path.basename(self.path))[0]
534             scenario["tc"] = task_name
535             scenario["task_id"] = task_id
536             # embed task path into scenario so we can load other files
537             # relative to task path
538             scenario["task_path"] = os.path.dirname(self.path)
539
540             change_server_name(scenario, name_suffix)
541
542             try:
543                 for node in scenario['nodes']:
544                     scenario['nodes'][node] += name_suffix
545             except KeyError:
546                 pass
547
548         # TODO we need something better here, a class that represent the file
549         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
550
551     def _check_schema(self, cfg_schema, schema_type):
552         """Check if config file is using the correct schema type"""
553
554         if cfg_schema != "yardstick:" + schema_type + ":0.1":
555             sys.exit("error: file %s has unknown schema %s" % (self.path,
556                                                                cfg_schema))
557
558     def _check_precondition(self, cfg):
559         """Check if the environment meet the precondition"""
560
561         if "precondition" in cfg:
562             precondition = cfg["precondition"]
563             installer_type = precondition.get("installer_type", None)
564             deploy_scenarios = precondition.get("deploy_scenarios", None)
565             tc_fit_pods = precondition.get("pod_name", None)
566             installer_type_env = os.environ.get('INSTALL_TYPE', None)
567             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
568             pod_name_env = os.environ.get('NODE_NAME', None)
569
570             LOG.info("installer_type: %s, installer_type_env: %s",
571                      installer_type, installer_type_env)
572             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
573                      deploy_scenarios, deploy_scenario_env)
574             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
575                      tc_fit_pods, pod_name_env)
576             if installer_type and installer_type_env:
577                 if installer_type_env not in installer_type:
578                     return False
579             if deploy_scenarios and deploy_scenario_env:
580                 deploy_scenarios_list = deploy_scenarios.split(',')
581                 for deploy_scenario in deploy_scenarios_list:
582                     if deploy_scenario_env.startswith(deploy_scenario):
583                         return True
584                 return False
585             if tc_fit_pods and pod_name_env:
586                 if pod_name_env not in tc_fit_pods:
587                     return False
588         return True
589
590
591 def is_ip_addr(addr):
592     """check if string addr is an IP address"""
593     try:
594         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
595     except AttributeError:
596         pass
597
598     try:
599         ipaddress.ip_address(addr.encode('utf-8'))
600     except ValueError:
601         return False
602     else:
603         return True
604
605
606 def _is_background_scenario(scenario):
607     if "run_in_background" in scenario:
608         return scenario["run_in_background"]
609     else:
610         return False
611
612
613 def parse_nodes_with_context(scenario_cfg):
614     """parse the 'nodes' fields in scenario """
615     # ensure consistency in node instantiation order
616     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
617                        for nodename in sorted(scenario_cfg["nodes"]))
618
619
620 def get_networks_from_nodes(nodes):
621     """parse the 'nodes' fields in scenario """
622     networks = {}
623     for node in nodes.values():
624         if not node:
625             continue
626         interfaces = node.get('interfaces', {})
627         for interface in interfaces.values():
628             # vld_id is network_name
629             network_name = interface.get('network_name')
630             if not network_name:
631                 continue
632             network = Context.get_network(network_name)
633             if network:
634                 networks[network['name']] = network
635     return networks
636
637
638 def runner_join(runner):
639     """join (wait for) a runner, exit process at runner failure"""
640     status = runner.join()
641     base_runner.Runner.release(runner)
642     return status
643
644
645 def print_invalid_header(source_name, args):
646     print("Invalid %(source)s passed:\n\n %(args)s\n"
647           % {"source": source_name, "args": args})
648
649
650 def parse_task_args(src_name, args):
651     if isinstance(args, collections.Mapping):
652         return args
653
654     try:
655         kw = args and yaml_load(args)
656         kw = {} if kw is None else kw
657     except yaml.parser.ParserError as e:
658         print_invalid_header(src_name, args)
659         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
660               % {"source": src_name, "err": e})
661         raise TypeError()
662
663     if not isinstance(kw, dict):
664         print_invalid_header(src_name, args)
665         print("%(src)s had to be dict, actually %(src_type)s\n"
666               % {"src": src_name, "src_type": type(kw)})
667         raise TypeError()
668     return kw
669
670
671 def change_server_name(scenario, suffix):
672     try:
673         host = scenario['host']
674     except KeyError:
675         pass
676     else:
677         try:
678             host['name'] += suffix
679         except TypeError:
680             scenario['host'] += suffix
681
682     try:
683         target = scenario['target']
684     except KeyError:
685         pass
686     else:
687         try:
688             target['name'] += suffix
689         except TypeError:
690             scenario['target'] += suffix
691
692     try:
693         key = 'targets'
694         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
695     except KeyError:
696         pass