e23db075e0615b9c47229943d12c633d7d520c7f
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import utils
30 from yardstick.common import constants
31
# Default location for scenario result records when no --output-file is given.
output_file_default = "/tmp/yardstick.out"
# Yardstick service configuration, parsed with utils.parse_ini_file at start.
config_file = '/etc/yardstick/yardstick.conf'
# Suite test-case files resolve relative to YARDSTICK_ROOT_PATH plus this dir.
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
36
37
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # Configuration handed to every runner (yardstick.conf contents).
        self.config = {}
        # Contexts deployed for the task currently in flight; the atexit
        # handler undeploys whatever is still listed here.
        self.contexts = []

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        args: parsed command-line namespace (inputfile, suite, parse_only,
              keep_deploy, output_file, task_args, task_args_file).
        kwargs: may carry a pre-assigned 'task_id'; a fresh UUID is used
              otherwise.

        Exits the process via sys.exit() when --parse-only is requested or
        when parsing fails.
        """
        atexit.register(self.atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        self.config['yardstick'] = utils.parse_ini_file(config_file)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # A suite expands into several task files, each with optional
            # per-pod parameters.
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        # Pre-bind so the report hint after the loop cannot hit a NameError
        # when the suite produced no task files at all.
        scenarios = []
        # Walk the three parallel lists in lockstep instead of indexing.
        for task_file, task_arg, task_arg_fname in zip(
                task_files, task_args, task_args_fnames):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, task_arg, task_arg_fname)

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check environment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # Undeploy in reverse order of deployment.
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            LOG.info("task %s finished in %d secs", task_file,
                     time.time() - one_task_start_time)

        LOG.info("total finished in %d secs",
                 time.time() - total_start_time)

        if scenarios:
            scenario = scenarios[0]
            print("To generate report execute => yardstick report generate ",
                  scenario['task_id'], scenario['tc'])

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploy all registered contexts, then execute the scenarios.

        Background scenarios are started first and aborted once every
        foreground scenario has finished.
        """
        for context in self.contexts:
            context.deploy()

        background_runners = []

        # Background scenarios run until explicitly aborted, hence the
        # effectively infinite Duration runner forced onto them.
        for scenario in filter(_is_background_scenario, scenarios):
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # Launch every foreground scenario first, then wait on them all.
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            # join() returning None means the runner was still alive after
            # the grace period.
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")

    def atexit_handler(self):
        """handler for process termination

        Kills any live runners and undeploys whatever contexts are still
        registered, so interrupted runs do not leak deployed stacks.
        """
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            # Reverse order: tear down the most recently deployed first.
            for context in self.contexts[::-1]:
                context.undeploy()

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context

        Resolves host/target/nodes server info into context_cfg, then
        creates and starts the matching runner.  Returns the started runner.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # Literal IP address: nothing to resolve through a context.
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                # Use the private address when host and target share a heat
                # stack, the public one otherwise.
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            # NOTE(review): context_cfg['target'] ends up holding whichever
            # entry was processed last, with 'ipaddr' replaced by the full
            # comma-separated list -- confirm callers expect this shape.
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
        runner = base_runner.Runner.get(runner_cfg, self.config)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates

        NOTE(review): the check is short-circuited to always report True, so
        callers always pick the servers' private IPs.  The per-context Heat
        lookup that used to follow the unconditional return was unreachable
        dead code and has been removed; restore it from history if the real
        check is ever needed again.
        """
        return True
247
248
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # Path of the task (or suite) file to parse.
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return True when the suite entry's optional constraint matches
        the current pod/installer (or when no constraint is given)."""
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                # An unset current pod/installer never blocks a test case.
                if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
                    return False
                if cur_installer and tc_fit_installer and \
                        cur_installer not in tc_fit_installer:
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Fetch the per-pod task parameters for a suite entry.

        Returns a (task_args, task_args_fnames) pair; either element is
        None when not configured for *cur_pod*.
        """
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # safe_load: suite files carry plain data, no object tags.
                cfg = yaml.safe_load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1. an entry without a file name cannot be run -- skip it
            task_fname = task.get('file_name')
            if task_fname is None:
                continue
            # 2. skip entries whose constraint excludes this pod/installer
            if not self._meet_constraint(task, cur_pod, cur_installer):
                continue
            valid_task_files.append(test_cases_dir + task_fname)
            # 3. fetch the matching optional parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        print("Parsing task config:", self.path)

        # Template arguments from the file are overridden by those given on
        # the command line.  parse_task_args raises TypeError on bad input;
        # let it propagate unchanged (the old re-raise discarded any detail).
        kw = {}
        if task_args_file:
            with open(task_args_file) as f:
                kw.update(parse_task_args("task_args_file", f.read()))
        kw.update(parse_task_args("task_args", task_args))

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise
                print("Input task is:\n%s\n" % rendered_task)

                # safe_load: rendered task files carry plain data only.
                cfg = yaml.safe_load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # Suffix every context name with the short task id so concurrent
        # runs do not collide on stack names.
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id

            change_server_name(scenario, name_suffix)

            # Node references must carry the same suffix as the contexts.
            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the task's optional precondition
        (installer type, deploy scenario, pod name)."""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                # The env scenario needs to match at least one prefix.
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
437
438
def is_ip_addr(addr):
    """check if string addr is an IP address

    ``addr`` may also be a server-attribute dict, in which case its
    'public_ip_attr' (or 'private_ip_attr') value is tested instead.
    Returns False for anything that is not a literal IPv4/IPv6 address.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # Pass the text unchanged: encoding to bytes made Python 3's
        # ipaddress treat the value as a packed binary address, so valid
        # dotted-quad strings were always rejected.  A non-string (e.g.
        # None from a dict without the expected keys) raises ValueError
        # here and is reported as "not an IP".
        ipaddress.ip_address(addr)
    except ValueError:
        return False
    else:
        return True
452
453
454 def _is_background_scenario(scenario):
455     if "run_in_background" in scenario:
456         return scenario["run_in_background"]
457     else:
458         return False
459
460
def parse_nodes_with_context(scenario_cfg):
    """Resolve every entry of the scenario's 'nodes' mapping to its server
    info via the context registry; returns a name -> server-info dict."""
    return {nodename: Context.get_server(node)
            for nodename, node in scenario_cfg["nodes"].items()}
470
471
def runner_join(runner):
    """Block until *runner* finishes, then release it back to the runner
    framework; terminate the whole process if the runner reported failure."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status != 0:
        sys.exit("Runner failed")
478
479
def print_invalid_header(source_name, args):
    """Print the standard header used when reporting malformed task input."""
    header = "Invalid %(source)s passed:\n\n %(args)s\n"
    print(header % {"source": source_name, "args": args})
483
484
def parse_task_args(src_name, args):
    """Parse *args* (a YAML string, or None) into a dict of template
    arguments.

    src_name: label used in error messages ("task_args", ...).
    Returns {} when *args* is None/empty YAML.
    Raises TypeError when *args* is not valid YAML or not a mapping.
    """
    try:
        kw = args and yaml.safe_load(args)
        kw = {} if kw is None else kw
    except yaml.YAMLError as e:
        # Catch the common YAMLError base class: scanner/composer errors
        # previously escaped because only ParserError was handled.
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
501
502
def check_environment():
    """Ensure OpenStack credentials are present in the environment.

    When OS_AUTH_URL is unset, try to source the standard openrc file.
    A missing openrc file is tolerated (logged at debug level); any other
    I/O error is re-raised.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENSTACK_RC_FILE)
        except IOError as e:
            # ENOENT is "file does not exist"; the original check against
            # EEXIST re-raised exactly the error it meant to tolerate.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
512
513
def change_server_name(scenario, suffix):
    """Append *suffix* to every server reference in *scenario*, in place.

    'host' and 'target' may each be a plain server name (string) or a dict
    carrying a 'name' key; 'targets' is a list of server names.  Keys that
    are absent from the scenario are left untouched.
    """
    for key in ('host', 'target'):
        try:
            server = scenario[key]
        except KeyError:
            continue
        try:
            server['name'] += suffix
        except TypeError:
            # Plain string entry: rename the scenario value itself.
            scenario[key] += suffix

    try:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]
    except KeyError:
        pass