7eb9a1f292c0a545de40a200dfd86ee087478e1d
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import utils
30 from yardstick.common import constants
31
32 output_file_default = "/tmp/yardstick.out"
33 config_file = '/etc/yardstick/yardstick.conf'
34 test_cases_dir_default = "tests/opnfv/test_cases/"
35 LOG = logging.getLogger(__name__)
36
37
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # yardstick.conf contents, handed to every runner via Runner.get()
        self.config = {}
        # contexts deployed for the task currently being run; cleared after
        # each task file so the atexit handler only sees live deployments
        self.contexts = []

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        Parses the input task file (or every task of a suite file), deploys
        the required contexts, runs the scenarios and undeploys the contexts
        again unless ``keep_deploy`` was requested.
        """

        atexit.register(self.atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        self.config['yardstick'] = utils.parse_ini_file(config_file)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        # parse and execute each task file in turn; keep the scenarios of
        # the most recently parsed task for the report hint printed below
        scenarios = []
        for task_file, one_task_args, one_task_args_fname in zip(
                task_files, task_args, task_args_fnames):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, one_task_args,
                                  one_task_args_fname)

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check environment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # undeploy in reverse deployment order
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_file,
                     one_task_end_time - one_task_start_time)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        # guard: an empty suite never assigns scenarios; without the check
        # the report hint below would raise a NameError/IndexError
        if scenarios:
            scenario = scenarios[0]
            print("To generate report execute => yardstick report generate ",
                  scenario['task_id'], scenario['tc'])

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # background scenarios run "forever" (huge Duration); they are
            # aborted once the foreground scenarios have finished
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            # undeploy in reverse deployment order
            for context in self.contexts[::-1]:
                context.undeploy()

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context

        Resolves the scenario's host/target(s)/nodes references against the
        deployed contexts and starts a runner for it; returns the runner.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # literal IP address: no context lookup possible
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                # use the private address when host and target live in the
                # same heat stack, the floating/public one otherwise
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            # NOTE(review): context_cfg['target'] is overwritten on every
            # iteration; only the last entry's server dict survives, with
            # 'ipaddr' set to the comma-joined list below — confirm intended
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
        runner = base_runner.Runner.get(runner_cfg, self.config)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        host = None
        target = None
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target is not None, then they are in the
            # same heat context.
            return True

        return False
246
247
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # path of the task (or suite) file currently being parsed; mutated
        # by Task.start when iterating over a suite's task files
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return True if the task's optional constraint matches the current
        pod/installer; a task without constraint always matches."""
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
                    return False
                if cur_installer and tc_fit_installer and \
                        cur_installer not in tc_fit_installer:
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return the (task_args, task_args_fnames) values configured for
        *cur_pod*; either element is None when not provided."""
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # NOTE(review): yaml.load without an explicit Loader is
                # unsafe on untrusted input; consider yaml.safe_load
                cfg = yaml.load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            task_fname = task.get('file_name', None)
            if task_fname is None:
                continue
            # 2.check constraint
            if not self._meet_constraint(task, cur_pod, cur_installer):
                continue
            valid_task_files.append(test_cases_dir + task_fname)
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        print("Parsing task config:", self.path)

        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args("task_args_file", f.read()))
            kw.update(parse_task_args("task_args", task_args))
        except TypeError:
            # re-raise the original exception (a bare raise keeps the
            # message and traceback instead of masking them with a fresh,
            # empty TypeError)
            raise

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    # preserve the original traceback
                    raise
                print("Input task is:\n%s\n" % rendered_task)

                # NOTE(review): yaml.load without an explicit Loader is
                # unsafe on untrusted input; consider yaml.safe_load
                cfg = yaml.load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # suffix every context/server name with the short task id so that
        # concurrent runs do not collide
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id
            # embed task path into scenario so we can load other files
            # relative to task path
            scenario["task_path"] = os.path.dirname(self.path)

            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
440
def is_ip_addr(addr):
    """check if string addr is an IP address

    *addr* may also be a dict carrying 'public_ip_attr'/'private_ip_attr'
    keys (external heat template attribute mapping); in that case the
    mapped value is validated instead.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # NOTE: ip_address() interprets *bytes* as a packed binary address,
        # so encoding the string first (as the old code did) made any 4- or
        # 16-byte string such as "node" validate as an IP; pass text instead
        if isinstance(addr, bytes):
            addr = addr.decode('utf-8')
        ipaddress.ip_address(addr)
    except ValueError:
        return False
    else:
        return True
454
455
456 def _is_background_scenario(scenario):
457     if "run_in_background" in scenario:
458         return scenario["run_in_background"]
459     else:
460         return False
461
462
def parse_nodes_with_context(scenario_cfg):
    """Resolve the scenario's 'nodes' mapping through the deployed contexts.

    Returns a dict mapping each node name to the server info looked up via
    Context.get_server.
    """
    return {name: Context.get_server(ref)
            for name, ref in scenario_cfg["nodes"].items()}
472
473
def runner_join(runner):
    """join (wait for) a runner, exit process at runner failure"""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status == 0:
        return
    sys.exit("Runner failed")
480
481
def print_invalid_header(source_name, args):
    """Print the standard header announcing that *source_name* was invalid."""
    message = ("Invalid %(source)s passed:\n\n %(args)s\n"
               % {"source": source_name, "args": args})
    print(message)
485
486
def parse_task_args(src_name, args):
    """Parse a YAML string of task arguments into a dict.

    Returns {} when *args* is empty or None. Raises TypeError when *args*
    is not valid YAML or does not describe a mapping; details are printed
    before raising.
    """
    try:
        kw = args and yaml.safe_load(args)
        kw = {} if kw is None else kw
    except yaml.YAMLError as e:
        # catch the documented YAMLError base class, not just ParserError:
        # scanner/composer errors must be reported the same way
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
503
504
def check_environment():
    """Ensure OpenStack credentials are available in the environment.

    If OS_AUTH_URL is not already exported, try to source the openrc file;
    a missing openrc file is tolerated and only logged at debug level,
    while any other IOError is re-raised.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENRC)
        except IOError as e:
            # a missing file raises ENOENT, not EEXIST: the old check
            # compared against the wrong errno and therefore re-raised the
            # very case it meant to swallow
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
514
515
def change_server_name(scenario, suffix):
    """Append *suffix* to the scenario's host/target/targets server names.

    Each of 'host' and 'target' may be a plain name string or a dict with a
    'name' entry; 'targets' is a list of name strings. Missing keys are
    silently skipped. Mutates *scenario* in place.
    """
    def _append_suffix(key):
        # shared handling for the 'host' and 'target' entries
        try:
            entry = scenario[key]
        except KeyError:
            return
        try:
            entry['name'] += suffix
        except TypeError:
            # plain string entry: indexing with 'name' raised TypeError
            scenario[key] += suffix

    _append_suffix('host')
    _append_suffix('target')

    if 'targets' in scenario:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]