Merge "Refine test case description for tc076"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import utils
30 from yardstick.common import constants
31
# Default file where scenario output records are written.
output_file_default = "/tmp/yardstick.out"
# Location of the yardstick INI configuration file.
config_file = '/etc/yardstick/yardstick.conf'
# Default test-case directory, relative to YARDSTICK_ROOT_PATH.
test_cases_dir_default = "tests/opnfv/test_cases/"
LOG = logging.getLogger(__name__)
36
37
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # Per-run configuration, populated from yardstick.conf in start().
        self.config = {}
        # Contexts deployed for the task file currently being run.
        self.contexts = []
        # Unique id of this task run; assigned in start().  Initialized
        # here so the attribute always exists on a Task instance.
        self.task_id = None

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        Parses the input file (or a whole suite), runs every task file,
        and undeploys the contexts afterwards unless --keep-deploy was
        requested.  Exits the process on --parse-only.
        """

        atexit.register(self.atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        self.config['yardstick'] = utils.parse_ini_file(config_file)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        # Parse and execute each task file in turn; the three lists are
        # parallel, so iterate them together instead of by index.
        for task_file, cur_args, cur_args_fname in zip(
                task_files, task_args, task_args_fnames):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, cur_args, cur_args_fname)

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check environment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # undeploy in reverse order of deployment
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_file,
                     one_task_end_time - one_task_start_time)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        # Start all background scenarios; they are forced onto a very long
        # Duration runner and aborted once the foreground scenarios finish.
        for scenario in filter(_is_background_scenario, scenarios):
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")

    def atexit_handler(self):
        """handler for process termination: kill runners, tear down any
        contexts still deployed."""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            for context in self.contexts[::-1]:
                context.undeploy()

    def run_one_scenario(self, scenario_cfg, output_file):
        """Run one scenario using context.

        Builds the context_cfg (host/target/targets/nodes server info)
        for the scenario, starts a runner for it and returns the runner.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # target is a raw IP, not a deployed server
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                # use the private address when host and target share a
                # heat stack, the public one otherwise
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
        runner = base_runner.Runner.get(runner_cfg, self.config)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        host = None
        target = None
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target is not None, then they are in the
            # same heat context.
            return True

        return False
242
243
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # Path of the task or suite file to parse.
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return True when the current pod/installer satisfy the task's
        optional 'constraint' section (or when there is no constraint)."""
        constraint = task.get('constraint')
        if constraint is not None:
            tc_fit_pod = constraint.get('pod', None)
            tc_fit_installer = constraint.get('installer', None)
            LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                     cur_pod, cur_installer, constraint)
            if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
                return False
            if cur_installer and tc_fit_installer and \
                    cur_installer not in tc_fit_installer:
                return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return the (task_args, task_args_fnames) entries for *cur_pod*,
        either of which may be None when not configured."""
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # safe_load: suite files are plain YAML, no python tags
                cfg = yaml.safe_load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name; skip entries without one
            task_fname = task.get('file_name', None)
            if task_fname is None:
                continue
            # 2.check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances"""
        print("Parsing task config:", self.path)

        # parse_task_args already raises TypeError on bad input; no need
        # to catch it only to re-raise a fresh, message-less TypeError.
        kw = {}
        if task_args_file:
            with open(task_args_file) as f:
                kw.update(parse_task_args("task_args_file", f.read()))
        kw.update(parse_task_args("task_args", task_args))

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    # bare raise keeps the original traceback
                    raise
                print("Input task is:\n%s\n" % rendered_task)

                # safe_load for consistency with parse_task_args
                cfg = yaml.safe_load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # suffix context/server names with the task id so parallel runs
        # do not collide on stack names
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id

            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the optional 'precondition'
        section of the task config; True when no precondition is given."""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
432
433
def is_ip_addr(addr):
    """Check whether *addr* is a valid IP address.

    *addr* may be a plain string, or a server dict carrying the address
    under 'public_ip_attr' or 'private_ip_attr'.  Returns True for a
    valid IPv4/IPv6 address, False otherwise.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # ip_address() needs text; the old code encoded to bytes, which
        # on Python 3 raised ValueError for every dotted-quad string and
        # made this function always return False.  str() also tolerates
        # addr being None (dict without either attr key).
        ipaddress.ip_address(str(addr))
    except ValueError:
        return False
    else:
        return True
447
448
449 def _is_background_scenario(scenario):
450     if "run_in_background" in scenario:
451         return scenario["run_in_background"]
452     else:
453         return False
454
455
def parse_nodes_with_context(scenario_cfg):
    """Resolve the scenario's 'nodes' mapping into server info dicts."""
    return {node_name: Context.get_server(node_ref)
            for node_name, node_ref in scenario_cfg["nodes"].items()}
465
466
def runner_join(runner):
    """Wait for *runner* to finish and release it; exit the whole
    process if the runner reported a non-zero status."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status != 0:
        sys.exit("Runner failed")
473
474
def print_invalid_header(source_name, args):
    """Print the standard header used when task/suite input is invalid."""
    print("Invalid {source} passed:\n\n {args}\n".format(source=source_name,
                                                         args=args))
478
479
def parse_task_args(src_name, args):
    """Parse a YAML string of task arguments into a dict.

    src_name: label used in error messages ("task_args", ...)
    args: YAML text, or a falsy value meaning "no arguments"
    Returns a dict ({} when args is None).
    Raises TypeError when args is not valid YAML or not a mapping.
    """
    try:
        kw = args and yaml.safe_load(args)
        kw = {} if kw is None else kw
    # yaml.YAMLError covers scanner/composer errors too; the previous
    # yaml.parser.ParserError let those escape without the friendly
    # message and without the TypeError contract callers rely on.
    except yaml.YAMLError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
496
497
def check_environment():
    """Ensure OpenStack credentials are available in the environment.

    If OS_AUTH_URL is not set, try sourcing the openrc file.  A missing
    openrc file is tolerated (debug log only); any other I/O error is
    re-raised.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENSTACK_RC_FILE)
        except IOError as e:
            # A missing file raises ENOENT, not EEXIST ("File exists");
            # the old EEXIST check re-raised the very error this branch
            # was meant to tolerate.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
507
508
def change_server_name(scenario, suffix):
    """Append *suffix* to the server names referenced by *scenario*.

    'host' and 'target' may be a plain name string or a dict with a
    'name' key; 'targets' is a list of name strings.  Missing keys are
    silently ignored.
    """
    def _append(key):
        # Rename one of 'host'/'target' in place, if present.
        try:
            entry = scenario[key]
        except KeyError:
            return
        try:
            # dict-style server definition
            entry['name'] += suffix
        except TypeError:
            # plain string name
            scenario[key] += suffix

    _append('host')
    _append('target')

    try:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]
    except KeyError:
        pass