Merge "SFC Yardstick test"
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 from itertools import ifilter
21
22 from yardstick.benchmark.contexts.base import Context
23 from yardstick.benchmark.runners import base as base_runner
24 from yardstick.common.task_template import TaskTemplate
25 from yardstick.common.utils import cliargs
26
27 output_file_default = "/tmp/yardstick.out"
28 test_cases_dir_default = "tests/opnfv/test_cases/"
29 LOG = logging.getLogger(__name__)
30
31
32 class TaskCommands(object):
33     '''Task commands.
34
35        Set of commands to manage benchmark tasks.
36     '''
37
38     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
39     @cliargs("--task-args", dest="task_args",
40              help="Input task args (dict in json). These args are used"
41              "to render input task that is jinja2 template.")
42     @cliargs("--task-args-file", dest="task_args_file",
43              help="Path to the file with input task args (dict in "
44              "json/yaml). These args are used to render input"
45              "task that is jinja2 template.")
46     @cliargs("--keep-deploy", help="keep context deployed in cloud",
47              action="store_true")
48     @cliargs("--parse-only", help="parse the config file and exit",
49              action="store_true")
50     @cliargs("--output-file", help="file where output is stored, default %s" %
51              output_file_default, default=output_file_default)
52     @cliargs("--suite", help="process test suite file instead of a task file",
53              action="store_true")
54     def do_start(self, args):
55         '''Start a benchmark scenario.'''
56
57         atexit.register(atexit_handler)
58
59         total_start_time = time.time()
60         parser = TaskParser(args.inputfile[0])
61
62         suite_params = {}
63         if args.suite:
64             suite_params = parser.parse_suite()
65             test_cases_dir = suite_params["test_cases_dir"]
66             if test_cases_dir[-1] != os.sep:
67                 test_cases_dir += os.sep
68             task_files = [test_cases_dir + task
69                           for task in suite_params["task_fnames"]]
70         else:
71             task_files = [parser.path]
72
73         task_args = suite_params.get("task_args", [args.task_args])
74         task_args_fnames = suite_params.get("task_args_fnames",
75                                             [args.task_args_file])
76
77         if args.parse_only:
78             sys.exit(0)
79
80         if os.path.isfile(args.output_file):
81             os.remove(args.output_file)
82
83         for i in range(0, len(task_files)):
84             one_task_start_time = time.time()
85             parser.path = task_files[i]
86             task_name = os.path.splitext(os.path.basename(task_files[i]))[0]
87             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
88                 task_name, task_args[i], task_args_fnames[i])
89
90             if not meet_precondition:
91                 LOG.info("meet_precondition is %s, please check envrionment",
92                          meet_precondition)
93                 continue
94
95             self._run(scenarios, run_in_parallel, args.output_file)
96
97             if args.keep_deploy:
98                 # keep deployment, forget about stack
99                 # (hide it for exit handler)
100                 Context.list = []
101             else:
102                 for context in Context.list:
103                     context.undeploy()
104                 Context.list = []
105             one_task_end_time = time.time()
106             LOG.info("task %s finished in %d secs", task_files[i],
107                      one_task_end_time - one_task_start_time)
108
109         total_end_time = time.time()
110         LOG.info("total finished in %d secs",
111                  total_end_time - total_start_time)
112
113         print "Done, exiting"
114
115     def _run(self, scenarios, run_in_parallel, output_file):
116         '''Deploys context and calls runners'''
117         for context in Context.list:
118             context.deploy()
119
120         background_runners = []
121
122         # Start all background scenarios
123         for scenario in ifilter(_is_background_scenario, scenarios):
124             scenario["runner"] = dict(type="Duration", duration=1000000000)
125             runner = run_one_scenario(scenario, output_file)
126             background_runners.append(runner)
127
128         runners = []
129         if run_in_parallel:
130             for scenario in scenarios:
131                 if not _is_background_scenario(scenario):
132                     runner = run_one_scenario(scenario, output_file)
133                     runners.append(runner)
134
135             # Wait for runners to finish
136             for runner in runners:
137                 runner_join(runner)
138                 print "Runner ended, output in", output_file
139         else:
140             # run serially
141             for scenario in scenarios:
142                 if not _is_background_scenario(scenario):
143                     runner = run_one_scenario(scenario, output_file)
144                     runner_join(runner)
145                     print "Runner ended, output in", output_file
146
147         # Abort background runners
148         for runner in background_runners:
149             runner.abort()
150
151         # Wait for background runners to finish
152         for runner in background_runners:
153             if runner.join(timeout=60) is None:
154                 # Nuke if it did not stop nicely
155                 base_runner.Runner.terminate(runner)
156                 runner_join(runner)
157             else:
158                 base_runner.Runner.release(runner)
159             print "Background task ended"
160
161
162 # TODO: Move stuff below into TaskCommands class !?
163
164
165 class TaskParser(object):
166     '''Parser for task config files in yaml format'''
167     def __init__(self, path):
168         self.path = path
169
170     def parse_suite(self):
171         '''parse the suite file and return a list of task config file paths
172            and lists of optional parameters if present'''
173         print "Parsing suite file:", self.path
174
175         try:
176             with open(self.path) as stream:
177                 cfg = yaml.load(stream)
178         except IOError as ioerror:
179             sys.exit(ioerror)
180
181         self._check_schema(cfg["schema"], "suite")
182         print "Starting suite:", cfg["name"]
183
184         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
185         task_fnames = []
186         task_args = []
187         task_args_fnames = []
188
189         for task in cfg["test_cases"]:
190             task_fnames.append(task["file_name"])
191             if "task_args" in task:
192                 task_args.append(task["task_args"])
193             else:
194                 task_args.append(None)
195
196             if "task_args_file" in task:
197                 task_args_fnames.append(task["task_args_file"])
198             else:
199                 task_args_fnames.append(None)
200
201         suite_params = {
202             "test_cases_dir": test_cases_dir,
203             "task_fnames": task_fnames,
204             "task_args": task_args,
205             "task_args_fnames": task_args_fnames
206         }
207
208         return suite_params
209
210     def parse_task(self, task_name, task_args=None, task_args_file=None):
211         '''parses the task file and return an context and scenario instances'''
212         print "Parsing task config:", self.path
213
214         try:
215             kw = {}
216             if task_args_file:
217                 with open(task_args_file) as f:
218                     kw.update(parse_task_args("task_args_file", f.read()))
219             kw.update(parse_task_args("task_args", task_args))
220         except TypeError:
221             raise TypeError()
222
223         try:
224             with open(self.path) as f:
225                 try:
226                     input_task = f.read()
227                     rendered_task = TaskTemplate.render(input_task, **kw)
228                 except Exception as e:
229                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
230                           % {"task": input_task, "err": e})
231                     raise e
232                 print(("Input task is:\n%s\n") % rendered_task)
233
234                 cfg = yaml.load(rendered_task)
235         except IOError as ioerror:
236             sys.exit(ioerror)
237
238         self._check_schema(cfg["schema"], "task")
239         meet_precondition = self._check_precondition(cfg)
240
241         # TODO: support one or many contexts? Many would simpler and precise
242         # TODO: support hybrid context type
243         if "context" in cfg:
244             context_cfgs = [cfg["context"]]
245         elif "contexts" in cfg:
246             context_cfgs = cfg["contexts"]
247         else:
248             context_cfgs = [{"type": "Dummy"}]
249
250         for cfg_attrs in context_cfgs:
251             context_type = cfg_attrs.get("type", "Heat")
252             if "Heat" == context_type and "networks" in cfg_attrs:
253                 # config external_network based on env var
254                 for _, attrs in cfg_attrs["networks"].items():
255                     attrs["external_network"] = os.environ.get(
256                         'EXTERNAL_NETWORK', 'net04_ext')
257             context = Context.get(context_type)
258             context.init(cfg_attrs)
259
260         run_in_parallel = cfg.get("run_in_parallel", False)
261
262         # add tc and task id for influxdb extended tags
263         task_id = str(uuid.uuid4())
264         for scenario in cfg["scenarios"]:
265             scenario["tc"] = task_name
266             scenario["task_id"] = task_id
267
268         # TODO we need something better here, a class that represent the file
269         return cfg["scenarios"], run_in_parallel, meet_precondition
270
271     def _check_schema(self, cfg_schema, schema_type):
272         '''Check if config file is using the correct schema type'''
273
274         if cfg_schema != "yardstick:" + schema_type + ":0.1":
275             sys.exit("error: file %s has unknown schema %s" % (self.path,
276                                                                cfg_schema))
277
278     def _check_precondition(self, cfg):
279         '''Check if the envrionment meet the preconditon'''
280
281         if "precondition" in cfg:
282             precondition = cfg["precondition"]
283             installer_type = precondition.get("installer_type", None)
284             deploy_scenarios = precondition.get("deploy_scenarios", None)
285             installer_type_env = os.environ.get('INSTALL_TYPE', None)
286             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
287             if installer_type and installer_type_env:
288                 if installer_type_env not in installer_type:
289                     return False
290             if deploy_scenarios and deploy_scenario_env:
291                 deploy_scenarios_list = deploy_scenarios.split(',')
292                 for deploy_scenario in deploy_scenarios_list:
293                     if deploy_scenario_env.startswith(deploy_scenario):
294                         return True
295                 return False
296         return True
297
298
299 def atexit_handler():
300     '''handler for process termination'''
301     base_runner.Runner.terminate_all()
302
303     if len(Context.list) > 0:
304         print "Undeploying all contexts"
305         for context in Context.list:
306             context.undeploy()
307
308
309 def is_ip_addr(addr):
310     '''check if string addr is an IP address'''
311     try:
312         ipaddress.ip_address(unicode(addr))
313         return True
314     except ValueError:
315         return False
316
317
318 def _is_same_heat_context(host_attr, target_attr):
319     '''check if two servers are in the same heat context
320     host_attr: either a name for a server created by yardstick or a dict
321     with attribute name mapping when using external heat templates
322     target_attr: either a name for a server created by yardstick or a dict
323     with attribute name mapping when using external heat templates
324     '''
325     host = None
326     target = None
327     for context in Context.list:
328         if context.__context_type__ != "Heat":
329             continue
330
331         host = context._get_server(host_attr)
332         if host is None:
333             continue
334
335         target = context._get_server(target_attr)
336         if target is None:
337             return False
338
339         # Both host and target is not None, then they are in the
340         # same heat context.
341         return True
342
343     return False
344
345
346 def _is_background_scenario(scenario):
347     if "run_in_background" in scenario:
348         return scenario["run_in_background"]
349     else:
350         return False
351
352
353 def run_one_scenario(scenario_cfg, output_file):
354     '''run one scenario using context'''
355     runner_cfg = scenario_cfg["runner"]
356     runner_cfg['output_filename'] = output_file
357
358     # TODO support get multi hosts/vms info
359     context_cfg = {}
360     if "host" in scenario_cfg:
361         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
362
363     if "target" in scenario_cfg:
364         if is_ip_addr(scenario_cfg["target"]):
365             context_cfg['target'] = {}
366             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
367         else:
368             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
369             if _is_same_heat_context(scenario_cfg["host"],
370                                      scenario_cfg["target"]):
371                 context_cfg["target"]["ipaddr"] = \
372                     context_cfg["target"]["private_ip"]
373             else:
374                 context_cfg["target"]["ipaddr"] = \
375                     context_cfg["target"]["ip"]
376
377     if "targets" in scenario_cfg:
378         ip_list = []
379         for target in scenario_cfg["targets"]:
380             if is_ip_addr(target):
381                 ip_list.append(target)
382                 context_cfg['target'] = {}
383             else:
384                 context_cfg['target'] = Context.get_server(target)
385                 if _is_same_heat_context(scenario_cfg["host"], target):
386                     ip_list.append(context_cfg["target"]["private_ip"])
387                 else:
388                     ip_list.append(context_cfg["target"]["ip"])
389         context_cfg['target']['ipaddr'] = ','.join(ip_list)
390
391     if "nodes" in scenario_cfg:
392         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
393     runner = base_runner.Runner.get(runner_cfg)
394
395     print "Starting runner of type '%s'" % runner_cfg["type"]
396     runner.run(scenario_cfg, context_cfg)
397
398     return runner
399
400
401 def parse_nodes_with_context(scenario_cfg):
402     '''paras the 'nodes' fields in scenario '''
403     nodes = scenario_cfg["nodes"]
404
405     nodes_cfg = {}
406     for nodename in nodes:
407         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
408
409     return nodes_cfg
410
411
412 def runner_join(runner):
413     '''join (wait for) a runner, exit process at runner failure'''
414     status = runner.join()
415     base_runner.Runner.release(runner)
416     if status != 0:
417         sys.exit("Runner failed")
418
419
420 def print_invalid_header(source_name, args):
421     print(("Invalid %(source)s passed:\n\n %(args)s\n")
422           % {"source": source_name, "args": args})
423
424
425 def parse_task_args(src_name, args):
426     try:
427         kw = args and yaml.safe_load(args)
428         kw = {} if kw is None else kw
429     except yaml.parser.ParserError as e:
430         print_invalid_header(src_name, args)
431         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
432               % {"source": src_name, "err": e})
433         raise TypeError()
434
435     if not isinstance(kw, dict):
436         print_invalid_header(src_name, args)
437         print(("%(src)s had to be dict, actually %(src_type)s\n")
438               % {"src": src_name, "src_type": type(kw)})
439         raise TypeError()
440     return kw