Heat context code refactor part 2
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17
18 from yardstick.benchmark.contexts.base import Context
19 from yardstick.benchmark.runners import base as base_runner
20 from yardstick.common.task_template import TaskTemplate
21 from yardstick.common.utils import cliargs
22
23 output_file_default = "/tmp/yardstick.out"
24 test_cases_dir_default = "tests/opnfv/test_cases/"
25
26
27 class TaskCommands(object):
28     '''Task commands.
29
30        Set of commands to manage benchmark tasks.
31     '''
32
33     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
34     @cliargs("--task-args", dest="task_args",
35              help="Input task args (dict in json). These args are used"
36              "to render input task that is jinja2 template.")
37     @cliargs("--task-args-file", dest="task_args_file",
38              help="Path to the file with input task args (dict in "
39              "json/yaml). These args are used to render input"
40              "task that is jinja2 template.")
41     @cliargs("--keep-deploy", help="keep context deployed in cloud",
42              action="store_true")
43     @cliargs("--parse-only", help="parse the config file and exit",
44              action="store_true")
45     @cliargs("--output-file", help="file where output is stored, default %s" %
46              output_file_default, default=output_file_default)
47     @cliargs("--suite", help="process test suite file instead of a task file",
48              action="store_true")
49     def do_start(self, args):
50         '''Start a benchmark scenario.'''
51
52         atexit.register(atexit_handler)
53
54         parser = TaskParser(args.inputfile[0])
55
56         suite_params = {}
57         if args.suite:
58             suite_params = parser.parse_suite()
59             test_cases_dir = suite_params["test_cases_dir"]
60             if test_cases_dir[-1] != os.sep:
61                 test_cases_dir += os.sep
62             task_files = [test_cases_dir + task
63                           for task in suite_params["task_fnames"]]
64         else:
65             task_files = [parser.path]
66
67         task_args = suite_params.get("task_args", [args.task_args])
68         task_args_fnames = suite_params.get("task_args_fnames",
69                                             [args.task_args_file])
70
71         if args.parse_only:
72             sys.exit(0)
73
74         if os.path.isfile(args.output_file):
75             os.remove(args.output_file)
76
77         for i in range(0, len(task_files)):
78             parser.path = task_files[i]
79             scenarios, run_in_parallel = parser.parse_task(task_args[i],
80                                                            task_args_fnames[i])
81
82             self._run(scenarios, run_in_parallel, args.output_file)
83
84             if args.keep_deploy:
85                 # keep deployment, forget about stack
86                 # (hide it for exit handler)
87                 Context.list = []
88             else:
89                 for context in Context.list:
90                     context.undeploy()
91                 Context.list = []
92
93         print "Done, exiting"
94
95     def _run(self, scenarios, run_in_parallel, output_file):
96         '''Deploys context and calls runners'''
97         for context in Context.list:
98             context.deploy()
99
100         runners = []
101         if run_in_parallel:
102             for scenario in scenarios:
103                 runner = run_one_scenario(scenario, output_file)
104                 runners.append(runner)
105
106             # Wait for runners to finish
107             for runner in runners:
108                 runner_join(runner)
109                 print "Runner ended, output in", output_file
110         else:
111             # run serially
112             for scenario in scenarios:
113                 runner = run_one_scenario(scenario, output_file)
114                 runner_join(runner)
115                 print "Runner ended, output in", output_file
116
117 # TODO: Move stuff below into TaskCommands class !?
118
119
120 class TaskParser(object):
121     '''Parser for task config files in yaml format'''
122     def __init__(self, path):
123         self.path = path
124
125     def parse_suite(self):
126         '''parse the suite file and return a list of task config file paths
127            and lists of optional parameters if present'''
128         print "Parsing suite file:", self.path
129
130         try:
131             with open(self.path) as stream:
132                 cfg = yaml.load(stream)
133         except IOError as ioerror:
134             sys.exit(ioerror)
135
136         self._check_schema(cfg["schema"], "suite")
137         print "Starting suite:", cfg["name"]
138
139         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
140         task_fnames = []
141         task_args = []
142         task_args_fnames = []
143
144         for task in cfg["test_cases"]:
145             task_fnames.append(task["file_name"])
146             if "task_args" in task:
147                 task_args.append(task["task_args"])
148             else:
149                 task_args.append(None)
150
151             if "task_args_file" in task:
152                 task_args_fnames.append(task["task_args_file"])
153             else:
154                 task_args_fnames.append(None)
155
156         suite_params = {
157             "test_cases_dir": test_cases_dir,
158             "task_fnames": task_fnames,
159             "task_args": task_args,
160             "task_args_fnames": task_args_fnames
161         }
162
163         return suite_params
164
165     def parse_task(self, task_args=None, task_args_file=None):
166         '''parses the task file and return an context and scenario instances'''
167         print "Parsing task config:", self.path
168
169         try:
170             kw = {}
171             if task_args_file:
172                 with open(task_args_file) as f:
173                     kw.update(parse_task_args("task_args_file", f.read()))
174             kw.update(parse_task_args("task_args", task_args))
175         except TypeError:
176             raise TypeError()
177
178         try:
179             with open(self.path) as f:
180                 try:
181                     input_task = f.read()
182                     rendered_task = TaskTemplate.render(input_task, **kw)
183                 except Exception as e:
184                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
185                           % {"task": input_task, "err": e})
186                     raise e
187                 print(("Input task is:\n%s\n") % rendered_task)
188
189                 cfg = yaml.load(rendered_task)
190         except IOError as ioerror:
191             sys.exit(ioerror)
192
193         self._check_schema(cfg["schema"], "task")
194
195         # TODO: support one or many contexts? Many would simpler and precise
196         # TODO: support hybrid context type
197         if "context" in cfg:
198             context_cfgs = [cfg["context"]]
199         else:
200             context_cfgs = cfg["contexts"]
201
202         for cfg_attrs in context_cfgs:
203             context_type = cfg_attrs.get("type", "Heat")
204             if "Heat" == context_type and "networks" in cfg_attrs:
205                 # config external_network based on env var
206                 for _, attrs in cfg_attrs["networks"].items():
207                     attrs["external_network"] = os.environ.get(
208                         'EXTERNAL_NETWORK', 'net04_ext')
209             context = Context.get(context_type)
210             context.init(cfg_attrs)
211
212         run_in_parallel = cfg.get("run_in_parallel", False)
213
214         # TODO we need something better here, a class that represent the file
215         return cfg["scenarios"], run_in_parallel
216
217     def _check_schema(self, cfg_schema, schema_type):
218         '''Check if config file is using the correct schema type'''
219
220         if cfg_schema != "yardstick:" + schema_type + ":0.1":
221             sys.exit("error: file %s has unknown schema %s" % (self.path,
222                                                                cfg_schema))
223
224
225 def atexit_handler():
226     '''handler for process termination'''
227     base_runner.Runner.terminate_all()
228
229     if len(Context.list) > 0:
230         print "Undeploying all contexts"
231         for context in Context.list:
232             context.undeploy()
233
234
235 def is_ip_addr(addr):
236     '''check if string addr is an IP address'''
237     try:
238         ipaddress.ip_address(unicode(addr))
239         return True
240     except ValueError:
241         return False
242
243
244 def _is_same_heat_context(host_attr, target_attr):
245     '''check if two servers are in the same heat context
246     host_attr: either a name for a server created by yardstick or a dict
247     with attribute name mapping when using external heat templates
248     target_attr: either a name for a server created by yardstick or a dict
249     with attribute name mapping when using external heat templates
250     '''
251     host = None
252     target = None
253     for context in Context.list:
254         if context.__context_type__ != "Heat":
255             continue
256
257         host = context._get_server(host_attr)
258         if host is None:
259             continue
260
261         target = context._get_server(target_attr)
262         if target is None:
263             return False
264
265         # Both host and target is not None, then they are in the
266         # same heat context.
267         return True
268
269     return False
270
271
272 def run_one_scenario(scenario_cfg, output_file):
273     '''run one scenario using context'''
274     runner_cfg = scenario_cfg["runner"]
275     runner_cfg['output_filename'] = output_file
276
277     # TODO support get multi hosts/vms info
278     context_cfg = {}
279     context_cfg['host'] = Context.get_server(scenario_cfg["host"])
280
281     if "target" in scenario_cfg:
282         if is_ip_addr(scenario_cfg["target"]):
283             context_cfg['target'] = {}
284             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
285         else:
286             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
287             if _is_same_heat_context(scenario_cfg["host"],
288                                      scenario_cfg["target"]):
289                 context_cfg["target"]["ipaddr"] = \
290                     context_cfg["target"]["private_ip"]
291             else:
292                 context_cfg["target"]["ipaddr"] = \
293                     context_cfg["target"]["ip"]
294
295     runner = base_runner.Runner.get(runner_cfg)
296
297     print "Starting runner of type '%s'" % runner_cfg["type"]
298     runner.run(scenario_cfg, context_cfg)
299
300     return runner
301
302
303 def runner_join(runner):
304     '''join (wait for) a runner, exit process at runner failure'''
305     status = runner.join()
306     base_runner.Runner.release(runner)
307     if status != 0:
308         sys.exit("Runner failed")
309
310
311 def print_invalid_header(source_name, args):
312     print(("Invalid %(source)s passed:\n\n %(args)s\n")
313           % {"source": source_name, "args": args})
314
315
316 def parse_task_args(src_name, args):
317     try:
318         kw = args and yaml.safe_load(args)
319         kw = {} if kw is None else kw
320     except yaml.parser.ParserError as e:
321         print_invalid_header(src_name, args)
322         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
323               % {"source": src_name, "err": e})
324         raise TypeError()
325
326     if not isinstance(kw, dict):
327         print_invalid_header(src_name, args)
328         print(("%(src)s had to be dict, actually %(src_type)s\n")
329               % {"src": src_name, "src_type": type(kw)})
330         raise TypeError()
331     return kw