c44081b7328b622ce3b7153cff67b8390a4a1099
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import utils
30 from yardstick.common import constants
31
# Default path of the runner output file (not referenced elsewhere in this
# module; presumably used by the command line layer -- TODO confirm).
output_file_default = "/tmp/yardstick.out"
# ini file that Task.start() parses into self.config['yardstick'].
config_file = '/etc/yardstick/yardstick.conf'
# Directory (relative to constants.YARDSTICK_ROOT_PATH) that a suite's test
# case files are resolved against when the suite sets no "test_cases_dir".
test_cases_dir_default = "tests/opnfv/test_cases/"
# Module level logger.
LOG = logging.getLogger(__name__)
36
37
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # start() stores the parsed ini configuration under 'yardstick'
        self.config = {}
        # contexts collected for the task file currently being processed
        self.contexts = []
        # outputs reported by finished runners; '$name' scenario options
        # are resolved against this dict
        self.outputs = {}

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        args is the parsed command line; the attributes read here are
        inputfile, suite, task_args, task_args_file, parse_only,
        output_file and keep_deploy.  kwargs may carry a 'task_id'; a
        random UUID is generated when it is absent.

        Exits the process with status 0 when args.parse_only is set.
        """

        atexit.register(self.atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        self.config['yardstick'] = utils.parse_ini_file(config_file)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        # Stays None when there is no task file to process, so the report
        # hint below can be skipped instead of failing on an unbound name.
        scenarios = None

        # parse task_files
        for task_file, one_task_args, one_task_args_fname in zip(
                task_files, task_args, task_args_fnames):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, one_task_args,
                                  one_task_args_fname)

            # NOTE(review): the contexts of a task that is skipped below
            # stay in self.contexts and are deployed together with the
            # next task -- confirm whether that is intended.
            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check envrionment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if not args.keep_deploy:
                for context in self.contexts[::-1]:
                    context.undeploy()
            # With keep_deploy the deployment is kept; either way the
            # contexts are forgotten so the exit handler leaves them alone.
            self.contexts = []

            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_file,
                     one_task_end_time - one_task_start_time)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        # The hint refers to the last parsed task file; there is nothing to
        # report when no file was parsed or it declared no scenarios.
        if scenarios:
            scenario = scenarios[0]
            print("To generate report execute => yardstick report generate ",
                  scenario['task_id'], scenario['tc'])

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners

        Background scenarios are started first, then the remaining
        scenarios are run either all at once (run_in_parallel) or one
        after the other.  Finally the background runners are aborted.
        """
        for context in self.contexts:
            context.deploy()

        background_runners = []

        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # the runner configuration of a background scenario is replaced
            # by a very long Duration runner; it is aborted further down
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                self.outputs.update(runner.get_output())
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    self.outputs.update(runner.get_output())
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
                self.outputs.update(runner.get_output())
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")

    def atexit_handler(self):
        """handler for process termination

        Terminates all runners and undeploys, in reverse order, whatever
        contexts are still registered on this task.
        """
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
        """Return a copy of op with '$name' strings resolved.

        Dicts and lists are walked recursively.  A string starting with
        '$' is replaced by self.outputs[<name>] (None when unknown); any
        other value is returned unchanged.
        """
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op

    def _target_ip_key(self, scenario_cfg, target):
        """Return the server key that holds the address to reach target.

        'private_ip' is used when the scenario names a host that lives in
        the same heat context as target, 'ip' otherwise.  A scenario
        without a "host" entry cannot share a context, so it gets 'ip'.
        """
        if "host" in scenario_cfg and self._is_same_heat_context(
                scenario_cfg["host"], target):
            return "private_ip"
        return "ip"

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context

        Resolves the scenario's host/target/targets/nodes entries into a
        context configuration, starts a runner for it and returns that
        runner without waiting for it.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            target = scenario_cfg["target"]
            if is_ip_addr(target):
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = target
            else:
                server = Context.get_server(target)
                context_cfg['target'] = server
                server["ipaddr"] = \
                    server[self._target_ip_key(scenario_cfg, target)]

        if "targets" in scenario_cfg:
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    server = Context.get_server(target)
                    context_cfg['target'] = server
                    ip_list.append(
                        server[self._target_ip_key(scenario_cfg, target)])
            # setdefault covers an empty "targets" list, for which the loop
            # above never creates the 'target' entry
            context_cfg.setdefault('target', {})['ipaddr'] = \
                ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
        runner = base_runner.Runner.get(runner_cfg, self.config)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            if context._get_server(host_attr) is None:
                continue

            # The first heat context that knows the host decides: the two
            # servers share it only if it knows the target as well.
            return context._get_server(target_attr) is not None

        return False
263
264
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # path of the suite or task file; Task.start() re-points it at each
        # task file of a suite before calling parse_task()
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return False when a suite entry's constraint excludes this run.

        The optional 'constraint' mapping of task may list the accepted
        'pod' and 'installer' values.  A side that is unset, either in the
        constraint or in the current environment, is not checked.
        """
        constraint = task.get('constraint', None)
        if constraint is None:
            return True

        tc_fit_pod = constraint.get('pod', None)
        tc_fit_installer = constraint.get('installer', None)
        LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                 cur_pod, cur_installer, constraint)
        if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
            return False
        if cur_installer and tc_fit_installer and \
                cur_installer not in tc_fit_installer:
            return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return the (task_args, task_args_fnames) entries for cur_pod.

        Both values are looked up per pod in the suite entry and are None
        when the entry has no such section or no value for cur_pod.
        """
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present

        Exits the process when the suite file cannot be read or does not
        declare the suite schema.
        """
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # safe_load: a suite file is plain data, and yaml.load()
                # without an explicit Loader is rejected by recent PyYAML
                cfg = yaml.safe_load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            task_fname = task.get('file_name', None)
            if task_fname is None:
                continue
            # 2.check constraint
            if not self._meet_constraint(task, cur_pod, cur_installer):
                continue
            valid_task_files.append(test_cases_dir + task_fname)
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and returns its scenarios and contexts

        The file is rendered as a template with the values from
        task_args_file and task_args (the latter wins on conflicts) and
        then loaded as yaml.

        Returns a tuple (scenarios, run_in_parallel, meet_precondition,
        contexts).  Raises TypeError for invalid task arguments and exits
        the process when the task file cannot be read or has the wrong
        schema.
        """
        print("Parsing task config:", self.path)

        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args("task_args_file", f.read()))
            kw.update(parse_task_args("task_args", task_args))
        except TypeError:
            raise TypeError()

        try:
            with open(self.path) as f:
                # read before the inner try so that its error message can
                # always show the task text
                input_task = f.read()
                try:
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    # bare raise keeps the original traceback
                    raise
                print("Input task is:\n%s\n" % rendered_task)

                # safe_load: see parse_suite()
                cfg = yaml.safe_load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # context and server names get the first 8 characters of the task id
        # appended
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                # a context without a name is left as it is
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        task_name = os.path.splitext(os.path.basename(self.path))[0]
        for scenario in cfg["scenarios"]:
            scenario["tc"] = task_name
            scenario["task_id"] = task_id

            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                # scenario without a "nodes" section
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the precondition

        The optional 'precondition' section of cfg may restrict the
        installer type, the deploy scenarios (comma separated prefixes)
        and the pod names.  Every restriction whose environment variable
        is set must hold; a restriction or variable that is unset is not
        checked.
        """

        if "precondition" not in cfg:
            return True

        precondition = cfg["precondition"]
        installer_type = precondition.get("installer_type", None)
        deploy_scenarios = precondition.get("deploy_scenarios", None)
        tc_fit_pods = precondition.get("pod_name", None)
        # NOTE(review): parse_suite() reads INSTALLER_TYPE while this reads
        # INSTALL_TYPE -- confirm which variable is intended.
        installer_type_env = os.environ.get('INSTALL_TYPE', None)
        deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
        pod_name_env = os.environ.get('NODE_NAME', None)

        LOG.info("installer_type: %s, installer_type_env: %s",
                 installer_type, installer_type_env)
        LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                 deploy_scenarios, deploy_scenario_env)
        LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                 tc_fit_pods, pod_name_env)
        if installer_type and installer_type_env:
            if installer_type_env not in installer_type:
                return False
        if deploy_scenarios and deploy_scenario_env:
            # A matching scenario must not short-circuit to True: the pod
            # restriction below still has to be evaluated.
            if not any(deploy_scenario_env.startswith(deploy_scenario)
                       for deploy_scenario in deploy_scenarios.split(',')):
                return False
        if tc_fit_pods and pod_name_env:
            if pod_name_env not in tc_fit_pods:
                return False
        return True
453
454
def is_ip_addr(addr):
    """check if string addr is an IP address

    addr may also be a dict, in which case its 'public_ip_attr' value
    (or, failing that, its 'private_ip_attr' value) is the one checked.
    Returns True for a valid IPv4 or IPv6 address and False otherwise.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        # not a dict: check the value itself
        pass

    try:
        # ipaddress wants text: byte strings are interpreted as *packed*
        # addresses, so they have to be decoded rather than passed along.
        if isinstance(addr, bytes):
            addr = addr.decode('utf-8')
        ipaddress.ip_address(addr)
    except ValueError:
        # also covers undecodable bytes (UnicodeDecodeError)
        return False
    else:
        return True
468
469
470 def _is_background_scenario(scenario):
471     if "run_in_background" in scenario:
472         return scenario["run_in_background"]
473     else:
474         return False
475
476
def parse_nodes_with_context(scenario_cfg):
    """parse the 'nodes' field of a scenario

    Returns a dict that maps every node name of scenario_cfg["nodes"] to
    the result of Context.get_server() for the value configured for it.
    """
    node_map = scenario_cfg["nodes"]
    return {name: Context.get_server(node_map[name]) for name in node_map}
486
487
def runner_join(runner):
    """join (wait for) a runner, exit process at runner failure

    The runner is released once it has been joined; a join status other
    than 0 then terminates the process with "Runner failed".
    """
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status != 0:
        sys.exit("Runner failed")
494
495
def print_invalid_header(source_name, args):
    """Print the banner that introduces a task argument error.

    source_name names where the arguments came from and args is the
    offending value, echoed as given.
    """
    fields = {"source": source_name, "args": args}
    print("Invalid %(source)s passed:\n\n %(args)s\n" % fields)
499
500
def parse_task_args(src_name, args):
    """Parse task arguments given as a YAML mapping.

    src_name is only used in error messages.  args is the YAML text;
    None, empty text and a YAML document that loads as None all yield an
    empty dict.

    Returns the parsed dict.  Raises TypeError, after printing an
    explanation, when args is not valid YAML or is not a mapping.
    """
    try:
        # nothing to parse for None / empty text
        kw = yaml.safe_load(args) if args else None
    except yaml.YAMLError as e:
        # YAMLError is the base class of the parser and scanner errors, so
        # every malformed document is reported the same way
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if kw is None:
        kw = {}

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
517
518
def check_environment():
    """Source the openrc file when OS_AUTH_URL is not set.

    A missing openrc file is tolerated and only logged; any other
    IOError from sourcing it is re-raised.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENRC)
        except IOError as e:
            # ENOENT is the "no such file" error this branch is meant to
            # tolerate (EEXIST would mean "file exists")
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
528
529
def change_server_name(scenario, suffix):
    """Append suffix to the server names a scenario refers to.

    The "host" and "target" entries may each be a plain name or a dict
    whose 'name' item is the name; every item of the "targets" list is
    treated as a plain name.  Entries that are absent are skipped.  The
    scenario is modified in place.
    """
    for key in ('host', 'target'):
        if key not in scenario:
            continue
        server = scenario[key]
        try:
            server['name'] += suffix
        except TypeError:
            # not a dict: the entry itself is the name
            scenario[key] += suffix

    if 'targets' in scenario:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]