InfluxDB dispatcher add more tags
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 from itertools import ifilter
21
22 from yardstick.benchmark.contexts.base import Context
23 from yardstick.benchmark.runners import base as base_runner
24 from yardstick.common.task_template import TaskTemplate
25 from yardstick.common.utils import cliargs
26
27 output_file_default = "/tmp/yardstick.out"
28 test_cases_dir_default = "tests/opnfv/test_cases/"
29 LOG = logging.getLogger(__name__)
30
31
32 class TaskCommands(object):
33     '''Task commands.
34
35        Set of commands to manage benchmark tasks.
36     '''
37
38     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
39     @cliargs("--task-args", dest="task_args",
40              help="Input task args (dict in json). These args are used"
41              "to render input task that is jinja2 template.")
42     @cliargs("--task-args-file", dest="task_args_file",
43              help="Path to the file with input task args (dict in "
44              "json/yaml). These args are used to render input"
45              "task that is jinja2 template.")
46     @cliargs("--keep-deploy", help="keep context deployed in cloud",
47              action="store_true")
48     @cliargs("--parse-only", help="parse the config file and exit",
49              action="store_true")
50     @cliargs("--output-file", help="file where output is stored, default %s" %
51              output_file_default, default=output_file_default)
52     @cliargs("--suite", help="process test suite file instead of a task file",
53              action="store_true")
54     def do_start(self, args):
55         '''Start a benchmark scenario.'''
56
57         atexit.register(atexit_handler)
58
59         total_start_time = time.time()
60         parser = TaskParser(args.inputfile[0])
61
62         suite_params = {}
63         if args.suite:
64             suite_params = parser.parse_suite()
65             test_cases_dir = suite_params["test_cases_dir"]
66             if test_cases_dir[-1] != os.sep:
67                 test_cases_dir += os.sep
68             task_files = [test_cases_dir + task
69                           for task in suite_params["task_fnames"]]
70         else:
71             task_files = [parser.path]
72
73         task_args = suite_params.get("task_args", [args.task_args])
74         task_args_fnames = suite_params.get("task_args_fnames",
75                                             [args.task_args_file])
76
77         if args.parse_only:
78             sys.exit(0)
79
80         if os.path.isfile(args.output_file):
81             os.remove(args.output_file)
82
83         for i in range(0, len(task_files)):
84             one_task_start_time = time.time()
85             parser.path = task_files[i]
86             task_name = os.path.splitext(os.path.basename(task_files[i]))[0]
87             scenarios, run_in_parallel = parser.parse_task(task_name,
88                                                            task_args[i],
89                                                            task_args_fnames[i])
90
91             self._run(scenarios, run_in_parallel, args.output_file)
92
93             if args.keep_deploy:
94                 # keep deployment, forget about stack
95                 # (hide it for exit handler)
96                 Context.list = []
97             else:
98                 for context in Context.list:
99                     context.undeploy()
100                 Context.list = []
101             one_task_end_time = time.time()
102             LOG.info("task %s finished in %d secs", task_files[i],
103                      one_task_end_time - one_task_start_time)
104
105         total_end_time = time.time()
106         LOG.info("total finished in %d secs",
107                  total_end_time - total_start_time)
108
109         print "Done, exiting"
110
111     def _run(self, scenarios, run_in_parallel, output_file):
112         '''Deploys context and calls runners'''
113         for context in Context.list:
114             context.deploy()
115
116         background_runners = []
117
118         # Start all background scenarios
119         for scenario in ifilter(_is_background_scenario, scenarios):
120             scenario["runner"] = dict(type="Duration", duration=1000000000)
121             runner = run_one_scenario(scenario, output_file)
122             background_runners.append(runner)
123
124         runners = []
125         if run_in_parallel:
126             for scenario in scenarios:
127                 if not _is_background_scenario(scenario):
128                     runner = run_one_scenario(scenario, output_file)
129                     runners.append(runner)
130
131             # Wait for runners to finish
132             for runner in runners:
133                 runner_join(runner)
134                 print "Runner ended, output in", output_file
135         else:
136             # run serially
137             for scenario in scenarios:
138                 if not _is_background_scenario(scenario):
139                     runner = run_one_scenario(scenario, output_file)
140                     runner_join(runner)
141                     print "Runner ended, output in", output_file
142
143         # Abort background runners
144         for runner in background_runners:
145             runner.abort()
146
147         # Wait for background runners to finish
148         for runner in background_runners:
149             if runner.join(timeout=60) is None:
150                 # Nuke if it did not stop nicely
151                 base_runner.Runner.terminate(runner)
152                 runner_join(runner)
153             else:
154                 base_runner.Runner.release(runner)
155             print "Background task ended"
156
157
158 # TODO: Move stuff below into TaskCommands class !?
159
160
161 class TaskParser(object):
162     '''Parser for task config files in yaml format'''
163     def __init__(self, path):
164         self.path = path
165
166     def parse_suite(self):
167         '''parse the suite file and return a list of task config file paths
168            and lists of optional parameters if present'''
169         print "Parsing suite file:", self.path
170
171         try:
172             with open(self.path) as stream:
173                 cfg = yaml.load(stream)
174         except IOError as ioerror:
175             sys.exit(ioerror)
176
177         self._check_schema(cfg["schema"], "suite")
178         print "Starting suite:", cfg["name"]
179
180         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
181         task_fnames = []
182         task_args = []
183         task_args_fnames = []
184
185         for task in cfg["test_cases"]:
186             task_fnames.append(task["file_name"])
187             if "task_args" in task:
188                 task_args.append(task["task_args"])
189             else:
190                 task_args.append(None)
191
192             if "task_args_file" in task:
193                 task_args_fnames.append(task["task_args_file"])
194             else:
195                 task_args_fnames.append(None)
196
197         suite_params = {
198             "test_cases_dir": test_cases_dir,
199             "task_fnames": task_fnames,
200             "task_args": task_args,
201             "task_args_fnames": task_args_fnames
202         }
203
204         return suite_params
205
206     def parse_task(self, task_name, task_args=None, task_args_file=None):
207         '''parses the task file and return an context and scenario instances'''
208         print "Parsing task config:", self.path
209
210         try:
211             kw = {}
212             if task_args_file:
213                 with open(task_args_file) as f:
214                     kw.update(parse_task_args("task_args_file", f.read()))
215             kw.update(parse_task_args("task_args", task_args))
216         except TypeError:
217             raise TypeError()
218
219         try:
220             with open(self.path) as f:
221                 try:
222                     input_task = f.read()
223                     rendered_task = TaskTemplate.render(input_task, **kw)
224                 except Exception as e:
225                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
226                           % {"task": input_task, "err": e})
227                     raise e
228                 print(("Input task is:\n%s\n") % rendered_task)
229
230                 cfg = yaml.load(rendered_task)
231         except IOError as ioerror:
232             sys.exit(ioerror)
233
234         self._check_schema(cfg["schema"], "task")
235
236         # TODO: support one or many contexts? Many would simpler and precise
237         # TODO: support hybrid context type
238         if "context" in cfg:
239             context_cfgs = [cfg["context"]]
240         elif "contexts" in cfg:
241             context_cfgs = cfg["contexts"]
242         else:
243             context_cfgs = [{"type": "Dummy"}]
244
245         for cfg_attrs in context_cfgs:
246             context_type = cfg_attrs.get("type", "Heat")
247             if "Heat" == context_type and "networks" in cfg_attrs:
248                 # config external_network based on env var
249                 for _, attrs in cfg_attrs["networks"].items():
250                     attrs["external_network"] = os.environ.get(
251                         'EXTERNAL_NETWORK', 'net04_ext')
252             context = Context.get(context_type)
253             context.init(cfg_attrs)
254
255         run_in_parallel = cfg.get("run_in_parallel", False)
256
257         # add tc and task id for influxdb extended tags
258         task_id = str(uuid.uuid4())
259         for scenario in cfg["scenarios"]:
260             scenario["tc"] = task_name
261             scenario["task_id"] = task_id
262
263         # TODO we need something better here, a class that represent the file
264         return cfg["scenarios"], run_in_parallel
265
266     def _check_schema(self, cfg_schema, schema_type):
267         '''Check if config file is using the correct schema type'''
268
269         if cfg_schema != "yardstick:" + schema_type + ":0.1":
270             sys.exit("error: file %s has unknown schema %s" % (self.path,
271                                                                cfg_schema))
272
273
274 def atexit_handler():
275     '''handler for process termination'''
276     base_runner.Runner.terminate_all()
277
278     if len(Context.list) > 0:
279         print "Undeploying all contexts"
280         for context in Context.list:
281             context.undeploy()
282
283
284 def is_ip_addr(addr):
285     '''check if string addr is an IP address'''
286     try:
287         ipaddress.ip_address(unicode(addr))
288         return True
289     except ValueError:
290         return False
291
292
293 def _is_same_heat_context(host_attr, target_attr):
294     '''check if two servers are in the same heat context
295     host_attr: either a name for a server created by yardstick or a dict
296     with attribute name mapping when using external heat templates
297     target_attr: either a name for a server created by yardstick or a dict
298     with attribute name mapping when using external heat templates
299     '''
300     host = None
301     target = None
302     for context in Context.list:
303         if context.__context_type__ != "Heat":
304             continue
305
306         host = context._get_server(host_attr)
307         if host is None:
308             continue
309
310         target = context._get_server(target_attr)
311         if target is None:
312             return False
313
314         # Both host and target is not None, then they are in the
315         # same heat context.
316         return True
317
318     return False
319
320
321 def _is_background_scenario(scenario):
322     if "run_in_background" in scenario:
323         return scenario["run_in_background"]
324     else:
325         return False
326
327
328 def run_one_scenario(scenario_cfg, output_file):
329     '''run one scenario using context'''
330     runner_cfg = scenario_cfg["runner"]
331     runner_cfg['output_filename'] = output_file
332
333     # TODO support get multi hosts/vms info
334     context_cfg = {}
335     if "host" in scenario_cfg:
336         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
337
338     if "target" in scenario_cfg:
339         if is_ip_addr(scenario_cfg["target"]):
340             context_cfg['target'] = {}
341             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
342         else:
343             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
344             if _is_same_heat_context(scenario_cfg["host"],
345                                      scenario_cfg["target"]):
346                 context_cfg["target"]["ipaddr"] = \
347                     context_cfg["target"]["private_ip"]
348             else:
349                 context_cfg["target"]["ipaddr"] = \
350                     context_cfg["target"]["ip"]
351
352     if "nodes" in scenario_cfg:
353         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
354     runner = base_runner.Runner.get(runner_cfg)
355
356     print "Starting runner of type '%s'" % runner_cfg["type"]
357     runner.run(scenario_cfg, context_cfg)
358
359     return runner
360
361
362 def parse_nodes_with_context(scenario_cfg):
363     '''paras the 'nodes' fields in scenario '''
364     nodes = scenario_cfg["nodes"]
365
366     nodes_cfg = {}
367     for nodename in nodes:
368         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
369
370     return nodes_cfg
371
372
373 def runner_join(runner):
374     '''join (wait for) a runner, exit process at runner failure'''
375     status = runner.join()
376     base_runner.Runner.release(runner)
377     if status != 0:
378         sys.exit("Runner failed")
379
380
381 def print_invalid_header(source_name, args):
382     print(("Invalid %(source)s passed:\n\n %(args)s\n")
383           % {"source": source_name, "args": args})
384
385
386 def parse_task_args(src_name, args):
387     try:
388         kw = args and yaml.safe_load(args)
389         kw = {} if kw is None else kw
390     except yaml.parser.ParserError as e:
391         print_invalid_header(src_name, args)
392         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
393               % {"source": src_name, "err": e})
394         raise TypeError()
395
396     if not isinstance(kw, dict):
397         print_invalid_header(src_name, args)
398         print(("%(src)s had to be dict, actually %(src_type)s\n")
399               % {"src": src_name, "src_type": type(kw)})
400         raise TypeError()
401     return kw