a56824aac53721347deafdafbcc24ea917de5d90
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 from itertools import ifilter
20 from yardstick.benchmark.contexts.base import Context
21 from yardstick.benchmark.runners import base as base_runner
22 from yardstick.common.task_template import TaskTemplate
23 from yardstick.common.utils import cliargs
24
25 output_file_default = "/tmp/yardstick.out"
26 test_cases_dir_default = "tests/opnfv/test_cases/"
27 LOG = logging.getLogger(__name__)
28
29
30 class TaskCommands(object):
31     '''Task commands.
32
33        Set of commands to manage benchmark tasks.
34     '''
35
36     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
37     @cliargs("--task-args", dest="task_args",
38              help="Input task args (dict in json). These args are used"
39              "to render input task that is jinja2 template.")
40     @cliargs("--task-args-file", dest="task_args_file",
41              help="Path to the file with input task args (dict in "
42              "json/yaml). These args are used to render input"
43              "task that is jinja2 template.")
44     @cliargs("--keep-deploy", help="keep context deployed in cloud",
45              action="store_true")
46     @cliargs("--parse-only", help="parse the config file and exit",
47              action="store_true")
48     @cliargs("--output-file", help="file where output is stored, default %s" %
49              output_file_default, default=output_file_default)
50     @cliargs("--suite", help="process test suite file instead of a task file",
51              action="store_true")
52     def do_start(self, args):
53         '''Start a benchmark scenario.'''
54
55         atexit.register(atexit_handler)
56
57         total_start_time = time.time()
58         parser = TaskParser(args.inputfile[0])
59
60         suite_params = {}
61         if args.suite:
62             suite_params = parser.parse_suite()
63             test_cases_dir = suite_params["test_cases_dir"]
64             if test_cases_dir[-1] != os.sep:
65                 test_cases_dir += os.sep
66             task_files = [test_cases_dir + task
67                           for task in suite_params["task_fnames"]]
68         else:
69             task_files = [parser.path]
70
71         task_args = suite_params.get("task_args", [args.task_args])
72         task_args_fnames = suite_params.get("task_args_fnames",
73                                             [args.task_args_file])
74
75         if args.parse_only:
76             sys.exit(0)
77
78         if os.path.isfile(args.output_file):
79             os.remove(args.output_file)
80
81         for i in range(0, len(task_files)):
82             one_task_start_time = time.time()
83             parser.path = task_files[i]
84             scenarios, run_in_parallel = parser.parse_task(task_args[i],
85                                                            task_args_fnames[i])
86
87             self._run(scenarios, run_in_parallel, args.output_file)
88
89             if args.keep_deploy:
90                 # keep deployment, forget about stack
91                 # (hide it for exit handler)
92                 Context.list = []
93             else:
94                 for context in Context.list:
95                     context.undeploy()
96                 Context.list = []
97             one_task_end_time = time.time()
98             LOG.info("task %s finished in %d secs", task_files[i],
99                      one_task_end_time - one_task_start_time)
100
101         total_end_time = time.time()
102         LOG.info("total finished in %d secs",
103                  total_end_time - total_start_time)
104
105         print "Done, exiting"
106
107     def _run(self, scenarios, run_in_parallel, output_file):
108         '''Deploys context and calls runners'''
109         for context in Context.list:
110             context.deploy()
111
112         background_runners = []
113
114         # Start all background scenarios
115         for scenario in ifilter(_is_background_scenario, scenarios):
116             scenario["runner"] = dict(type="Duration", duration=1000000000)
117             runner = run_one_scenario(scenario, output_file)
118             background_runners.append(runner)
119
120         runners = []
121         if run_in_parallel:
122             for scenario in scenarios:
123                 if not _is_background_scenario(scenario):
124                     runner = run_one_scenario(scenario, output_file)
125                     runners.append(runner)
126
127             # Wait for runners to finish
128             for runner in runners:
129                 runner_join(runner)
130                 print "Runner ended, output in", output_file
131         else:
132             # run serially
133             for scenario in scenarios:
134                 if not _is_background_scenario(scenario):
135                     runner = run_one_scenario(scenario, output_file)
136                     runner_join(runner)
137                     print "Runner ended, output in", output_file
138
139         # Abort background runners
140         for runner in background_runners:
141             runner.abort()
142
143         # Wait for background runners to finish
144         for runner in background_runners:
145             if runner.join(timeout=60) is None:
146                 # Nuke if it did not stop nicely
147                 base_runner.Runner.terminate(runner)
148                 runner_join(runner)
149             else:
150                 base_runner.Runner.release(runner)
151             print "Background task ended"
152
153
154 # TODO: Move stuff below into TaskCommands class !?
155
156
157 class TaskParser(object):
158     '''Parser for task config files in yaml format'''
159     def __init__(self, path):
160         self.path = path
161
162     def parse_suite(self):
163         '''parse the suite file and return a list of task config file paths
164            and lists of optional parameters if present'''
165         print "Parsing suite file:", self.path
166
167         try:
168             with open(self.path) as stream:
169                 cfg = yaml.load(stream)
170         except IOError as ioerror:
171             sys.exit(ioerror)
172
173         self._check_schema(cfg["schema"], "suite")
174         print "Starting suite:", cfg["name"]
175
176         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
177         task_fnames = []
178         task_args = []
179         task_args_fnames = []
180
181         for task in cfg["test_cases"]:
182             task_fnames.append(task["file_name"])
183             if "task_args" in task:
184                 task_args.append(task["task_args"])
185             else:
186                 task_args.append(None)
187
188             if "task_args_file" in task:
189                 task_args_fnames.append(task["task_args_file"])
190             else:
191                 task_args_fnames.append(None)
192
193         suite_params = {
194             "test_cases_dir": test_cases_dir,
195             "task_fnames": task_fnames,
196             "task_args": task_args,
197             "task_args_fnames": task_args_fnames
198         }
199
200         return suite_params
201
202     def parse_task(self, task_args=None, task_args_file=None):
203         '''parses the task file and return an context and scenario instances'''
204         print "Parsing task config:", self.path
205
206         try:
207             kw = {}
208             if task_args_file:
209                 with open(task_args_file) as f:
210                     kw.update(parse_task_args("task_args_file", f.read()))
211             kw.update(parse_task_args("task_args", task_args))
212         except TypeError:
213             raise TypeError()
214
215         try:
216             with open(self.path) as f:
217                 try:
218                     input_task = f.read()
219                     rendered_task = TaskTemplate.render(input_task, **kw)
220                 except Exception as e:
221                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
222                           % {"task": input_task, "err": e})
223                     raise e
224                 print(("Input task is:\n%s\n") % rendered_task)
225
226                 cfg = yaml.load(rendered_task)
227         except IOError as ioerror:
228             sys.exit(ioerror)
229
230         self._check_schema(cfg["schema"], "task")
231
232         # TODO: support one or many contexts? Many would simpler and precise
233         # TODO: support hybrid context type
234         if "context" in cfg:
235             context_cfgs = [cfg["context"]]
236         else:
237             context_cfgs = cfg["contexts"]
238
239         for cfg_attrs in context_cfgs:
240             context_type = cfg_attrs.get("type", "Heat")
241             if "Heat" == context_type and "networks" in cfg_attrs:
242                 # config external_network based on env var
243                 for _, attrs in cfg_attrs["networks"].items():
244                     attrs["external_network"] = os.environ.get(
245                         'EXTERNAL_NETWORK', 'net04_ext')
246             context = Context.get(context_type)
247             context.init(cfg_attrs)
248
249         run_in_parallel = cfg.get("run_in_parallel", False)
250
251         # TODO we need something better here, a class that represent the file
252         return cfg["scenarios"], run_in_parallel
253
254     def _check_schema(self, cfg_schema, schema_type):
255         '''Check if config file is using the correct schema type'''
256
257         if cfg_schema != "yardstick:" + schema_type + ":0.1":
258             sys.exit("error: file %s has unknown schema %s" % (self.path,
259                                                                cfg_schema))
260
261
262 def atexit_handler():
263     '''handler for process termination'''
264     base_runner.Runner.terminate_all()
265
266     if len(Context.list) > 0:
267         print "Undeploying all contexts"
268         for context in Context.list:
269             context.undeploy()
270
271
272 def is_ip_addr(addr):
273     '''check if string addr is an IP address'''
274     try:
275         ipaddress.ip_address(unicode(addr))
276         return True
277     except ValueError:
278         return False
279
280
281 def _is_same_heat_context(host_attr, target_attr):
282     '''check if two servers are in the same heat context
283     host_attr: either a name for a server created by yardstick or a dict
284     with attribute name mapping when using external heat templates
285     target_attr: either a name for a server created by yardstick or a dict
286     with attribute name mapping when using external heat templates
287     '''
288     host = None
289     target = None
290     for context in Context.list:
291         if context.__context_type__ != "Heat":
292             continue
293
294         host = context._get_server(host_attr)
295         if host is None:
296             continue
297
298         target = context._get_server(target_attr)
299         if target is None:
300             return False
301
302         # Both host and target is not None, then they are in the
303         # same heat context.
304         return True
305
306     return False
307
308
309 def _is_background_scenario(scenario):
310     if "run_in_background" in scenario:
311         return scenario["run_in_background"]
312     else:
313         return False
314
315
316 def run_one_scenario(scenario_cfg, output_file):
317     '''run one scenario using context'''
318     runner_cfg = scenario_cfg["runner"]
319     runner_cfg['output_filename'] = output_file
320
321     # TODO support get multi hosts/vms info
322     context_cfg = {}
323     if "host" in scenario_cfg:
324         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
325
326     if "target" in scenario_cfg:
327         if is_ip_addr(scenario_cfg["target"]):
328             context_cfg['target'] = {}
329             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
330         else:
331             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
332             if _is_same_heat_context(scenario_cfg["host"],
333                                      scenario_cfg["target"]):
334                 context_cfg["target"]["ipaddr"] = \
335                     context_cfg["target"]["private_ip"]
336             else:
337                 context_cfg["target"]["ipaddr"] = \
338                     context_cfg["target"]["ip"]
339
340     if "nodes" in scenario_cfg:
341         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
342     runner = base_runner.Runner.get(runner_cfg)
343
344     print "Starting runner of type '%s'" % runner_cfg["type"]
345     runner.run(scenario_cfg, context_cfg)
346
347     return runner
348
349
350 def parse_nodes_with_context(scenario_cfg):
351     '''paras the 'nodes' fields in scenario '''
352     nodes = scenario_cfg["nodes"]
353
354     nodes_cfg = {}
355     for nodename in nodes:
356         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
357
358     return nodes_cfg
359
360
361 def runner_join(runner):
362     '''join (wait for) a runner, exit process at runner failure'''
363     status = runner.join()
364     base_runner.Runner.release(runner)
365     if status != 0:
366         sys.exit("Runner failed")
367
368
369 def print_invalid_header(source_name, args):
370     print(("Invalid %(source)s passed:\n\n %(args)s\n")
371           % {"source": source_name, "args": args})
372
373
374 def parse_task_args(src_name, args):
375     try:
376         kw = args and yaml.safe_load(args)
377         kw = {} if kw is None else kw
378     except yaml.parser.ParserError as e:
379         print_invalid_header(src_name, args)
380         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
381               % {"source": src_name, "err": e})
382         raise TypeError()
383
384     if not isinstance(kw, dict):
385         print_invalid_header(src_name, args)
386         print(("%(src)s had to be dict, actually %(src_type)s\n")
387               % {"src": src_name, "src_type": type(kw)})
388         raise TypeError()
389     return kw