5a006f2b24184c778f6049fd4058707603999042
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import utils
30 from yardstick.common import constants
31
# Default location for scenario results when no --output-file is given.
output_file_default = "/tmp/yardstick.out"
# Yardstick INI configuration, parsed into Task.config at start().
config_file = '/etc/yardstick/yardstick.conf'
# Default suite test-case directory, relative to YARDSTICK_ROOT_PATH.
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
36
37
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # yardstick configuration; the ini file is loaded into it by start()
        self.config = {}
        # contexts deployed for the task currently running; reset after each
        # task so the atexit handler only undeploys still-live stacks
        self.contexts = []
        # accumulated scenario outputs, referenced as '$name' in options
        self.outputs = {}

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        Parses the input file (or the suite it describes), runs each task,
        and undeploys the contexts afterwards unless args.keep_deploy is set.
        """
        atexit.register(self.atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        self.config['yardstick'] = utils.parse_ini_file(config_file)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        # Fix: 'scenarios' was read after the loop without being initialized;
        # an empty suite (or one where every task misses its precondition)
        # used to die with NameError right before the final report hint.
        scenarios = []

        # parse task_files
        for i, task_file in enumerate(task_files):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, task_args[i],
                                  task_args_fnames[i])

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check envrionment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # undeploy in reverse order of deployment
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_file,
                     one_task_end_time - one_task_start_time)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        if scenarios:
            # report hint uses the first scenario of the last parsed task
            scenario = scenarios[0]
            print("To generate report execute => yardstick report generate ",
                  scenario['task_id'], scenario['tc'])

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        # Start all background scenarios; they run until explicitly aborted,
        # hence the effectively infinite Duration runner.
        for scenario in filter(_is_background_scenario, scenarios):
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # launch every foreground scenario first, join afterwards
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                self.outputs.update(runner.get_output())
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    self.outputs.update(runner.get_output())
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
                self.outputs.update(runner.get_output())
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            # undeploy in reverse order of deployment
            for context in self.contexts[::-1]:
                context.undeploy()

    def _parse_options(self, op):
        """Recursively substitute '$name' strings with previously collected
        scenario outputs (self.outputs); containers are rebuilt, any other
        value passes through unchanged."""
        if isinstance(op, dict):
            return {k: self._parse_options(v) for k, v in op.items()}
        elif isinstance(op, list):
            return [self._parse_options(v) for v in op]
        elif isinstance(op, str):
            return self.outputs.get(op[1:]) if op.startswith('$') else op
        else:
            return op

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context

        Builds context_cfg (host/target/targets/nodes server info), creates
        the runner described by scenario_cfg['runner'] and starts it.
        Returns the already-running runner.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        options = scenario_cfg.get('options', {})
        scenario_cfg['options'] = self._parse_options(options)

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # plain IP address: no server info to look up
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                # NOTE(review): assumes scenario_cfg has a 'host' key when
                # 'target' is a server name -- KeyError otherwise; confirm.
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    # same heat stack: use the private address
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            # multiple targets: collect one address per entry; 'target'
            # ends up holding the info of the last entry processed
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
        runner = base_runner.Runner.get(runner_cfg, self.config)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        # (removed dead 'host'/'target' locals that were never read)
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            if context._get_server(host_attr) is None:
                continue

            if context._get_server(target_attr) is None:
                return False

            # Both host and target resolved in this context, so they
            # belong to the same heat stack.
            return True

        return False
263
264
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # path to the task (or suite) yaml file
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return True when the task's optional 'constraint' section allows
        running on the current pod/installer (no constraint == allowed)."""
        constraint = task.get('constraint', None)
        if constraint is not None:
            tc_fit_pod = constraint.get('pod', None)
            tc_fit_installer = constraint.get('installer', None)
            LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                     cur_pod, cur_installer, constraint)
            # an unset environment value or an unset constraint entry means
            # "no restriction" for that dimension
            if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
                return False
            if cur_installer and tc_fit_installer and \
                    cur_installer not in tc_fit_installer:
                return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return (task_args, task_args_fnames) configured for cur_pod;
        either element may be None when absent."""
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # NOTE(security): yaml.load without an explicit Loader can
                # construct arbitrary python objects; suite files are
                # operator-supplied, but yaml.safe_load would be safer.
                cfg = yaml.load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name (flattened: the old in-test plus get was
            # equivalent to a single get-with-None-check)
            task_fname = task.get('file_name', None)
            if task_fname is None:
                continue
            # 2.check constraint
            if not self._meet_constraint(task, cur_pod, cur_installer):
                continue
            valid_task_files.append(test_cases_dir + task_fname)
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        print("Parsing task config:", self.path)

        # parse_task_args raises TypeError on malformed input; let it
        # propagate unchanged.  Fix: the old code caught it and re-raised a
        # blank TypeError(), discarding the original traceback.
        kw = {}
        if task_args_file:
            with open(task_args_file) as f:
                kw.update(parse_task_args("task_args_file", f.read()))
        kw.update(parse_task_args("task_args", task_args))

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise  # bare raise keeps the original traceback
                print("Input task is:\n%s\n" % rendered_task)

                # NOTE(security): see parse_suite -- yaml.safe_load preferred
                cfg = yaml.load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # suffix makes stack/server names unique per task run
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                # unnamed context (e.g. Dummy) -- nothing to suffix
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                # scenario without a 'nodes' section
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check whether the environment meets the task's optional
        'precondition' section; absence of the section means True."""
        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                # comma-separated prefixes: any match wins
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
456
457
def is_ip_addr(addr):
    """Check whether *addr* is a valid IP address.

    *addr* may be a plain string, or a dict carrying 'public_ip_attr' /
    'private_ip_attr' keys (external heat template attribute mapping).
    Returns True for valid IPv4/IPv6 addresses, False otherwise.
    """
    try:
        # dict form: prefer the public ip attribute, fall back to private
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # Fix: ip_address() must receive text, not bytes.  On Python 3 the
        # old addr.encode('utf-8') made ip_address() interpret the bytes as
        # a *packed* binary address, so every valid dotted-quad string was
        # rejected; str() also avoids an AttributeError when the dict has
        # neither ip key (addr is None).
        ipaddress.ip_address(str(addr))
    except ValueError:
        return False
    else:
        return True
471
472
473 def _is_background_scenario(scenario):
474     if "run_in_background" in scenario:
475         return scenario["run_in_background"]
476     else:
477         return False
478
479
def parse_nodes_with_context(scenario_cfg):
    """Resolve every entry of scenario_cfg['nodes'] to its server info
    via Context.get_server and return the resulting mapping."""
    return {node_name: Context.get_server(attrs)
            for node_name, attrs in scenario_cfg["nodes"].items()}
489
490
def runner_join(runner):
    """Wait for *runner* to finish and release it; terminate the whole
    process when the runner reports a non-zero status."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status != 0:
        sys.exit("Runner failed")
497
498
def print_invalid_header(source_name, args):
    """Print the standard header used when reporting malformed task input."""
    details = {"source": source_name, "args": args}
    print("Invalid %(source)s passed:\n\n %(args)s\n" % details)
502
503
def parse_task_args(src_name, args):
    """Parse *args* as YAML and return the resulting dict.

    A None *args* yields {}.  Raises TypeError (after printing a
    diagnostic) when the text is not valid YAML or does not describe a
    mapping.
    """
    try:
        parsed = args and yaml.safe_load(args)
        parsed = {} if parsed is None else parsed
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(parsed, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(parsed)})
        raise TypeError()
    return parsed
520
521
def check_environment():
    """Ensure OpenStack credentials are available in the environment.

    If OS_AUTH_URL is not already set, try to source the openrc file.
    A missing openrc file is tolerated (debug-logged); any other IOError
    propagates to the caller.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENRC)
        except IOError as e:
            # Fix: a missing file raises ENOENT, not EEXIST.  The old
            # EEXIST check re-raised in exactly the "file not found" case
            # this handler was written to tolerate.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
531
532
def change_server_name(scenario, suffix):
    """Append *suffix* to the host/target/targets server names of
    *scenario*, in place.

    Each of 'host' and 'target' may be a plain name string or a dict with
    a 'name' entry (external heat template mapping); 'targets' is a list
    of name strings.  Missing keys are silently skipped.
    """
    def _rename(key):
        # string entry -> suffix the string; dict entry -> suffix its 'name'
        try:
            entry = scenario[key]
        except KeyError:
            return
        try:
            entry['name'] += suffix
        except TypeError:
            # entry is a plain string, not a dict
            scenario[key] += suffix

    _rename('host')
    _rename('target')

    if 'targets' in scenario:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]