Merge "Bugfix:can not run a test suite if not under yardstick root path"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import constants
30
31 output_file_default = "/tmp/yardstick.out"
32 test_cases_dir_default = "tests/opnfv/test_cases/"
33 LOG = logging.getLogger(__name__)
34
35
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        args: parsed CLI namespace (inputfile, suite, parse_only,
              output_file, keep_deploy, task_args, task_args_file).
        kwargs: may carry 'task_id'; a fresh uuid4 is generated otherwise.
        """

        atexit.register(atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        if os.path.isfile(args.output_file):
            os.remove(args.output_file)

        # parse and run each task file together with its optional
        # parameters (the three lists are parallel by construction)
        for task_file, one_task_args, one_task_args_fname in zip(
                task_files, task_args, task_args_fnames):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition = parser.parse_task(
                self.task_id, one_task_args, one_task_args_fname)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check environment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                Context.list = []
            else:
                for context in Context.list:
                    context.undeploy()
                Context.list = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_file,
                     one_task_end_time - one_task_start_time)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploy all contexts, then run scenarios with runners.

        Background scenarios are started first and aborted once every
        foreground scenario has completed.
        """
        for context in Context.list:
            context.deploy()

        background_runners = []

        # Start all background scenarios; the huge Duration keeps each
        # one running until it is explicitly aborted below.
        for scenario in filter(_is_background_scenario, scenarios):
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # launch every foreground scenario, then wait for all of them
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")
148
149
150 # TODO: Move stuff below into TaskCommands class !?
151
152
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # path to the task or suite file currently being parsed
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Check the optional 'constraint' section of a suite entry.

        Returns False when the entry restricts itself to pods or
        installers that do not include the current ones, True otherwise.
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
                    return False
                if cur_installer and tc_fit_installer and \
                        cur_installer not in tc_fit_installer:
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return the (task_args, task_args_fnames) configured for
        *cur_pod* in a suite entry; either element may be None."""
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                cfg = yaml.load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        # resolve the test case dir against the yardstick root so a suite
        # can be run from any working directory
        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
                    continue
            else:
                continue
            # 2.check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """Parse the task file; return (scenarios, run_in_parallel,
        meet_precondition) and register context instances as a side
        effect (via Context.get/init)."""
        print("Parsing task config:", self.path)

        # parse_task_args already raises TypeError with a useful message,
        # so no try/except wrapper is needed here (the old one re-raised a
        # blank TypeError(), discarding the original details).
        kw = {}
        if task_args_file:
            with open(task_args_file) as f:
                kw.update(parse_task_args("task_args_file", f.read()))
        kw.update(parse_task_args("task_args", task_args))

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise e
                print("Input task is:\n%s\n" % rendered_task)

                # NOTE(review): yaml.load without an explicit Loader is
                # unsafe on untrusted input; parse_task_args already uses
                # safe_load -- consider aligning if task files may come
                # from untrusted sources.
                cfg = yaml.load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        # suffix every context name with the first 8 chars of the task id
        # so parallel task runs do not collide
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'], name_suffix)
            context_type = cfg_attrs.get("type", "Heat")
            if "Heat" == context_type and "networks" in cfg_attrs:
                # bugfix: if there are more than one network,
                # only add "external_network" on first one.
                # the name of network should follow this rule:
                # test, test2, test3 ...
                # sort network with the length of network's name
                sorted_networks = sorted(cfg_attrs["networks"])
                # config external_network based on env var
                cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
                    = os.environ.get("EXTERNAL_NETWORK", "net04_ext")

            context = Context.get(context_type)
            context.init(cfg_attrs)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id

            change_server_name(scenario, name_suffix)

            try:
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check whether the environment meets the optional 'precondition'
        section of the task config; True when no precondition is given."""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
345
346
def atexit_handler():
    """Process-termination hook: stop all runners and tear down any
    contexts that are still deployed."""
    base_runner.Runner.terminate_all()

    if Context.list:
        print("Undeploying all contexts")
        for ctx in Context.list:
            ctx.undeploy()
355
356
def is_ip_addr(addr):
    """Check if string *addr* is a valid textual IP address.

    Returns True for valid IPv4/IPv6 literals ("10.0.0.1", "::1"),
    False otherwise.
    """
    # Bugfix: do NOT encode to bytes -- ipaddress.ip_address() interprets
    # a 4- or 16-byte bytes object as a *packed* address, so e.g. "abcd"
    # was wrongly reported as a valid IP. Pass the text form instead.
    if isinstance(addr, bytes):
        addr = addr.decode('utf-8')
    try:
        ipaddress.ip_address(addr)
        return True
    except ValueError:
        return False
364
365
def _is_same_heat_context(host_attr, target_attr):
    """Check whether two servers belong to the same Heat context.

    host_attr: either a name for a server created by yardstick or a dict
    with attribute name mapping when using external heat templates
    target_attr: either a name for a server created by yardstick or a dict
    with attribute name mapping when using external heat templates
    """
    for context in Context.list:
        if context.__context_type__ != "Heat":
            continue

        if context._get_server(host_attr) is None:
            continue

        # The host resolved in this Heat context; the pair shares the
        # context exactly when the target resolves here as well.
        return context._get_server(target_attr) is not None

    return False
392
393
394 def _is_background_scenario(scenario):
395     if "run_in_background" in scenario:
396         return scenario["run_in_background"]
397     else:
398         return False
399
400
def run_one_scenario(scenario_cfg, output_file):
    """Run one scenario: resolve host/target server info from the
    deployed contexts, build a runner from the scenario's "runner"
    section, start it, and return the runner instance (the caller is
    responsible for joining/aborting it).

    scenario_cfg: scenario dict parsed from the task file; must contain
                  "runner", may contain "host", "target", "targets",
                  "nodes".
    output_file: path the runner writes its results to.
    """
    runner_cfg = scenario_cfg["runner"]
    runner_cfg['output_filename'] = output_file

    # TODO support get multi hosts/vms info
    context_cfg = {}
    if "host" in scenario_cfg:
        context_cfg['host'] = Context.get_server(scenario_cfg["host"])

    if "target" in scenario_cfg:
        if is_ip_addr(scenario_cfg["target"]):
            # target given as a literal IP: nothing to look up
            context_cfg['target'] = {}
            context_cfg['target']["ipaddr"] = scenario_cfg["target"]
        else:
            context_cfg['target'] = Context.get_server(scenario_cfg["target"])
            # Use the private address only when host and target share a
            # Heat context. NOTE(review): this assumes "host" is present
            # whenever "target" names a server -- KeyError otherwise;
            # confirm against callers.
            if _is_same_heat_context(scenario_cfg["host"],
                                     scenario_cfg["target"]):
                context_cfg["target"]["ipaddr"] = \
                    context_cfg["target"]["private_ip"]
            else:
                context_cfg["target"]["ipaddr"] = \
                    context_cfg["target"]["ip"]

    if "targets" in scenario_cfg:
        # Multi-target scenario: collect one address per target. Note
        # that context_cfg['target'] is overwritten on every iteration,
        # so after the loop it holds the last target's server info (or
        # {} if the last target was a literal IP) plus the joined list.
        ip_list = []
        for target in scenario_cfg["targets"]:
            if is_ip_addr(target):
                ip_list.append(target)
                context_cfg['target'] = {}
            else:
                context_cfg['target'] = Context.get_server(target)
                if _is_same_heat_context(scenario_cfg["host"], target):
                    ip_list.append(context_cfg["target"]["private_ip"])
                else:
                    ip_list.append(context_cfg["target"]["ip"])
        context_cfg['target']['ipaddr'] = ','.join(ip_list)

    if "nodes" in scenario_cfg:
        context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
    runner = base_runner.Runner.get(runner_cfg)

    print("Starting runner of type '%s'" % runner_cfg["type"])
    runner.run(scenario_cfg, context_cfg)

    return runner
447
448
def parse_nodes_with_context(scenario_cfg):
    """Resolve every entry of the scenario's 'nodes' mapping to its
    server info via the deployed contexts."""
    return {name: Context.get_server(ref)
            for name, ref in scenario_cfg["nodes"].items()}
458
459
def runner_join(runner):
    """Wait for *runner* to finish and release it; terminate the whole
    process when the runner reports a non-zero exit status."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status == 0:
        return
    sys.exit("Runner failed")
466
467
def print_invalid_header(source_name, args):
    """Print the standard 'invalid arguments' header for *source_name*."""
    print("Invalid {source} passed:\n\n {args}\n".format(source=source_name,
                                                         args=args))
471
472
def parse_task_args(src_name, args):
    """Parse a YAML mapping given as a string.

    src_name: label used in error messages ("task_args", ...).
    args: YAML text; any false-y value (None, "") means "no arguments".
    Returns the parsed dict ({} when args is empty).
    Raises TypeError when args is not valid YAML or does not parse
    to a mapping.
    """
    # Robustness: treat any empty/false-y input as "no args". The old
    # `args and yaml.safe_load(args)` short-circuit left "" in kw, which
    # then failed the dict check and raised TypeError for empty input.
    if not args:
        return {}

    try:
        kw = yaml.safe_load(args)
    except yaml.parser.ParserError as e:
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        # carry a message instead of the old blank TypeError()
        raise TypeError("%s is not valid YAML" % src_name)

    kw = {} if kw is None else kw
    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError("%s must parse to a dict, got %s"
                        % (src_name, type(kw)))
    return kw
489
490
def check_environment():
    """Ensure OpenStack credentials are present in the environment.

    When OS_AUTH_URL is unset, try sourcing the openrc file. A missing
    openrc file is tolerated (logged at debug level); any other I/O
    error is propagated.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENSTACK_RC_FILE)
        except IOError as e:
            # Bugfix: "file not found" is ENOENT, not EEXIST. The old
            # check (errno != EEXIST) re-raised on a missing file, so the
            # tolerant path below was unreachable.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
500
501
def change_server_name(scenario, suffix):
    """Append *suffix* to the scenario's 'host', 'target' and every
    element of 'targets', for whichever of those keys are present."""
    for key in ('host', 'target'):
        if key in scenario:
            scenario[key] += suffix

    if 'targets' in scenario:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]