Merge "Adding 2 node ixia generic scale-out test case generation"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 from collections import OrderedDict
17
18 import yaml
19 import atexit
20 import ipaddress
21 import time
22 import logging
23 import uuid
24 import collections
25
26 from six.moves import filter
27 from jinja2 import Environment
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.runners import base as base_runner
31 from yardstick.common.yaml_loader import yaml_load
32 from yardstick.dispatcher.base import Base as DispatcherBase
33 from yardstick.common.task_template import TaskTemplate
34 from yardstick.common import utils
35 from yardstick.common import constants
36 from yardstick.common.html_template import report_template
37
38 output_file_default = "/tmp/yardstick.out"
39 config_file = '/etc/yardstick/yardstick.conf'
40 test_cases_dir_default = "tests/opnfv/test_cases/"
41 LOG = logging.getLogger(__name__)
42 JOIN_TIMEOUT = 60
43
44
45 class Task(object):     # pragma: no cover
46     """Task commands.
47
48        Set of commands to manage benchmark tasks.
49     """
50
51     def __init__(self):
52         self.contexts = []
53         self.outputs = {}
54
55     def _set_dispatchers(self, output_config):
56         dispatchers = output_config.get('DEFAULT', {}).get('dispatcher',
57                                                            'file')
58         out_types = [s.strip() for s in dispatchers.split(',')]
59         output_config['DEFAULT']['dispatcher'] = out_types
60
61     def start(self, args, **kwargs):
62         """Start a benchmark scenario."""
63
64         atexit.register(self.atexit_handler)
65
66         task_id = getattr(args, 'task_id')
67         self.task_id = task_id if task_id else str(uuid.uuid4())
68
69         self._set_log()
70
71         try:
72             output_config = utils.parse_ini_file(config_file)
73         except Exception:
74             # all error will be ignore, the default value is {}
75             output_config = {}
76
77         self._init_output_config(output_config)
78         self._set_output_config(output_config, args.output_file)
79         LOG.debug('Output configuration is: %s', output_config)
80
81         self._set_dispatchers(output_config)
82
83         # update dispatcher list
84         if 'file' in output_config['DEFAULT']['dispatcher']:
85             result = {'status': 0, 'result': {}}
86             utils.write_json_to_file(args.output_file, result)
87
88         total_start_time = time.time()
89         parser = TaskParser(args.inputfile[0])
90
91         if args.suite:
92             # 1.parse suite, return suite_params info
93             task_files, task_args, task_args_fnames = \
94                 parser.parse_suite()
95         else:
96             task_files = [parser.path]
97             task_args = [args.task_args]
98             task_args_fnames = [args.task_args_file]
99
100         LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
101                   task_files, task_args, task_args_fnames)
102
103         if args.parse_only:
104             sys.exit(0)
105
106         testcases = {}
107         # parse task_files
108         for i in range(0, len(task_files)):
109             one_task_start_time = time.time()
110             parser.path = task_files[i]
111             scenarios, run_in_parallel, meet_precondition, contexts = \
112                 parser.parse_task(self.task_id, task_args[i],
113                                   task_args_fnames[i])
114
115             self.contexts.extend(contexts)
116
117             if not meet_precondition:
118                 LOG.info("meet_precondition is %s, please check envrionment",
119                          meet_precondition)
120                 continue
121
122             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
123             try:
124                 data = self._run(scenarios, run_in_parallel, args.output_file)
125             except KeyboardInterrupt:
126                 raise
127             except Exception:
128                 LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True)
129                 testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
130             else:
131                 LOG.info('Testcase: "%s" SUCCESS!!!', case_name)
132                 testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
133
134             if args.keep_deploy:
135                 # keep deployment, forget about stack
136                 # (hide it for exit handler)
137                 self.contexts = []
138             else:
139                 for context in self.contexts[::-1]:
140                     context.undeploy()
141                 self.contexts = []
142             one_task_end_time = time.time()
143             LOG.info("Task %s finished in %d secs", task_files[i],
144                      one_task_end_time - one_task_start_time)
145
146         result = self._get_format_result(testcases)
147
148         self._do_output(output_config, result)
149         self._generate_reporting(result)
150
151         total_end_time = time.time()
152         LOG.info("Total finished in %d secs",
153                  total_end_time - total_start_time)
154
155         scenario = scenarios[0]
156         LOG.info("To generate report, execute command "
157                  "'yardstick report generate %(task_id)s %(tc)s'", scenario)
158         LOG.info("Task ALL DONE, exiting")
159         return result
160
161     def _generate_reporting(self, result):
162         env = Environment()
163         with open(constants.REPORTING_FILE, 'w') as f:
164             f.write(env.from_string(report_template).render(result))
165
166         LOG.info("Report can be found in '%s'", constants.REPORTING_FILE)
167
168     def _set_log(self):
169         log_format = '%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s'
170         log_formatter = logging.Formatter(log_format)
171
172         utils.makedirs(constants.TASK_LOG_DIR)
173         log_path = os.path.join(constants.TASK_LOG_DIR, '{}.log'.format(self.task_id))
174         log_handler = logging.FileHandler(log_path)
175         log_handler.setFormatter(log_formatter)
176         log_handler.setLevel(logging.DEBUG)
177
178         logging.root.addHandler(log_handler)
179
180     def _init_output_config(self, output_config):
181         output_config.setdefault('DEFAULT', {})
182         output_config.setdefault('dispatcher_http', {})
183         output_config.setdefault('dispatcher_file', {})
184         output_config.setdefault('dispatcher_influxdb', {})
185         output_config.setdefault('nsb', {})
186
187     def _set_output_config(self, output_config, file_path):
188         try:
189             out_type = os.environ['DISPATCHER']
190         except KeyError:
191             output_config['DEFAULT'].setdefault('dispatcher', 'file')
192         else:
193             output_config['DEFAULT']['dispatcher'] = out_type
194
195         output_config['dispatcher_file']['file_path'] = file_path
196
197         try:
198             target = os.environ['TARGET']
199         except KeyError:
200             pass
201         else:
202             k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
203             output_config[k]['target'] = target
204
205     def _get_format_result(self, testcases):
206         criteria = self._get_task_criteria(testcases)
207
208         info = {
209             'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
210             'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
211             'pod_name': os.environ.get('NODE_NAME', 'unknown'),
212             'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
213         }
214
215         result = {
216             'status': 1,
217             'result': {
218                 'criteria': criteria,
219                 'task_id': self.task_id,
220                 'info': info,
221                 'testcases': testcases
222             }
223         }
224
225         return result
226
227     def _get_task_criteria(self, testcases):
228         criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
229         if criteria:
230             return 'FAIL'
231         else:
232             return 'PASS'
233
234     def _do_output(self, output_config, result):
235         dispatchers = DispatcherBase.get(output_config)
236
237         for dispatcher in dispatchers:
238             dispatcher.flush_result_data(result)
239
240     def _run(self, scenarios, run_in_parallel, output_file):
241         """Deploys context and calls runners"""
242         for context in self.contexts:
243             context.deploy()
244
245         background_runners = []
246
247         result = []
248         # Start all background scenarios
249         for scenario in filter(_is_background_scenario, scenarios):
250             scenario["runner"] = dict(type="Duration", duration=1000000000)
251             runner = self.run_one_scenario(scenario, output_file)
252             background_runners.append(runner)
253
254         runners = []
255         if run_in_parallel:
256             for scenario in scenarios:
257                 if not _is_background_scenario(scenario):
258                     runner = self.run_one_scenario(scenario, output_file)
259                     runners.append(runner)
260
261             # Wait for runners to finish
262             for runner in runners:
263                 status = runner_join(runner, self.outputs, result)
264                 if status != 0:
265                     raise RuntimeError(
266                         "{0} runner status {1}".format(runner.__execution_type__, status))
267                 LOG.info("Runner ended, output in %s", output_file)
268         else:
269             # run serially
270             for scenario in scenarios:
271                 if not _is_background_scenario(scenario):
272                     runner = self.run_one_scenario(scenario, output_file)
273                     status = runner_join(runner, self.outputs, result)
274                     if status != 0:
275                         LOG.error('Scenario NO.%s: "%s" ERROR!',
276                                   scenarios.index(scenario) + 1,
277                                   scenario.get('type'))
278                         raise RuntimeError(
279                             "{0} runner status {1}".format(runner.__execution_type__, status))
280                     LOG.info("Runner ended, output in %s", output_file)
281
282         # Abort background runners
283         for runner in background_runners:
284             runner.abort()
285
286         # Wait for background runners to finish
287         for runner in background_runners:
288             status = runner.join(self.outputs, result, JOIN_TIMEOUT)
289             if status is None:
290                 # Nuke if it did not stop nicely
291                 base_runner.Runner.terminate(runner)
292                 runner.join(self.outputs, result, JOIN_TIMEOUT)
293             base_runner.Runner.release(runner)
294
295             print("Background task ended")
296         return result
297
298     def atexit_handler(self):
299         """handler for process termination"""
300         base_runner.Runner.terminate_all()
301
302         if self.contexts:
303             LOG.info("Undeploying all contexts")
304             for context in self.contexts[::-1]:
305                 context.undeploy()
306
307     def _parse_options(self, op):
308         if isinstance(op, dict):
309             return {k: self._parse_options(v) for k, v in op.items()}
310         elif isinstance(op, list):
311             return [self._parse_options(v) for v in op]
312         elif isinstance(op, str):
313             return self.outputs.get(op[1:]) if op.startswith('$') else op
314         else:
315             return op
316
317     def run_one_scenario(self, scenario_cfg, output_file):
318         """run one scenario using context"""
319         runner_cfg = scenario_cfg["runner"]
320         runner_cfg['output_filename'] = output_file
321
322         options = scenario_cfg.get('options', {})
323         scenario_cfg['options'] = self._parse_options(options)
324
325         # TODO support get multi hosts/vms info
326         context_cfg = {}
327         server_name = scenario_cfg.get('options', {}).get('server_name', {})
328
329         def config_context_target(cfg):
330             target = cfg['target']
331             if is_ip_addr(target):
332                 context_cfg['target'] = {"ipaddr": target}
333             else:
334                 context_cfg['target'] = Context.get_server(target)
335                 if self._is_same_context(cfg["host"], target):
336                     context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"]
337                 else:
338                     context_cfg['target']["ipaddr"] = context_cfg['target']["ip"]
339
340         host_name = server_name.get('host', scenario_cfg.get('host'))
341         if host_name:
342             context_cfg['host'] = Context.get_server(host_name)
343
344         for item in [server_name, scenario_cfg]:
345             try:
346                 config_context_target(item)
347             except KeyError:
348                 pass
349             else:
350                 break
351
352         if "targets" in scenario_cfg:
353             ip_list = []
354             for target in scenario_cfg["targets"]:
355                 if is_ip_addr(target):
356                     ip_list.append(target)
357                     context_cfg['target'] = {}
358                 else:
359                     context_cfg['target'] = Context.get_server(target)
360                     if self._is_same_context(scenario_cfg["host"],
361                                              target):
362                         ip_list.append(context_cfg["target"]["private_ip"])
363                     else:
364                         ip_list.append(context_cfg["target"]["ip"])
365             context_cfg['target']['ipaddr'] = ','.join(ip_list)
366
367         if "nodes" in scenario_cfg:
368             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
369             context_cfg["networks"] = get_networks_from_nodes(
370                 context_cfg["nodes"])
371
372         runner = base_runner.Runner.get(runner_cfg)
373
374         LOG.info("Starting runner of type '%s'", runner_cfg["type"])
375         runner.run(scenario_cfg, context_cfg)
376
377         return runner
378
379     def _is_same_context(self, host_attr, target_attr):
380         """check if two servers are in the same heat context
381         host_attr: either a name for a server created by yardstick or a dict
382         with attribute name mapping when using external heat templates
383         target_attr: either a name for a server created by yardstick or a dict
384         with attribute name mapping when using external heat templates
385         """
386         for context in self.contexts:
387             if context.__context_type__ not in {"Heat", "Kubernetes"}:
388                 continue
389
390             host = context._get_server(host_attr)
391             if host is None:
392                 continue
393
394             target = context._get_server(target_attr)
395             if target is None:
396                 return False
397
398             # Both host and target is not None, then they are in the
399             # same heat context.
400             return True
401
402         return False
403
404
405 class TaskParser(object):       # pragma: no cover
406     """Parser for task config files in yaml format"""
407
408     def __init__(self, path):
409         self.path = path
410
411     def _meet_constraint(self, task, cur_pod, cur_installer):
412         if "constraint" in task:
413             constraint = task.get('constraint', None)
414             if constraint is not None:
415                 tc_fit_pod = constraint.get('pod', None)
416                 tc_fit_installer = constraint.get('installer', None)
417                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
418                          cur_pod, cur_installer, constraint)
419                 if (cur_pod is None) or (tc_fit_pod and cur_pod not in tc_fit_pod):
420                     return False
421                 if (cur_installer is None) or (tc_fit_installer and cur_installer
422                                                not in tc_fit_installer):
423                     return False
424         return True
425
426     def _get_task_para(self, task, cur_pod):
427         task_args = task.get('task_args', None)
428         if task_args is not None:
429             task_args = task_args.get(cur_pod, task_args.get('default'))
430         task_args_fnames = task.get('task_args_fnames', None)
431         if task_args_fnames is not None:
432             task_args_fnames = task_args_fnames.get(cur_pod, None)
433         return task_args, task_args_fnames
434
435     def parse_suite(self):
436         """parse the suite file and return a list of task config file paths
437            and lists of optional parameters if present"""
438         LOG.info("\nParsing suite file:%s", self.path)
439
440         try:
441             with open(self.path) as stream:
442                 cfg = yaml_load(stream)
443         except IOError as ioerror:
444             sys.exit(ioerror)
445
446         self._check_schema(cfg["schema"], "suite")
447         LOG.info("\nStarting scenario:%s", cfg["name"])
448
449         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
450         test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
451                                       test_cases_dir)
452         if test_cases_dir[-1] != os.sep:
453             test_cases_dir += os.sep
454
455         cur_pod = os.environ.get('NODE_NAME', None)
456         cur_installer = os.environ.get('INSTALLER_TYPE', None)
457
458         valid_task_files = []
459         valid_task_args = []
460         valid_task_args_fnames = []
461
462         for task in cfg["test_cases"]:
463             # 1.check file_name
464             if "file_name" in task:
465                 task_fname = task.get('file_name', None)
466                 if task_fname is None:
467                     continue
468             else:
469                 continue
470             # 2.check constraint
471             if self._meet_constraint(task, cur_pod, cur_installer):
472                 valid_task_files.append(test_cases_dir + task_fname)
473             else:
474                 continue
475             # 3.fetch task parameters
476             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
477             valid_task_args.append(task_args)
478             valid_task_args_fnames.append(task_args_fnames)
479
480         return valid_task_files, valid_task_args, valid_task_args_fnames
481
482     def parse_task(self, task_id, task_args=None, task_args_file=None):
483         """parses the task file and return an context and scenario instances"""
484         LOG.info("Parsing task config: %s", self.path)
485
486         try:
487             kw = {}
488             if task_args_file:
489                 with open(task_args_file) as f:
490                     kw.update(parse_task_args("task_args_file", f.read()))
491             kw.update(parse_task_args("task_args", task_args))
492         except TypeError:
493             raise TypeError()
494
495         try:
496             with open(self.path) as f:
497                 try:
498                     input_task = f.read()
499                     rendered_task = TaskTemplate.render(input_task, **kw)
500                 except Exception as e:
501                     LOG.exception('Failed to render template:\n%s\n', input_task)
502                     raise e
503                 LOG.debug("Input task is:\n%s\n", rendered_task)
504
505                 cfg = yaml_load(rendered_task)
506         except IOError as ioerror:
507             sys.exit(ioerror)
508
509         self._check_schema(cfg["schema"], "task")
510         meet_precondition = self._check_precondition(cfg)
511
512         # TODO: support one or many contexts? Many would simpler and precise
513         # TODO: support hybrid context type
514         if "context" in cfg:
515             context_cfgs = [cfg["context"]]
516         elif "contexts" in cfg:
517             context_cfgs = cfg["contexts"]
518         else:
519             context_cfgs = [{"type": "Dummy"}]
520
521         contexts = []
522         name_suffix = '-{}'.format(task_id[:8])
523         for cfg_attrs in context_cfgs:
524             try:
525                 cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
526                                                   name_suffix)
527             except KeyError:
528                 pass
529             # default to Heat context because we are testing OpenStack
530             context_type = cfg_attrs.get("type", "Heat")
531             context = Context.get(context_type)
532             context.init(cfg_attrs)
533             contexts.append(context)
534
535         run_in_parallel = cfg.get("run_in_parallel", False)
536
537         # add tc and task id for influxdb extended tags
538         for scenario in cfg["scenarios"]:
539             task_name = os.path.splitext(os.path.basename(self.path))[0]
540             scenario["tc"] = task_name
541             scenario["task_id"] = task_id
542             # embed task path into scenario so we can load other files
543             # relative to task path
544             scenario["task_path"] = os.path.dirname(self.path)
545
546             change_server_name(scenario, name_suffix)
547
548             try:
549                 for node in scenario['nodes']:
550                     scenario['nodes'][node] += name_suffix
551             except KeyError:
552                 pass
553
554         # TODO we need something better here, a class that represent the file
555         return cfg["scenarios"], run_in_parallel, meet_precondition, contexts
556
557     def _check_schema(self, cfg_schema, schema_type):
558         """Check if config file is using the correct schema type"""
559
560         if cfg_schema != "yardstick:" + schema_type + ":0.1":
561             sys.exit("error: file %s has unknown schema %s" % (self.path,
562                                                                cfg_schema))
563
564     def _check_precondition(self, cfg):
565         """Check if the environment meet the precondition"""
566
567         if "precondition" in cfg:
568             precondition = cfg["precondition"]
569             installer_type = precondition.get("installer_type", None)
570             deploy_scenarios = precondition.get("deploy_scenarios", None)
571             tc_fit_pods = precondition.get("pod_name", None)
572             installer_type_env = os.environ.get('INSTALL_TYPE', None)
573             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
574             pod_name_env = os.environ.get('NODE_NAME', None)
575
576             LOG.info("installer_type: %s, installer_type_env: %s",
577                      installer_type, installer_type_env)
578             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
579                      deploy_scenarios, deploy_scenario_env)
580             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
581                      tc_fit_pods, pod_name_env)
582             if installer_type and installer_type_env:
583                 if installer_type_env not in installer_type:
584                     return False
585             if deploy_scenarios and deploy_scenario_env:
586                 deploy_scenarios_list = deploy_scenarios.split(',')
587                 for deploy_scenario in deploy_scenarios_list:
588                     if deploy_scenario_env.startswith(deploy_scenario):
589                         return True
590                 return False
591             if tc_fit_pods and pod_name_env:
592                 if pod_name_env not in tc_fit_pods:
593                     return False
594         return True
595
596
597 def is_ip_addr(addr):
598     """check if string addr is an IP address"""
599     try:
600         addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
601     except AttributeError:
602         pass
603
604     try:
605         ipaddress.ip_address(addr.encode('utf-8'))
606     except ValueError:
607         return False
608     else:
609         return True
610
611
612 def _is_background_scenario(scenario):
613     if "run_in_background" in scenario:
614         return scenario["run_in_background"]
615     else:
616         return False
617
618
619 def parse_nodes_with_context(scenario_cfg):
620     """parse the 'nodes' fields in scenario """
621     # ensure consistency in node instantiation order
622     return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
623                        for nodename in sorted(scenario_cfg["nodes"]))
624
625
626 def get_networks_from_nodes(nodes):
627     """parse the 'nodes' fields in scenario """
628     networks = {}
629     for node in nodes.values():
630         if not node:
631             continue
632         interfaces = node.get('interfaces', {})
633         for interface in interfaces.values():
634             # vld_id is network_name
635             network_name = interface.get('network_name')
636             if not network_name:
637                 continue
638             network = Context.get_network(network_name)
639             if network:
640                 networks[network['name']] = network
641     return networks
642
643
644 def runner_join(runner, outputs, result):
645     """join (wait for) a runner, exit process at runner failure
646     :param outputs:
647     :type outputs: dict
648     :param result:
649     :type result: list
650     """
651     status = runner.join(outputs, result)
652     base_runner.Runner.release(runner)
653     return status
654
655
656 def print_invalid_header(source_name, args):
657     print("Invalid %(source)s passed:\n\n %(args)s\n"
658           % {"source": source_name, "args": args})
659
660
661 def parse_task_args(src_name, args):
662     if isinstance(args, collections.Mapping):
663         return args
664
665     try:
666         kw = args and yaml_load(args)
667         kw = {} if kw is None else kw
668     except yaml.parser.ParserError as e:
669         print_invalid_header(src_name, args)
670         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
671               % {"source": src_name, "err": e})
672         raise TypeError()
673
674     if not isinstance(kw, dict):
675         print_invalid_header(src_name, args)
676         print("%(src)s had to be dict, actually %(src_type)s\n"
677               % {"src": src_name, "src_type": type(kw)})
678         raise TypeError()
679     return kw
680
681
682 def change_server_name(scenario, suffix):
683
684     def add_suffix(cfg, key):
685         try:
686             value = cfg[key]
687         except KeyError:
688             pass
689         else:
690             try:
691                 value['name'] += suffix
692             except TypeError:
693                 cfg[key] += suffix
694
695     server_name = scenario.get('options', {}).get('server_name', {})
696
697     add_suffix(scenario, 'host')
698     add_suffix(scenario, 'target')
699     add_suffix(server_name, 'host')
700     add_suffix(server_name, 'target')
701
702     try:
703         key = 'targets'
704         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
705     except KeyError:
706         pass