Merge "Testcase to find storage bottlenecks using Yardstick for Multistack"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import collections
25
26 from six.moves import filter
27 from jinja2 import Environment
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.common.constants import CONF_FILE
32 from yardstick.common.yaml_loader import yaml_load
33 from yardstick.dispatcher.base import Base as DispatcherBase
34 from yardstick.common.task_template import TaskTemplate
35 from yardstick.common import utils
36 from yardstick.common import constants
37 from yardstick.common.html_template import report_template
38
39 output_file_default = "/tmp/yardstick.out"
40 test_cases_dir_default = "tests/opnfv/test_cases/"
41 LOG = logging.getLogger(__name__)
42
43
44 class Task(object):     # pragma: no cover
45     """Task commands.
46
47        Set of commands to manage benchmark tasks.
48     """
49
50     def __init__(self):
51         self.contexts = []
52         self.outputs = {}
53
54     def _set_dispatchers(self, output_config):
55         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
56                                                            'file')
57         out_types = [s.strip() for s in dispatchers.split(',')]
58         output_config['DEFAULT']['dispatcher'] = out_types
59
60     def start(self, args):
61         """Start a benchmark scenario."""
62
63         atexit.register(self.atexit_handler)
64
65         task_id = getattr(args, 'task_id')
66         self.task_id = task_id if task_id else str(uuid.uuid4())
67
68         self._set_log()
69
70         try:
71             output_config = utils.parse_ini_file(CONF_FILE)
72         except Exception:  # pylint: disable=broad-except
73             # all error will be ignore, the default value is {}
74             output_config = {}
75
76         self._init_output_config(output_config)
77         self._set_output_config(output_config, args.output_file)
78         LOG.debug('Output configuration is: %s', output_config)
79
80         self._set_dispatchers(output_config)
81
82         # update dispatcher list
83         if 'file' in output_config['DEFAULT']['dispatcher']:
84             result = {'status': 0, 'result': {}}
85             utils.write_json_to_file(args.output_file, result)
86
87         total_start_time = time.time()
88         parser = TaskParser(args.inputfile[0])
89
90         if args.suite:
91             # 1.parse suite, return suite_params info
92             task_files, task_args, task_args_fnames = \
93                 parser.parse_suite()
94         else:
95             task_files = [parser.path]
96             task_args = [args.task_args]
97             task_args_fnames = [args.task_args_file]
98
99         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
100                   task_files, task_args, task_args_fnames)
101
102         if args.parse_only:
103             sys.exit(0)
104
105         testcases = {}
106         # parse task_files
107         for i in range(0, len(task_files)):
108             one_task_start_time = time.time()
109             parser.path = task_files[i]
110             scenarios, run_in_parallel, meet_precondition, contexts = \
111                 parser.parse_task(self.task_id, task_args[i],
112                                   task_args_fnames[i])
113
114             self.contexts.extend(contexts)
115
116             if not meet_precondition:
117                 LOG.info("meet_precondition is %s, please check envrionment",
118                          meet_precondition)
119                 continue
120
121             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
122             try:
123                 data = self._run(scenarios, run_in_parallel, args.output_file)
124             except KeyboardInterrupt:
125                 raise
126             except Exception:  # pylint: disable=broad-except
127                 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
128                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
129             else:
130                 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
131                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
132
133             if args.keep_deploy:
134                 # keep deployment, forget about stack
135                 # (hide it for exit handler)
136                 self.contexts = []
137             else:
138                 for context in self.contexts[::-1]:
139                     context.undeploy()
140                 self.contexts = []
141             one_task_end_time = time.time()
142             LOG.info("Task %s finished in %d secs", task_files[i],
143                      one_task_end_time - one_task_start_time)
144
145         result = self._get_format_result(testcases)
146
147         self._do_output(output_config, result)
148         self._generate_reporting(result)
149
150         total_end_time = time.time()
151         LOG.info("Total finished in %d secs",
152                  total_end_time - total_start_time)
153
154         scenario = scenarios[0]
155         LOG.info("To generate report, execute command "
156                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
157         LOG.info("Task ALL DONE, exiting")
158         return result
159
160     def _generate_reporting(self, result):
161         env = Environment()
162         with open(constants.REPORTING_FILE, 'w') as f:
163             f.write(env.from_string(report_template).render(result))
164
165         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
166
167     def _set_log(self):
168         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
169         log_formatter = logging.Formatter(log_format)
170
171         utils.makedirs(constants.TASK_LOG_DIR)
172         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
173         log_handler = logging.FileHandler(log_path)
174         log_handler.setFormatter(log_formatter)
175         log_handler.setLevel(logging.DEBUG)
176
177         logging.root.addHandler(log_handler)
178
179     def _init_output_config(self, output_config):
180         output_config.setdefault('DEFAULT', {})
181         output_config.setdefault('dispatcher_http', {})
182         output_config.setdefault('dispatcher_file', {})
183         output_config.setdefault('dispatcher_influxdb', {})
184         output_config.setdefault('nsb', {})
185
186     def _set_output_config(self, output_config, file_path):
187         try:
188             out_type = os.environ['DISPATCHER']
189         except KeyError:
190             output_config['DEFAULT'].setdefault('dispatcher', 'file')
191         else:
192             output_config['DEFAULT']['dispatcher'] = out_type
193
194         output_config['dispatcher_file']['file_path'] = file_path
195
196         try:
197             target = os.environ['TARGET']
198         except KeyError:
199             pass
200         else:
201             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
202             output_config[k]['target'] = target
203
204     def _get_format_result(self, testcases):
205         criteria = self._get_task_criteria(testcases)
206
207         info = {
208             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
209             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
210             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
211             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
212         }
213
214         result = {
215             'status': 1,
216             'result': {
217                 'criteria': criteria,
218                 'task_id': self.task_id,
219                 'info': info,
220                 'testcases': testcases
221             }
222         }
223
224         return result
225
226     def _get_task_criteria(self, testcases):
227         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
228         if criteria:
229             return 'FAIL'
230         else:
231             return 'PASS'
232
233     def _do_output(self, output_config, result):
234         dispatchers = DispatcherBase.get(output_config)
235
236         for dispatcher in dispatchers:
237             dispatcher.flush_result_data(result)
238
239     def _run(self, scenarios, run_in_parallel, output_file):
240         """Deploys context and calls runners"""
241         for context in self.contexts:
242             context.deploy()
243
244         background_runners = []
245
246         result = []
247         # Start all background scenarios
248         for scenario in filter(_is_background_scenario, scenarios):
249             scenario["runner"] = dict(type="Duration", duration=1000000000)
250             runner = self.run_one_scenario(scenario, output_file)
251             background_runners.append(runner)
252
253         runners = []
254         if run_in_parallel:
255             for scenario in scenarios:
256                 if not _is_background_scenario(scenario):
257                     runner = self.run_one_scenario(scenario, output_file)
258                     runners.append(runner)
259
260             # Wait for runners to finish
261             for runner in runners:
262                 status = runner_join(runner, background_runners, self.outputs, result)
263                 if status != 0:
264                     raise RuntimeError(
265                         "{0} runner status {1}".format(runner.__execution_type__, status))
266                 LOG.info("Runner ended, output in %s", output_file)
267         else:
268             # run serially
269             for scenario in scenarios:
270                 if not _is_background_scenario(scenario):
271                     runner = self.run_one_scenario(scenario, output_file)
272                     status = runner_join(runner, background_runners, self.outputs, result)
273                     if status != 0:
274                         LOG.error('Scenario NO.%s: "%s" ERROR!',
275                                   scenarios.index(scenario) + 1,
276                                   scenario.get('type'))
277                         raise RuntimeError(
278                             "{0} runner status {1}".format(runner.__execution_type__, status))
279                     LOG.info("Runner ended, output in %s", output_file)
280
281         # Abort background runners
282         for runner in background_runners:
283             runner.abort()
284
285         # Wait for background runners to finish
286         for runner in background_runners:
287             status = runner.join(self.outputs, result)
288             if status is None:
289                 # Nuke if it did not stop nicely
290                 base_runner.Runner.terminate(runner)
291                 runner.join(self.outputs, result)
292             base_runner.Runner.release(runner)
293
294             print("Background task ended")
295         return result
296
297     def atexit_handler(self):
298         """handler for process termination"""
299         base_runner.Runner.terminate_all()
300
301         if self.contexts:
302             LOG.info("Undeploying all contexts")
303             for context in self.contexts[::-1]:
304                 context.undeploy()
305
306     def _parse_options(self, op):
307         if isinstance(op, dict):
308             return {k: self._parse_options(v) for k, v in op.items()}
309         elif isinstance(op, list):
310             return [self._parse_options(v) for v in op]
311         elif isinstance(op, str):
312             return self.outputs.get(op[1:]) if op.startswith('$') else op
313         else:
314             return op
315
316     def run_one_scenario(self, scenario_cfg, output_file):
317         """run one scenario using context"""
318         runner_cfg = scenario_cfg["runner"]
319         runner_cfg['output_filename'] = output_file
320
321         options = scenario_cfg.get('options', {})
322         scenario_cfg['options'] = self._parse_options(options)
323
324         # TODO support get multi hosts/vms info
325         context_cfg = {}
326         server_name = scenario_cfg.get('options', {}).get('server_name', {})
327
328         def config_context_target(cfg):
329             target = cfg['target']
330             if is_ip_addr(target):
331                 context_cfg['target'] = {"ipaddr": target}
332             else:
333                 context_cfg['target'] = Context.get_server(target)
334                 if self._is_same_context(cfg["host"], target):
335                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
336                 else:
337                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
338
339         host_name = server_name.get('host', scenario_cfg.get('host'))
340         if host_name:
341             context_cfg['host'] = Context.get_server(host_name)
342
343         for item in [server_name, scenario_cfg]:
344             try:
345                 config_context_target(item)
346             except KeyError:
347                 pass
348             else:
349                 break
350
351         if "targets" in scenario_cfg:
352             ip_list = []
353             for target in scenario_cfg["targets"]:
354                 if is_ip_addr(target):
355                     ip_list.append(target)
356                     context_cfg['target'] = {}
357                 else:
358                     context_cfg['target'] = Context.get_server(target)
359                     if self._is_same_context(scenario_cfg["host"],
360                                              target):
361                         ip_list.append(context_cfg["target"]["private_ip"])
362                     else:
363                         ip_list.append(context_cfg["target"]["ip"])
364             context_cfg['target']['ipaddr'] = ','.join(ip_list)
365
366         if "nodes" in scenario_cfg:
367             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
368             context_cfg["networks"] = get_networks_from_nodes(
369                 context_cfg["nodes"])
370
371         runner = base_runner.Runner.get(runner_cfg)
372
373         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
374         runner.run(scenario_cfg, context_cfg)
375
376         return runner
377
378     def _is_same_context(self, host_attr, target_attr):
379         """check if two servers are in the same heat context
380         host_attr: either a name for a server created by yardstick or a dict
381         with attribute name mapping when using external heat templates
382         target_attr: either a name for a server created by yardstick or a dict
383         with attribute name mapping when using external heat templates
384         """
385         for context in self.contexts:
386             if context.__context_type__ not in {"Heat", "Kubernetes"}:
387                 continue
388
389             host = context._get_server(host_attr)
390             if host is None:
391                 continue
392
393             target = context._get_server(target_attr)
394             if target is None:
395                 return False
396
397             # Both host and target is not None, then they are in the
398             # same heat context.
399             return True
400
401         return False
402
403
404 class TaskParser(object):       # pragma: no cover
405     """Parser for task config files in yaml format"""
406
407     def __init__(self, path):
408         self.path = path
409
410     def _meet_constraint(self, task, cur_pod, cur_installer):
411         if "constraint" in task:
412             constraint = task.get('constraint', None)
413             if constraint is not None:
414                 tc_fit_pod = constraint.get('pod', None)
415                 tc_fit_installer = constraint.get('installer', None)
416                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
417                          cur_pod, cur_installer, constraint)
418                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
419                     return False
420                 if (cur_installer is None) or (tc_fit_installer and cur_installer
421                                                not in tc_fit_installer):
422                     return False
423         return True
424
425     def _get_task_para(self, task, cur_pod):
426         task_args = task.get('task_args', None)
427         if task_args is not None:
428             task_args = task_args.get(cur_pod, task_args.get('default'))
429         task_args_fnames = task.get('task_args_fnames', None)
430         if task_args_fnames is not None:
431             task_args_fnames = task_args_fnames.get(cur_pod, None)
432         return task_args, task_args_fnames
433
434     def parse_suite(self):
435         """parse the suite file and return a list of task config file paths
436            and lists of optional parameters if present"""
437         LOG.info("\nParsing suite file:%s", self.path)
438
439         try:
440             with open(self.path) as stream:
441                 cfg = yaml_load(stream)
442         except IOError as ioerror:
443             sys.exit(ioerror)
444
445         self._check_schema(cfg["schema"], "suite")
446         LOG.info("\nStarting scenario:%s", cfg["name"])
447
448         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
449         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
450                                       test_cases_dir)
451         if test_cases_dir[-1] != os.sep:
452             test_cases_dir += os.sep
453
454         cur_pod = os.environ.get('NODE_NAME', None)
455         cur_installer = os.environ.get('INSTALLER_TYPE', None)
456
457         valid_task_files = []
458         valid_task_args = []
459         valid_task_args_fnames = []
460
461         for task in cfg["test_cases"]:
462             # 1.check file_name
463             if "file_name" in task:
464                 task_fname = task.get('file_name', None)
465                 if task_fname is None:
466                     continue
467             else:
468                 continue
469             # 2.check constraint
470             if self._meet_constraint(task, cur_pod, cur_installer):
471                 valid_task_files.append(test_cases_dir + task_fname)
472             else:
473                 continue
474             # 3.fetch task parameters
475             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
476             valid_task_args.append(task_args)
477             valid_task_args_fnames.append(task_args_fnames)
478
479         return valid_task_files, valid_task_args, valid_task_args_fnames
480
481     def parse_task(self, task_id, task_args=None, task_args_file=None):
482         """parses the task file and return an context and scenario instances"""
483         LOG.info("Parsing task config: %s", self.path)
484
485         try:
486             kw = {}
487             if task_args_file:
488                 with open(task_args_file) as f:
489                     kw.update(parse_task_args("task_args_file", f.read()))
490             kw.update(parse_task_args("task_args", task_args))
491         except TypeError:
492             raise TypeError()
493
494         try:
495             with open(self.path) as f:
496                 try:
497                     input_task = f.read()
498                     rendered_task = TaskTemplate.render(input_task, **kw)
499                 except Exception as e:
500                     LOG.exception('Failed to render template:\n%s\n', input_task)
501                     raise e
502                 LOG.debug("Input task is:\n%s\n", rendered_task)
503
504                 cfg = yaml_load(rendered_task)
505         except IOError as ioerror:
506             sys.exit(ioerror)
507
508         self._check_schema(cfg["schema"], "task")
509         meet_precondition = self._check_precondition(cfg)
510
511         # TODO: support one or many contexts? Many would simpler and precise
512         # TODO: support hybrid context type
513         if "context" in cfg:
514             context_cfgs = [cfg["context"]]
515         elif "contexts" in cfg:
516             context_cfgs = cfg["contexts"]
517         else:
518             context_cfgs = [{"type": "Dummy"}]
519
520         contexts = []
521         name_suffix = '-{}'.format(task_id[:8])
522         for cfg_attrs in context_cfgs:
523             try:
524                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
525                                                   name_suffix)
526             except KeyError:
527                 pass
528             # default to Heat context because we are testing OpenStack
529             context_type = cfg_attrs.get("type", "Heat")
530             context = Context.get(context_type)
531             context.init(cfg_attrs)
532             contexts.append(context)
533
534         run_in_parallel = cfg.get("run_in_parallel", False)
535
536         # add tc and task id for influxdb extended tags
537         for scenario in cfg["scenarios"]:
538             task_name = os.path.splitext(os.path.basename(self.path))[0]
539             scenario["tc"] = task_name
540             scenario["task_id"] = task_id
541             # embed task path into scenario so we can load other files
542             # relative to task path
543             scenario["task_path"] = os.path.dirname(self.path)
544
545             change_server_name(scenario, name_suffix)
546
547             try:
548                 for node in scenario['nodes']:
549                     scenario['nodes'][node] += name_suffix
550             except KeyError:
551                 pass
552
553         # TODO we need something better here, a class that represent the file
554         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
555
556     def _check_schema(self, cfg_schema, schema_type):
557         """Check if config file is using the correct schema type"""
558
559         if cfg_schema != "yardstick:" + schema_type + ":0.1":
560             sys.exit("error: file %s has unknown schema %s" % (self.path,
561                                                                cfg_schema))
562
563     def _check_precondition(self, cfg):
564         """Check if the environment meet the precondition"""
565
566         if "precondition" in cfg:
567             precondition = cfg["precondition"]
568             installer_type = precondition.get("installer_type", None)
569             deploy_scenarios = precondition.get("deploy_scenarios", None)
570             tc_fit_pods = precondition.get("pod_name", None)
571             installer_type_env = os.environ.get('INSTALL_TYPE', None)
572             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
573             pod_name_env = os.environ.get('NODE_NAME', None)
574
575             LOG.info("installer_type: %s, installer_type_env: %s",
576                      installer_type, installer_type_env)
577             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
578                      deploy_scenarios, deploy_scenario_env)
579             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
580                      tc_fit_pods, pod_name_env)
581             if installer_type and installer_type_env:
582                 if installer_type_env not in installer_type:
583                     return False
584             if deploy_scenarios and deploy_scenario_env:
585                 deploy_scenarios_list = deploy_scenarios.split(',')
586                 for deploy_scenario in deploy_scenarios_list:
587                     if deploy_scenario_env.startswith(deploy_scenario):
588                         return True
589                 return False
590             if tc_fit_pods and pod_name_env:
591                 if pod_name_env not in tc_fit_pods:
592                     return False
593         return True
594
595
596 def is_ip_addr(addr):
597     """check if string addr is an IP address"""
598     try:
599         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
600     except AttributeError:
601         pass
602
603     try:
604         ipaddress.ip_address(addr.encode('utf-8'))
605     except ValueError:
606         return False
607     else:
608         return True
609
610
611 def _is_background_scenario(scenario):
612     if "run_in_background" in scenario:
613         return scenario["run_in_background"]
614     else:
615         return False
616
617
618 def parse_nodes_with_context(scenario_cfg):
619     """parse the 'nodes' fields in scenario """
620     # ensure consistency in node instantiation order
621     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
622                        for nodename in sorted(scenario_cfg["nodes"]))
623
624
625 def get_networks_from_nodes(nodes):
626     """parse the 'nodes' fields in scenario """
627     networks = {}
628     for node in nodes.values():
629         if not node:
630             continue
631         interfaces = node.get('interfaces', {})
632         for interface in interfaces.values():
633             # vld_id is network_name
634             network_name = interface.get('network_name')
635             if not network_name:
636                 continue
637             network = Context.get_network(network_name)
638             if network:
639                 networks[network['name']] = network
640     return networks
641
642
643 def runner_join(runner, background_runners, outputs, result):
644     """join (wait for) a runner, exit process at runner failure
645     :param background_runners:
646     :type background_runners:
647     :param outputs:
648     :type outputs: dict
649     :param result:
650     :type result: list
651     """
652     while runner.poll() is None:
653         outputs.update(runner.get_output())
654         result.extend(runner.get_result())
655         # drain all the background runner queues
656         for background in background_runners:
657             outputs.update(background.get_output())
658             result.extend(background.get_result())
659     status = runner.join(outputs, result)
660     base_runner.Runner.release(runner)
661     return status
662
663
664 def print_invalid_header(source_name, args):
665     print("Invalid %(source)s passed:\n\n %(args)s\n"
666           % {"source": source_name, "args": args})
667
668
669 def parse_task_args(src_name, args):
670     if isinstance(args, collections.Mapping):
671         return args
672
673     try:
674         kw = args and yaml_load(args)
675         kw = {} if kw is None else kw
676     except yaml.parser.ParserError as e:
677         print_invalid_header(src_name, args)
678         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
679               % {"source": src_name, "err": e})
680         raise TypeError()
681
682     if not isinstance(kw, dict):
683         print_invalid_header(src_name, args)
684         print("%(src)s had to be dict, actually %(src_type)s\n"
685               % {"src": src_name, "src_type": type(kw)})
686         raise TypeError()
687     return kw
688
689
690 def change_server_name(scenario, suffix):
691
692     def add_suffix(cfg, key):
693         try:
694             value = cfg[key]
695         except KeyError:
696             pass
697         else:
698             try:
699                 value['name'] += suffix
700             except TypeError:
701                 cfg[key] += suffix
702
703     server_name = scenario.get('options', {}).get('server_name', {})
704
705     add_suffix(scenario, 'host')
706     add_suffix(scenario, 'target')
707     add_suffix(server_name, 'host')
708     add_suffix(server_name, 'target')
709
710     try:
711         key = 'targets'
712         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
713     except KeyError:
714         pass