Add Dummy context and scenario type
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 from itertools import ifilter
20 from yardstick.benchmark.contexts.base import Context
21 from yardstick.benchmark.runners import base as base_runner
22 from yardstick.common.task_template import TaskTemplate
23 from yardstick.common.utils import cliargs
24
25 output_file_default = "/tmp/yardstick.out"
26 test_cases_dir_default = "tests/opnfv/test_cases/"
27 LOG = logging.getLogger(__name__)
28
29
30 class TaskCommands(object):
31     '''Task commands.
32
33        Set of commands to manage benchmark tasks.
34     '''
35
36     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
37     @cliargs("--task-args", dest="task_args",
38              help="Input task args (dict in json). These args are used"
39              "to render input task that is jinja2 template.")
40     @cliargs("--task-args-file", dest="task_args_file",
41              help="Path to the file with input task args (dict in "
42              "json/yaml). These args are used to render input"
43              "task that is jinja2 template.")
44     @cliargs("--keep-deploy", help="keep context deployed in cloud",
45              action="store_true")
46     @cliargs("--parse-only", help="parse the config file and exit",
47              action="store_true")
48     @cliargs("--output-file", help="file where output is stored, default %s" %
49              output_file_default, default=output_file_default)
50     @cliargs("--suite", help="process test suite file instead of a task file",
51              action="store_true")
52     def do_start(self, args):
53         '''Start a benchmark scenario.'''
54
55         atexit.register(atexit_handler)
56
57         total_start_time = time.time()
58         parser = TaskParser(args.inputfile[0])
59
60         suite_params = {}
61         if args.suite:
62             suite_params = parser.parse_suite()
63             test_cases_dir = suite_params["test_cases_dir"]
64             if test_cases_dir[-1] != os.sep:
65                 test_cases_dir += os.sep
66             task_files = [test_cases_dir + task
67                           for task in suite_params["task_fnames"]]
68         else:
69             task_files = [parser.path]
70
71         task_args = suite_params.get("task_args", [args.task_args])
72         task_args_fnames = suite_params.get("task_args_fnames",
73                                             [args.task_args_file])
74
75         if args.parse_only:
76             sys.exit(0)
77
78         if os.path.isfile(args.output_file):
79             os.remove(args.output_file)
80
81         for i in range(0, len(task_files)):
82             one_task_start_time = time.time()
83             parser.path = task_files[i]
84             scenarios, run_in_parallel = parser.parse_task(task_args[i],
85                                                            task_args_fnames[i])
86
87             self._run(scenarios, run_in_parallel, args.output_file)
88
89             if args.keep_deploy:
90                 # keep deployment, forget about stack
91                 # (hide it for exit handler)
92                 Context.list = []
93             else:
94                 for context in Context.list:
95                     context.undeploy()
96                 Context.list = []
97             one_task_end_time = time.time()
98             LOG.info("task %s finished in %d secs", task_files[i],
99                      one_task_end_time - one_task_start_time)
100
101         total_end_time = time.time()
102         LOG.info("total finished in %d secs",
103                  total_end_time - total_start_time)
104
105         print "Done, exiting"
106
107     def _run(self, scenarios, run_in_parallel, output_file):
108         '''Deploys context and calls runners'''
109         for context in Context.list:
110             context.deploy()
111
112         background_runners = []
113
114         # Start all background scenarios
115         for scenario in ifilter(_is_background_scenario, scenarios):
116             scenario["runner"] = dict(type="Duration", duration=1000000000)
117             runner = run_one_scenario(scenario, output_file)
118             background_runners.append(runner)
119
120         runners = []
121         if run_in_parallel:
122             for scenario in scenarios:
123                 if not _is_background_scenario(scenario):
124                     runner = run_one_scenario(scenario, output_file)
125                     runners.append(runner)
126
127             # Wait for runners to finish
128             for runner in runners:
129                 runner_join(runner)
130                 print "Runner ended, output in", output_file
131         else:
132             # run serially
133             for scenario in scenarios:
134                 if not _is_background_scenario(scenario):
135                     runner = run_one_scenario(scenario, output_file)
136                     runner_join(runner)
137                     print "Runner ended, output in", output_file
138
139         # Abort background runners
140         for runner in background_runners:
141             runner.abort()
142
143         # Wait for background runners to finish
144         for runner in background_runners:
145             if runner.join(timeout=60) is None:
146                 # Nuke if it did not stop nicely
147                 base_runner.Runner.terminate(runner)
148                 runner_join(runner)
149             else:
150                 base_runner.Runner.release(runner)
151             print "Background task ended"
152
153
154 # TODO: Move stuff below into TaskCommands class !?
155
156
157 class TaskParser(object):
158     '''Parser for task config files in yaml format'''
159     def __init__(self, path):
160         self.path = path
161
162     def parse_suite(self):
163         '''parse the suite file and return a list of task config file paths
164            and lists of optional parameters if present'''
165         print "Parsing suite file:", self.path
166
167         try:
168             with open(self.path) as stream:
169                 cfg = yaml.load(stream)
170         except IOError as ioerror:
171             sys.exit(ioerror)
172
173         self._check_schema(cfg["schema"], "suite")
174         print "Starting suite:", cfg["name"]
175
176         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
177         task_fnames = []
178         task_args = []
179         task_args_fnames = []
180
181         for task in cfg["test_cases"]:
182             task_fnames.append(task["file_name"])
183             if "task_args" in task:
184                 task_args.append(task["task_args"])
185             else:
186                 task_args.append(None)
187
188             if "task_args_file" in task:
189                 task_args_fnames.append(task["task_args_file"])
190             else:
191                 task_args_fnames.append(None)
192
193         suite_params = {
194             "test_cases_dir": test_cases_dir,
195             "task_fnames": task_fnames,
196             "task_args": task_args,
197             "task_args_fnames": task_args_fnames
198         }
199
200         return suite_params
201
202     def parse_task(self, task_args=None, task_args_file=None):
203         '''parses the task file and return an context and scenario instances'''
204         print "Parsing task config:", self.path
205
206         try:
207             kw = {}
208             if task_args_file:
209                 with open(task_args_file) as f:
210                     kw.update(parse_task_args("task_args_file", f.read()))
211             kw.update(parse_task_args("task_args", task_args))
212         except TypeError:
213             raise TypeError()
214
215         try:
216             with open(self.path) as f:
217                 try:
218                     input_task = f.read()
219                     rendered_task = TaskTemplate.render(input_task, **kw)
220                 except Exception as e:
221                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
222                           % {"task": input_task, "err": e})
223                     raise e
224                 print(("Input task is:\n%s\n") % rendered_task)
225
226                 cfg = yaml.load(rendered_task)
227         except IOError as ioerror:
228             sys.exit(ioerror)
229
230         self._check_schema(cfg["schema"], "task")
231
232         # TODO: support one or many contexts? Many would simpler and precise
233         # TODO: support hybrid context type
234         if "context" in cfg:
235             context_cfgs = [cfg["context"]]
236         elif "contexts" in cfg:
237             context_cfgs = cfg["contexts"]
238         else:
239             context_cfgs = [{"type": "Dummy"}]
240
241         for cfg_attrs in context_cfgs:
242             context_type = cfg_attrs.get("type", "Heat")
243             if "Heat" == context_type and "networks" in cfg_attrs:
244                 # config external_network based on env var
245                 for _, attrs in cfg_attrs["networks"].items():
246                     attrs["external_network"] = os.environ.get(
247                         'EXTERNAL_NETWORK', 'net04_ext')
248             context = Context.get(context_type)
249             context.init(cfg_attrs)
250
251         run_in_parallel = cfg.get("run_in_parallel", False)
252
253         # TODO we need something better here, a class that represent the file
254         return cfg["scenarios"], run_in_parallel
255
256     def _check_schema(self, cfg_schema, schema_type):
257         '''Check if config file is using the correct schema type'''
258
259         if cfg_schema != "yardstick:" + schema_type + ":0.1":
260             sys.exit("error: file %s has unknown schema %s" % (self.path,
261                                                                cfg_schema))
262
263
264 def atexit_handler():
265     '''handler for process termination'''
266     base_runner.Runner.terminate_all()
267
268     if len(Context.list) > 0:
269         print "Undeploying all contexts"
270         for context in Context.list:
271             context.undeploy()
272
273
274 def is_ip_addr(addr):
275     '''check if string addr is an IP address'''
276     try:
277         ipaddress.ip_address(unicode(addr))
278         return True
279     except ValueError:
280         return False
281
282
283 def _is_same_heat_context(host_attr, target_attr):
284     '''check if two servers are in the same heat context
285     host_attr: either a name for a server created by yardstick or a dict
286     with attribute name mapping when using external heat templates
287     target_attr: either a name for a server created by yardstick or a dict
288     with attribute name mapping when using external heat templates
289     '''
290     host = None
291     target = None
292     for context in Context.list:
293         if context.__context_type__ != "Heat":
294             continue
295
296         host = context._get_server(host_attr)
297         if host is None:
298             continue
299
300         target = context._get_server(target_attr)
301         if target is None:
302             return False
303
304         # Both host and target is not None, then they are in the
305         # same heat context.
306         return True
307
308     return False
309
310
311 def _is_background_scenario(scenario):
312     if "run_in_background" in scenario:
313         return scenario["run_in_background"]
314     else:
315         return False
316
317
318 def run_one_scenario(scenario_cfg, output_file):
319     '''run one scenario using context'''
320     runner_cfg = scenario_cfg["runner"]
321     runner_cfg['output_filename'] = output_file
322
323     # TODO support get multi hosts/vms info
324     context_cfg = {}
325     if "host" in scenario_cfg:
326         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
327
328     if "target" in scenario_cfg:
329         if is_ip_addr(scenario_cfg["target"]):
330             context_cfg['target'] = {}
331             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
332         else:
333             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
334             if _is_same_heat_context(scenario_cfg["host"],
335                                      scenario_cfg["target"]):
336                 context_cfg["target"]["ipaddr"] = \
337                     context_cfg["target"]["private_ip"]
338             else:
339                 context_cfg["target"]["ipaddr"] = \
340                     context_cfg["target"]["ip"]
341
342     if "nodes" in scenario_cfg:
343         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
344     runner = base_runner.Runner.get(runner_cfg)
345
346     print "Starting runner of type '%s'" % runner_cfg["type"]
347     runner.run(scenario_cfg, context_cfg)
348
349     return runner
350
351
352 def parse_nodes_with_context(scenario_cfg):
353     '''paras the 'nodes' fields in scenario '''
354     nodes = scenario_cfg["nodes"]
355
356     nodes_cfg = {}
357     for nodename in nodes:
358         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
359
360     return nodes_cfg
361
362
363 def runner_join(runner):
364     '''join (wait for) a runner, exit process at runner failure'''
365     status = runner.join()
366     base_runner.Runner.release(runner)
367     if status != 0:
368         sys.exit("Runner failed")
369
370
371 def print_invalid_header(source_name, args):
372     print(("Invalid %(source)s passed:\n\n %(args)s\n")
373           % {"source": source_name, "args": args})
374
375
376 def parse_task_args(src_name, args):
377     try:
378         kw = args and yaml.safe_load(args)
379         kw = {} if kw is None else kw
380     except yaml.parser.ParserError as e:
381         print_invalid_header(src_name, args)
382         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
383               % {"source": src_name, "err": e})
384         raise TypeError()
385
386     if not isinstance(kw, dict):
387         print_invalid_header(src_name, args)
388         print(("%(src)s had to be dict, actually %(src_type)s\n")
389               % {"src": src_name, "src_type": type(kw)})
390         raise TypeError()
391     return kw