Merge "change ci from base-on-pod to base-on-scenario (in progress)"
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 from itertools import ifilter
21
22 from yardstick.benchmark.contexts.base import Context
23 from yardstick.benchmark.runners import base as base_runner
24 from yardstick.common.task_template import TaskTemplate
25 from yardstick.common.utils import cliargs
26
27 output_file_default = "/tmp/yardstick.out"
28 test_cases_dir_default = "tests/opnfv/test_cases/"
29 LOG = logging.getLogger(__name__)
30
31
32 class TaskCommands(object):
33     '''Task commands.
34
35        Set of commands to manage benchmark tasks.
36     '''
37
38     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
39     @cliargs("--task-args", dest="task_args",
40              help="Input task args (dict in json). These args are used"
41              "to render input task that is jinja2 template.")
42     @cliargs("--task-args-file", dest="task_args_file",
43              help="Path to the file with input task args (dict in "
44              "json/yaml). These args are used to render input"
45              "task that is jinja2 template.")
46     @cliargs("--keep-deploy", help="keep context deployed in cloud",
47              action="store_true")
48     @cliargs("--parse-only", help="parse the config file and exit",
49              action="store_true")
50     @cliargs("--output-file", help="file where output is stored, default %s" %
51              output_file_default, default=output_file_default)
52     @cliargs("--suite", help="process test suite file instead of a task file",
53              action="store_true")
54     def do_start(self, args):
55         '''Start a benchmark scenario.'''
56
57         atexit.register(atexit_handler)
58
59         total_start_time = time.time()
60         parser = TaskParser(args.inputfile[0])
61
62         suite_params = {}
63         if args.suite:
64             suite_params = parser.parse_suite()
65             test_cases_dir = suite_params["test_cases_dir"]
66             if test_cases_dir[-1] != os.sep:
67                 test_cases_dir += os.sep
68             task_files = [test_cases_dir + task
69                           for task in suite_params["task_fnames"]]
70         else:
71             task_files = [parser.path]
72
73         task_args = suite_params.get("task_args", [args.task_args])
74         task_args_fnames = suite_params.get("task_args_fnames",
75                                             [args.task_args_file])
76
77         if args.parse_only:
78             sys.exit(0)
79
80         if os.path.isfile(args.output_file):
81             os.remove(args.output_file)
82
83         for i in range(0, len(task_files)):
84             one_task_start_time = time.time()
85             parser.path = task_files[i]
86             task_name = os.path.splitext(os.path.basename(task_files[i]))[0]
87             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
88                 task_name, task_args[i], task_args_fnames[i])
89
90             if not meet_precondition:
91                 LOG.info("meet_precondition is %s, please check envrionment",
92                          meet_precondition)
93                 continue
94
95             self._run(scenarios, run_in_parallel, args.output_file)
96
97             if args.keep_deploy:
98                 # keep deployment, forget about stack
99                 # (hide it for exit handler)
100                 Context.list = []
101             else:
102                 for context in Context.list:
103                     context.undeploy()
104                 Context.list = []
105             one_task_end_time = time.time()
106             LOG.info("task %s finished in %d secs", task_files[i],
107                      one_task_end_time - one_task_start_time)
108
109         total_end_time = time.time()
110         LOG.info("total finished in %d secs",
111                  total_end_time - total_start_time)
112
113         print "Done, exiting"
114
115     def _run(self, scenarios, run_in_parallel, output_file):
116         '''Deploys context and calls runners'''
117         for context in Context.list:
118             context.deploy()
119
120         background_runners = []
121
122         # Start all background scenarios
123         for scenario in ifilter(_is_background_scenario, scenarios):
124             scenario["runner"] = dict(type="Duration", duration=1000000000)
125             runner = run_one_scenario(scenario, output_file)
126             background_runners.append(runner)
127
128         runners = []
129         if run_in_parallel:
130             for scenario in scenarios:
131                 if not _is_background_scenario(scenario):
132                     runner = run_one_scenario(scenario, output_file)
133                     runners.append(runner)
134
135             # Wait for runners to finish
136             for runner in runners:
137                 runner_join(runner)
138                 print "Runner ended, output in", output_file
139         else:
140             # run serially
141             for scenario in scenarios:
142                 if not _is_background_scenario(scenario):
143                     runner = run_one_scenario(scenario, output_file)
144                     runner_join(runner)
145                     print "Runner ended, output in", output_file
146
147         # Abort background runners
148         for runner in background_runners:
149             runner.abort()
150
151         # Wait for background runners to finish
152         for runner in background_runners:
153             if runner.join(timeout=60) is None:
154                 # Nuke if it did not stop nicely
155                 base_runner.Runner.terminate(runner)
156                 runner_join(runner)
157             else:
158                 base_runner.Runner.release(runner)
159             print "Background task ended"
160
161
162 # TODO: Move stuff below into TaskCommands class !?
163
164
165 class TaskParser(object):
166     '''Parser for task config files in yaml format'''
167     def __init__(self, path):
168         self.path = path
169
170     def parse_suite(self):
171         '''parse the suite file and return a list of task config file paths
172            and lists of optional parameters if present'''
173         print "Parsing suite file:", self.path
174
175         try:
176             with open(self.path) as stream:
177                 cfg = yaml.load(stream)
178         except IOError as ioerror:
179             sys.exit(ioerror)
180
181         self._check_schema(cfg["schema"], "suite")
182         print "Starting suite:", cfg["name"]
183
184         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
185         task_fnames = []
186         task_args = []
187         task_args_fnames = []
188
189         for task in cfg["test_cases"]:
190             task_fnames.append(task["file_name"])
191             if "task_args" in task:
192                 task_args.append(task["task_args"])
193             else:
194                 task_args.append(None)
195
196             if "task_args_file" in task:
197                 task_args_fnames.append(task["task_args_file"])
198             else:
199                 task_args_fnames.append(None)
200
201         suite_params = {
202             "test_cases_dir": test_cases_dir,
203             "task_fnames": task_fnames,
204             "task_args": task_args,
205             "task_args_fnames": task_args_fnames
206         }
207
208         return suite_params
209
210     def parse_task(self, task_name, task_args=None, task_args_file=None):
211         '''parses the task file and return an context and scenario instances'''
212         print "Parsing task config:", self.path
213
214         try:
215             kw = {}
216             if task_args_file:
217                 with open(task_args_file) as f:
218                     kw.update(parse_task_args("task_args_file", f.read()))
219             kw.update(parse_task_args("task_args", task_args))
220         except TypeError:
221             raise TypeError()
222
223         try:
224             with open(self.path) as f:
225                 try:
226                     input_task = f.read()
227                     rendered_task = TaskTemplate.render(input_task, **kw)
228                 except Exception as e:
229                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
230                           % {"task": input_task, "err": e})
231                     raise e
232                 print(("Input task is:\n%s\n") % rendered_task)
233
234                 cfg = yaml.load(rendered_task)
235         except IOError as ioerror:
236             sys.exit(ioerror)
237
238         self._check_schema(cfg["schema"], "task")
239         meet_precondition = self._check_precondition(cfg)
240
241         # TODO: support one or many contexts? Many would simpler and precise
242         # TODO: support hybrid context type
243         if "context" in cfg:
244             context_cfgs = [cfg["context"]]
245         elif "contexts" in cfg:
246             context_cfgs = cfg["contexts"]
247         else:
248             context_cfgs = [{"type": "Dummy"}]
249
250         for cfg_attrs in context_cfgs:
251             context_type = cfg_attrs.get("type", "Heat")
252             if "Heat" == context_type and "networks" in cfg_attrs:
253                 # config external_network based on env var
254                 for _, attrs in cfg_attrs["networks"].items():
255                     attrs["external_network"] = os.environ.get(
256                         'EXTERNAL_NETWORK', 'net04_ext')
257             context = Context.get(context_type)
258             context.init(cfg_attrs)
259
260         run_in_parallel = cfg.get("run_in_parallel", False)
261
262         # add tc and task id for influxdb extended tags
263         task_id = str(uuid.uuid4())
264         for scenario in cfg["scenarios"]:
265             scenario["tc"] = task_name
266             scenario["task_id"] = task_id
267
268         # TODO we need something better here, a class that represent the file
269         return cfg["scenarios"], run_in_parallel, meet_precondition
270
271     def _check_schema(self, cfg_schema, schema_type):
272         '''Check if config file is using the correct schema type'''
273
274         if cfg_schema != "yardstick:" + schema_type + ":0.1":
275             sys.exit("error: file %s has unknown schema %s" % (self.path,
276                                                                cfg_schema))
277
278     def _check_precondition(self, cfg):
279         '''Check if the envrionment meet the preconditon'''
280
281         if "precondition" in cfg:
282             precondition = cfg["precondition"]
283             installer_type = precondition.get("installer_type", None)
284             deploy_scenarios = precondition.get("deploy_scenarios", None)
285             tc_fit_pods = precondition.get("pod_name", None)
286             installer_type_env = os.environ.get('INSTALL_TYPE', None)
287             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
288             pod_name_env = os.environ.get('NODE_NAME', None)
289
290             LOG.info("installer_type: %s, installer_type_env: %s",
291                      installer_type, installer_type_env)
292             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
293                      deploy_scenarios, deploy_scenario_env)
294             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
295                      tc_fit_pods, pod_name_env)
296             if installer_type and installer_type_env:
297                 if installer_type_env not in installer_type:
298                     return False
299             if deploy_scenarios and deploy_scenario_env:
300                 deploy_scenarios_list = deploy_scenarios.split(',')
301                 for deploy_scenario in deploy_scenarios_list:
302                     if deploy_scenario_env.startswith(deploy_scenario):
303                         return True
304                 return False
305             if tc_fit_pods and pod_name_env:
306                 if pod_name_env not in tc_fit_pods:
307                     return False
308         return True
309
310
311 def atexit_handler():
312     '''handler for process termination'''
313     base_runner.Runner.terminate_all()
314
315     if len(Context.list) > 0:
316         print "Undeploying all contexts"
317         for context in Context.list:
318             context.undeploy()
319
320
321 def is_ip_addr(addr):
322     '''check if string addr is an IP address'''
323     try:
324         ipaddress.ip_address(unicode(addr))
325         return True
326     except ValueError:
327         return False
328
329
330 def _is_same_heat_context(host_attr, target_attr):
331     '''check if two servers are in the same heat context
332     host_attr: either a name for a server created by yardstick or a dict
333     with attribute name mapping when using external heat templates
334     target_attr: either a name for a server created by yardstick or a dict
335     with attribute name mapping when using external heat templates
336     '''
337     host = None
338     target = None
339     for context in Context.list:
340         if context.__context_type__ != "Heat":
341             continue
342
343         host = context._get_server(host_attr)
344         if host is None:
345             continue
346
347         target = context._get_server(target_attr)
348         if target is None:
349             return False
350
351         # Both host and target is not None, then they are in the
352         # same heat context.
353         return True
354
355     return False
356
357
358 def _is_background_scenario(scenario):
359     if "run_in_background" in scenario:
360         return scenario["run_in_background"]
361     else:
362         return False
363
364
365 def run_one_scenario(scenario_cfg, output_file):
366     '''run one scenario using context'''
367     runner_cfg = scenario_cfg["runner"]
368     runner_cfg['output_filename'] = output_file
369
370     # TODO support get multi hosts/vms info
371     context_cfg = {}
372     if "host" in scenario_cfg:
373         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
374
375     if "target" in scenario_cfg:
376         if is_ip_addr(scenario_cfg["target"]):
377             context_cfg['target'] = {}
378             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
379         else:
380             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
381             if _is_same_heat_context(scenario_cfg["host"],
382                                      scenario_cfg["target"]):
383                 context_cfg["target"]["ipaddr"] = \
384                     context_cfg["target"]["private_ip"]
385             else:
386                 context_cfg["target"]["ipaddr"] = \
387                     context_cfg["target"]["ip"]
388
389     if "targets" in scenario_cfg:
390         ip_list = []
391         for target in scenario_cfg["targets"]:
392             if is_ip_addr(target):
393                 ip_list.append(target)
394                 context_cfg['target'] = {}
395             else:
396                 context_cfg['target'] = Context.get_server(target)
397                 if _is_same_heat_context(scenario_cfg["host"], target):
398                     ip_list.append(context_cfg["target"]["private_ip"])
399                 else:
400                     ip_list.append(context_cfg["target"]["ip"])
401         context_cfg['target']['ipaddr'] = ','.join(ip_list)
402
403     if "nodes" in scenario_cfg:
404         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
405     runner = base_runner.Runner.get(runner_cfg)
406
407     print "Starting runner of type '%s'" % runner_cfg["type"]
408     runner.run(scenario_cfg, context_cfg)
409
410     return runner
411
412
413 def parse_nodes_with_context(scenario_cfg):
414     '''paras the 'nodes' fields in scenario '''
415     nodes = scenario_cfg["nodes"]
416
417     nodes_cfg = {}
418     for nodename in nodes:
419         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
420
421     return nodes_cfg
422
423
424 def runner_join(runner):
425     '''join (wait for) a runner, exit process at runner failure'''
426     status = runner.join()
427     base_runner.Runner.release(runner)
428     if status != 0:
429         sys.exit("Runner failed")
430
431
432 def print_invalid_header(source_name, args):
433     print(("Invalid %(source)s passed:\n\n %(args)s\n")
434           % {"source": source_name, "args": args})
435
436
437 def parse_task_args(src_name, args):
438     try:
439         kw = args and yaml.safe_load(args)
440         kw = {} if kw is None else kw
441     except yaml.parser.ParserError as e:
442         print_invalid_header(src_name, args)
443         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
444               % {"source": src_name, "err": e})
445         raise TypeError()
446
447     if not isinstance(kw, dict):
448         print_invalid_header(src_name, args)
449         print(("%(src)s had to be dict, actually %(src_type)s\n")
450               % {"src": src_name, "src_type": type(kw)})
451         raise TypeError()
452     return kw