Merge "Bugfix: debug should be default off"
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 import errno
21 from itertools import ifilter
22
23 from yardstick.benchmark.contexts.base import Context
24 from yardstick.benchmark.runners import base as base_runner
25 from yardstick.common.task_template import TaskTemplate
26 from yardstick.common.utils import cliargs
27 from yardstick.common.utils import source_env
28 from yardstick.common import constants
29
30 output_file_default = "/tmp/yardstick.out"
31 test_cases_dir_default = "tests/opnfv/test_cases/"
32 LOG = logging.getLogger(__name__)
33
34
35 class TaskCommands(object):
36     '''Task commands.
37
38        Set of commands to manage benchmark tasks.
39     '''
40
41     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
42     @cliargs("--task-args", dest="task_args",
43              help="Input task args (dict in json). These args are used"
44              "to render input task that is jinja2 template.")
45     @cliargs("--task-args-file", dest="task_args_file",
46              help="Path to the file with input task args (dict in "
47              "json/yaml). These args are used to render input"
48              "task that is jinja2 template.")
49     @cliargs("--keep-deploy", help="keep context deployed in cloud",
50              action="store_true")
51     @cliargs("--parse-only", help="parse the config file and exit",
52              action="store_true")
53     @cliargs("--output-file", help="file where output is stored, default %s" %
54              output_file_default, default=output_file_default)
55     @cliargs("--suite", help="process test suite file instead of a task file",
56              action="store_true")
57     def do_start(self, args, **kwargs):
58         '''Start a benchmark scenario.'''
59
60         atexit.register(atexit_handler)
61
62         self.task_id = kwargs.get('task_id', str(uuid.uuid4()))
63
64         check_environment()
65
66         total_start_time = time.time()
67         parser = TaskParser(args.inputfile[0])
68
69         if args.suite:
70             # 1.parse suite, return suite_params info
71             task_files, task_args, task_args_fnames = \
72                 parser.parse_suite()
73         else:
74             task_files = [parser.path]
75             task_args = [args.task_args]
76             task_args_fnames = [args.task_args_file]
77
78         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
79                  task_files, task_args, task_args_fnames)
80
81         if args.parse_only:
82             sys.exit(0)
83
84         if os.path.isfile(args.output_file):
85             os.remove(args.output_file)
86         # parse task_files
87         for i in range(0, len(task_files)):
88             one_task_start_time = time.time()
89             parser.path = task_files[i]
90             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
91                  self.task_id, task_args[i], task_args_fnames[i])
92
93             if not meet_precondition:
94                 LOG.info("meet_precondition is %s, please check envrionment",
95                          meet_precondition)
96                 continue
97
98             self._run(scenarios, run_in_parallel, args.output_file)
99
100             if args.keep_deploy:
101                 # keep deployment, forget about stack
102                 # (hide it for exit handler)
103                 Context.list = []
104             else:
105                 for context in Context.list:
106                     context.undeploy()
107                 Context.list = []
108             one_task_end_time = time.time()
109             LOG.info("task %s finished in %d secs", task_files[i],
110                      one_task_end_time - one_task_start_time)
111
112         total_end_time = time.time()
113         LOG.info("total finished in %d secs",
114                  total_end_time - total_start_time)
115
116         print "Done, exiting"
117
118     def _run(self, scenarios, run_in_parallel, output_file):
119         '''Deploys context and calls runners'''
120         for context in Context.list:
121             context.deploy()
122
123         background_runners = []
124
125         # Start all background scenarios
126         for scenario in ifilter(_is_background_scenario, scenarios):
127             scenario["runner"] = dict(type="Duration", duration=1000000000)
128             runner = run_one_scenario(scenario, output_file)
129             background_runners.append(runner)
130
131         runners = []
132         if run_in_parallel:
133             for scenario in scenarios:
134                 if not _is_background_scenario(scenario):
135                     runner = run_one_scenario(scenario, output_file)
136                     runners.append(runner)
137
138             # Wait for runners to finish
139             for runner in runners:
140                 runner_join(runner)
141                 print "Runner ended, output in", output_file
142         else:
143             # run serially
144             for scenario in scenarios:
145                 if not _is_background_scenario(scenario):
146                     runner = run_one_scenario(scenario, output_file)
147                     runner_join(runner)
148                     print "Runner ended, output in", output_file
149
150         # Abort background runners
151         for runner in background_runners:
152             runner.abort()
153
154         # Wait for background runners to finish
155         for runner in background_runners:
156             if runner.join(timeout=60) is None:
157                 # Nuke if it did not stop nicely
158                 base_runner.Runner.terminate(runner)
159                 runner_join(runner)
160             else:
161                 base_runner.Runner.release(runner)
162             print "Background task ended"
163
164
165 # TODO: Move stuff below into TaskCommands class !?
166
167
168 class TaskParser(object):
169     '''Parser for task config files in yaml format'''
170     def __init__(self, path):
171         self.path = path
172
173     def _meet_constraint(self, task, cur_pod, cur_installer):
174         if "constraint" in task:
175             constraint = task.get('constraint', None)
176             if constraint is not None:
177                 tc_fit_pod = constraint.get('pod', None)
178                 tc_fit_installer = constraint.get('installer', None)
179                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
180                          cur_pod, cur_installer, constraint)
181                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
182                     return False
183                 if cur_installer and tc_fit_installer and \
184                         cur_installer not in tc_fit_installer:
185                     return False
186         return True
187
188     def _get_task_para(self, task, cur_pod):
189         task_args = task.get('task_args', None)
190         if task_args is not None:
191             task_args = task_args.get(cur_pod, None)
192         task_args_fnames = task.get('task_args_fnames', None)
193         if task_args_fnames is not None:
194             task_args_fnames = task_args_fnames.get(cur_pod, None)
195         return task_args, task_args_fnames
196
197     def parse_suite(self):
198         '''parse the suite file and return a list of task config file paths
199            and lists of optional parameters if present'''
200         LOG.info("\nParsing suite file:%s", self.path)
201
202         try:
203             with open(self.path) as stream:
204                 cfg = yaml.load(stream)
205         except IOError as ioerror:
206             sys.exit(ioerror)
207
208         self._check_schema(cfg["schema"], "suite")
209         LOG.info("\nStarting scenario:%s", cfg["name"])
210
211         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
212         if test_cases_dir[-1] != os.sep:
213             test_cases_dir += os.sep
214
215         cur_pod = os.environ.get('NODE_NAME', None)
216         cur_installer = os.environ.get('INSTALLER_TYPE', None)
217
218         valid_task_files = []
219         valid_task_args = []
220         valid_task_args_fnames = []
221
222         for task in cfg["test_cases"]:
223             # 1.check file_name
224             if "file_name" in task:
225                 task_fname = task.get('file_name', None)
226                 if task_fname is None:
227                     continue
228             else:
229                 continue
230             # 2.check constraint
231             if self._meet_constraint(task, cur_pod, cur_installer):
232                 valid_task_files.append(test_cases_dir + task_fname)
233             else:
234                 continue
235             # 3.fetch task parameters
236             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
237             valid_task_args.append(task_args)
238             valid_task_args_fnames.append(task_args_fnames)
239
240         return valid_task_files, valid_task_args, valid_task_args_fnames
241
242     def parse_task(self, task_id, task_args=None, task_args_file=None):
243         '''parses the task file and return an context and scenario instances'''
244         print "Parsing task config:", self.path
245
246         try:
247             kw = {}
248             if task_args_file:
249                 with open(task_args_file) as f:
250                     kw.update(parse_task_args("task_args_file", f.read()))
251             kw.update(parse_task_args("task_args", task_args))
252         except TypeError:
253             raise TypeError()
254
255         try:
256             with open(self.path) as f:
257                 try:
258                     input_task = f.read()
259                     rendered_task = TaskTemplate.render(input_task, **kw)
260                 except Exception as e:
261                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
262                           % {"task": input_task, "err": e})
263                     raise e
264                 print(("Input task is:\n%s\n") % rendered_task)
265
266                 cfg = yaml.load(rendered_task)
267         except IOError as ioerror:
268             sys.exit(ioerror)
269
270         self._check_schema(cfg["schema"], "task")
271         meet_precondition = self._check_precondition(cfg)
272
273         # TODO: support one or many contexts? Many would simpler and precise
274         # TODO: support hybrid context type
275         if "context" in cfg:
276             context_cfgs = [cfg["context"]]
277         elif "contexts" in cfg:
278             context_cfgs = cfg["contexts"]
279         else:
280             context_cfgs = [{"type": "Dummy"}]
281
282         for cfg_attrs in context_cfgs:
283             context_type = cfg_attrs.get("type", "Heat")
284             if "Heat" == context_type and "networks" in cfg_attrs:
285                 # bugfix: if there are more than one network,
286                 # only add "external_network" on first one.
287                 # the name of netwrok should follow this rule:
288                 # test, test2, test3 ...
289                 # sort network with the length of network's name
290                 sorted_networks = sorted(cfg_attrs["networks"].keys())
291                 # config external_network based on env var
292                 cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
293                     = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
294
295             context = Context.get(context_type)
296             context.init(cfg_attrs)
297
298         run_in_parallel = cfg.get("run_in_parallel", False)
299
300         # add tc and task id for influxdb extended tags
301         for scenario in cfg["scenarios"]:
302             task_name = os.path.splitext(os.path.basename(self.path))[0]
303             scenario["tc"] = task_name
304             scenario["task_id"] = task_id
305
306         # TODO we need something better here, a class that represent the file
307         return cfg["scenarios"], run_in_parallel, meet_precondition
308
309     def _check_schema(self, cfg_schema, schema_type):
310         '''Check if config file is using the correct schema type'''
311
312         if cfg_schema != "yardstick:" + schema_type + ":0.1":
313             sys.exit("error: file %s has unknown schema %s" % (self.path,
314                                                                cfg_schema))
315
316     def _check_precondition(self, cfg):
317         '''Check if the envrionment meet the preconditon'''
318
319         if "precondition" in cfg:
320             precondition = cfg["precondition"]
321             installer_type = precondition.get("installer_type", None)
322             deploy_scenarios = precondition.get("deploy_scenarios", None)
323             tc_fit_pods = precondition.get("pod_name", None)
324             installer_type_env = os.environ.get('INSTALL_TYPE', None)
325             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
326             pod_name_env = os.environ.get('NODE_NAME', None)
327
328             LOG.info("installer_type: %s, installer_type_env: %s",
329                      installer_type, installer_type_env)
330             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
331                      deploy_scenarios, deploy_scenario_env)
332             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
333                      tc_fit_pods, pod_name_env)
334             if installer_type and installer_type_env:
335                 if installer_type_env not in installer_type:
336                     return False
337             if deploy_scenarios and deploy_scenario_env:
338                 deploy_scenarios_list = deploy_scenarios.split(',')
339                 for deploy_scenario in deploy_scenarios_list:
340                     if deploy_scenario_env.startswith(deploy_scenario):
341                         return True
342                 return False
343             if tc_fit_pods and pod_name_env:
344                 if pod_name_env not in tc_fit_pods:
345                     return False
346         return True
347
348
349 def atexit_handler():
350     '''handler for process termination'''
351     base_runner.Runner.terminate_all()
352
353     if len(Context.list) > 0:
354         print "Undeploying all contexts"
355         for context in Context.list:
356             context.undeploy()
357
358
359 def is_ip_addr(addr):
360     '''check if string addr is an IP address'''
361     try:
362         ipaddress.ip_address(unicode(addr))
363         return True
364     except ValueError:
365         return False
366
367
368 def _is_same_heat_context(host_attr, target_attr):
369     '''check if two servers are in the same heat context
370     host_attr: either a name for a server created by yardstick or a dict
371     with attribute name mapping when using external heat templates
372     target_attr: either a name for a server created by yardstick or a dict
373     with attribute name mapping when using external heat templates
374     '''
375     host = None
376     target = None
377     for context in Context.list:
378         if context.__context_type__ != "Heat":
379             continue
380
381         host = context._get_server(host_attr)
382         if host is None:
383             continue
384
385         target = context._get_server(target_attr)
386         if target is None:
387             return False
388
389         # Both host and target is not None, then they are in the
390         # same heat context.
391         return True
392
393     return False
394
395
396 def _is_background_scenario(scenario):
397     if "run_in_background" in scenario:
398         return scenario["run_in_background"]
399     else:
400         return False
401
402
403 def run_one_scenario(scenario_cfg, output_file):
404     '''run one scenario using context'''
405     runner_cfg = scenario_cfg["runner"]
406     runner_cfg['output_filename'] = output_file
407
408     # TODO support get multi hosts/vms info
409     context_cfg = {}
410     if "host" in scenario_cfg:
411         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
412
413     if "target" in scenario_cfg:
414         if is_ip_addr(scenario_cfg["target"]):
415             context_cfg['target'] = {}
416             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
417         else:
418             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
419             if _is_same_heat_context(scenario_cfg["host"],
420                                      scenario_cfg["target"]):
421                 context_cfg["target"]["ipaddr"] = \
422                     context_cfg["target"]["private_ip"]
423             else:
424                 context_cfg["target"]["ipaddr"] = \
425                     context_cfg["target"]["ip"]
426
427     if "targets" in scenario_cfg:
428         ip_list = []
429         for target in scenario_cfg["targets"]:
430             if is_ip_addr(target):
431                 ip_list.append(target)
432                 context_cfg['target'] = {}
433             else:
434                 context_cfg['target'] = Context.get_server(target)
435                 if _is_same_heat_context(scenario_cfg["host"], target):
436                     ip_list.append(context_cfg["target"]["private_ip"])
437                 else:
438                     ip_list.append(context_cfg["target"]["ip"])
439         context_cfg['target']['ipaddr'] = ','.join(ip_list)
440
441     if "nodes" in scenario_cfg:
442         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
443     runner = base_runner.Runner.get(runner_cfg)
444
445     print "Starting runner of type '%s'" % runner_cfg["type"]
446     runner.run(scenario_cfg, context_cfg)
447
448     return runner
449
450
451 def parse_nodes_with_context(scenario_cfg):
452     '''paras the 'nodes' fields in scenario '''
453     nodes = scenario_cfg["nodes"]
454
455     nodes_cfg = {}
456     for nodename in nodes:
457         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
458
459     return nodes_cfg
460
461
462 def runner_join(runner):
463     '''join (wait for) a runner, exit process at runner failure'''
464     status = runner.join()
465     base_runner.Runner.release(runner)
466     if status != 0:
467         sys.exit("Runner failed")
468
469
470 def print_invalid_header(source_name, args):
471     print(("Invalid %(source)s passed:\n\n %(args)s\n")
472           % {"source": source_name, "args": args})
473
474
475 def parse_task_args(src_name, args):
476     try:
477         kw = args and yaml.safe_load(args)
478         kw = {} if kw is None else kw
479     except yaml.parser.ParserError as e:
480         print_invalid_header(src_name, args)
481         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
482               % {"source": src_name, "err": e})
483         raise TypeError()
484
485     if not isinstance(kw, dict):
486         print_invalid_header(src_name, args)
487         print(("%(src)s had to be dict, actually %(src_type)s\n")
488               % {"src": src_name, "src_type": type(kw)})
489         raise TypeError()
490     return kw
491
492
493 def check_environment():
494     auth_url = os.environ.get('OS_AUTH_URL', None)
495     if not auth_url:
496         try:
497             source_env(constants.OPENSTACK_RC_FILE)
498         except IOError as e:
499             if e.errno != errno.EEXIST:
500                 raise
501             LOG.debug('OPENRC file not found')