Create API and command to create a influxDB container
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 from itertools import ifilter
21
22 from yardstick.benchmark.contexts.base import Context
23 from yardstick.benchmark.runners import base as base_runner
24 from yardstick.common.task_template import TaskTemplate
25 from yardstick.common.utils import cliargs
26
27 output_file_default = "/tmp/yardstick.out"
28 test_cases_dir_default = "tests/opnfv/test_cases/"
29 LOG = logging.getLogger(__name__)
30
31
32 class TaskCommands(object):
33     '''Task commands.
34
35        Set of commands to manage benchmark tasks.
36     '''
37
38     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
39     @cliargs("--task-args", dest="task_args",
40              help="Input task args (dict in json). These args are used"
41              "to render input task that is jinja2 template.")
42     @cliargs("--task-args-file", dest="task_args_file",
43              help="Path to the file with input task args (dict in "
44              "json/yaml). These args are used to render input"
45              "task that is jinja2 template.")
46     @cliargs("--keep-deploy", help="keep context deployed in cloud",
47              action="store_true")
48     @cliargs("--parse-only", help="parse the config file and exit",
49              action="store_true")
50     @cliargs("--output-file", help="file where output is stored, default %s" %
51              output_file_default, default=output_file_default)
52     @cliargs("--suite", help="process test suite file instead of a task file",
53              action="store_true")
54     def do_start(self, args, **kwargs):
55         '''Start a benchmark scenario.'''
56
57         atexit.register(atexit_handler)
58
59         self.task_id = kwargs.get('task_id', str(uuid.uuid4()))
60
61         total_start_time = time.time()
62         parser = TaskParser(args.inputfile[0])
63
64         if args.suite:
65             # 1.parse suite, return suite_params info
66             task_files, task_args, task_args_fnames = \
67                 parser.parse_suite()
68         else:
69             task_files = [parser.path]
70             task_args = [args.task_args]
71             task_args_fnames = [args.task_args_file]
72
73         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
74                  task_files, task_args, task_args_fnames)
75
76         if args.parse_only:
77             sys.exit(0)
78
79         if os.path.isfile(args.output_file):
80             os.remove(args.output_file)
81         # parse task_files
82         for i in range(0, len(task_files)):
83             one_task_start_time = time.time()
84             parser.path = task_files[i]
85             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
86                  self.task_id, task_args[i], task_args_fnames[i])
87
88             if not meet_precondition:
89                 LOG.info("meet_precondition is %s, please check envrionment",
90                          meet_precondition)
91                 continue
92
93             self._run(scenarios, run_in_parallel, args.output_file)
94
95             if args.keep_deploy:
96                 # keep deployment, forget about stack
97                 # (hide it for exit handler)
98                 Context.list = []
99             else:
100                 for context in Context.list:
101                     context.undeploy()
102                 Context.list = []
103             one_task_end_time = time.time()
104             LOG.info("task %s finished in %d secs", task_files[i],
105                      one_task_end_time - one_task_start_time)
106
107         total_end_time = time.time()
108         LOG.info("total finished in %d secs",
109                  total_end_time - total_start_time)
110
111         print "Done, exiting"
112
113     def _run(self, scenarios, run_in_parallel, output_file):
114         '''Deploys context and calls runners'''
115         for context in Context.list:
116             context.deploy()
117
118         background_runners = []
119
120         # Start all background scenarios
121         for scenario in ifilter(_is_background_scenario, scenarios):
122             scenario["runner"] = dict(type="Duration", duration=1000000000)
123             runner = run_one_scenario(scenario, output_file)
124             background_runners.append(runner)
125
126         runners = []
127         if run_in_parallel:
128             for scenario in scenarios:
129                 if not _is_background_scenario(scenario):
130                     runner = run_one_scenario(scenario, output_file)
131                     runners.append(runner)
132
133             # Wait for runners to finish
134             for runner in runners:
135                 runner_join(runner)
136                 print "Runner ended, output in", output_file
137         else:
138             # run serially
139             for scenario in scenarios:
140                 if not _is_background_scenario(scenario):
141                     runner = run_one_scenario(scenario, output_file)
142                     runner_join(runner)
143                     print "Runner ended, output in", output_file
144
145         # Abort background runners
146         for runner in background_runners:
147             runner.abort()
148
149         # Wait for background runners to finish
150         for runner in background_runners:
151             if runner.join(timeout=60) is None:
152                 # Nuke if it did not stop nicely
153                 base_runner.Runner.terminate(runner)
154                 runner_join(runner)
155             else:
156                 base_runner.Runner.release(runner)
157             print "Background task ended"
158
159
160 # TODO: Move stuff below into TaskCommands class !?
161
162
163 class TaskParser(object):
164     '''Parser for task config files in yaml format'''
165     def __init__(self, path):
166         self.path = path
167
168     def _meet_constraint(self, task, cur_pod, cur_installer):
169         if "constraint" in task:
170             constraint = task.get('constraint', None)
171             if constraint is not None:
172                 tc_fit_pod = constraint.get('pod', None)
173                 tc_fit_installer = constraint.get('installer', None)
174                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
175                          cur_pod, cur_installer, constraint)
176                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
177                     return False
178                 if cur_installer and tc_fit_installer and \
179                         cur_installer not in tc_fit_installer:
180                     return False
181         return True
182
183     def _get_task_para(self, task, cur_pod):
184         task_args = task.get('task_args', None)
185         if task_args is not None:
186             task_args = task_args.get(cur_pod, None)
187         task_args_fnames = task.get('task_args_fnames', None)
188         if task_args_fnames is not None:
189             task_args_fnames = task_args_fnames.get(cur_pod, None)
190         return task_args, task_args_fnames
191
192     def parse_suite(self):
193         '''parse the suite file and return a list of task config file paths
194            and lists of optional parameters if present'''
195         LOG.info("\nParsing suite file:%s", self.path)
196
197         try:
198             with open(self.path) as stream:
199                 cfg = yaml.load(stream)
200         except IOError as ioerror:
201             sys.exit(ioerror)
202
203         self._check_schema(cfg["schema"], "suite")
204         LOG.info("\nStarting scenario:%s", cfg["name"])
205
206         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
207         if test_cases_dir[-1] != os.sep:
208             test_cases_dir += os.sep
209
210         cur_pod = os.environ.get('NODE_NAME', None)
211         cur_installer = os.environ.get('INSTALLER_TYPE', None)
212
213         valid_task_files = []
214         valid_task_args = []
215         valid_task_args_fnames = []
216
217         for task in cfg["test_cases"]:
218             # 1.check file_name
219             if "file_name" in task:
220                 task_fname = task.get('file_name', None)
221                 if task_fname is None:
222                     continue
223             else:
224                 continue
225             # 2.check constraint
226             if self._meet_constraint(task, cur_pod, cur_installer):
227                 valid_task_files.append(test_cases_dir + task_fname)
228             else:
229                 continue
230             # 3.fetch task parameters
231             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
232             valid_task_args.append(task_args)
233             valid_task_args_fnames.append(task_args_fnames)
234
235         return valid_task_files, valid_task_args, valid_task_args_fnames
236
237     def parse_task(self, task_id, task_args=None, task_args_file=None):
238         '''parses the task file and return an context and scenario instances'''
239         print "Parsing task config:", self.path
240
241         try:
242             kw = {}
243             if task_args_file:
244                 with open(task_args_file) as f:
245                     kw.update(parse_task_args("task_args_file", f.read()))
246             kw.update(parse_task_args("task_args", task_args))
247         except TypeError:
248             raise TypeError()
249
250         try:
251             with open(self.path) as f:
252                 try:
253                     input_task = f.read()
254                     rendered_task = TaskTemplate.render(input_task, **kw)
255                 except Exception as e:
256                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
257                           % {"task": input_task, "err": e})
258                     raise e
259                 print(("Input task is:\n%s\n") % rendered_task)
260
261                 cfg = yaml.load(rendered_task)
262         except IOError as ioerror:
263             sys.exit(ioerror)
264
265         self._check_schema(cfg["schema"], "task")
266         meet_precondition = self._check_precondition(cfg)
267
268         # TODO: support one or many contexts? Many would simpler and precise
269         # TODO: support hybrid context type
270         if "context" in cfg:
271             context_cfgs = [cfg["context"]]
272         elif "contexts" in cfg:
273             context_cfgs = cfg["contexts"]
274         else:
275             context_cfgs = [{"type": "Dummy"}]
276
277         for cfg_attrs in context_cfgs:
278             context_type = cfg_attrs.get("type", "Heat")
279             if "Heat" == context_type and "networks" in cfg_attrs:
280                 # bugfix: if there are more than one network,
281                 # only add "external_network" on first one.
282                 # the name of netwrok should follow this rule:
283                 # test, test2, test3 ...
284                 # sort network with the length of network's name
285                 sorted_networks = sorted(cfg_attrs["networks"].keys())
286                 # config external_network based on env var
287                 cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
288                     = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
289
290             context = Context.get(context_type)
291             context.init(cfg_attrs)
292
293         run_in_parallel = cfg.get("run_in_parallel", False)
294
295         # add tc and task id for influxdb extended tags
296         for scenario in cfg["scenarios"]:
297             task_name = os.path.splitext(os.path.basename(self.path))[0]
298             scenario["tc"] = task_name
299             scenario["task_id"] = task_id
300
301         # TODO we need something better here, a class that represent the file
302         return cfg["scenarios"], run_in_parallel, meet_precondition
303
304     def _check_schema(self, cfg_schema, schema_type):
305         '''Check if config file is using the correct schema type'''
306
307         if cfg_schema != "yardstick:" + schema_type + ":0.1":
308             sys.exit("error: file %s has unknown schema %s" % (self.path,
309                                                                cfg_schema))
310
311     def _check_precondition(self, cfg):
312         '''Check if the envrionment meet the preconditon'''
313
314         if "precondition" in cfg:
315             precondition = cfg["precondition"]
316             installer_type = precondition.get("installer_type", None)
317             deploy_scenarios = precondition.get("deploy_scenarios", None)
318             tc_fit_pods = precondition.get("pod_name", None)
319             installer_type_env = os.environ.get('INSTALL_TYPE', None)
320             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
321             pod_name_env = os.environ.get('NODE_NAME', None)
322
323             LOG.info("installer_type: %s, installer_type_env: %s",
324                      installer_type, installer_type_env)
325             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
326                      deploy_scenarios, deploy_scenario_env)
327             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
328                      tc_fit_pods, pod_name_env)
329             if installer_type and installer_type_env:
330                 if installer_type_env not in installer_type:
331                     return False
332             if deploy_scenarios and deploy_scenario_env:
333                 deploy_scenarios_list = deploy_scenarios.split(',')
334                 for deploy_scenario in deploy_scenarios_list:
335                     if deploy_scenario_env.startswith(deploy_scenario):
336                         return True
337                 return False
338             if tc_fit_pods and pod_name_env:
339                 if pod_name_env not in tc_fit_pods:
340                     return False
341         return True
342
343
344 def atexit_handler():
345     '''handler for process termination'''
346     base_runner.Runner.terminate_all()
347
348     if len(Context.list) > 0:
349         print "Undeploying all contexts"
350         for context in Context.list:
351             context.undeploy()
352
353
354 def is_ip_addr(addr):
355     '''check if string addr is an IP address'''
356     try:
357         ipaddress.ip_address(unicode(addr))
358         return True
359     except ValueError:
360         return False
361
362
363 def _is_same_heat_context(host_attr, target_attr):
364     '''check if two servers are in the same heat context
365     host_attr: either a name for a server created by yardstick or a dict
366     with attribute name mapping when using external heat templates
367     target_attr: either a name for a server created by yardstick or a dict
368     with attribute name mapping when using external heat templates
369     '''
370     host = None
371     target = None
372     for context in Context.list:
373         if context.__context_type__ != "Heat":
374             continue
375
376         host = context._get_server(host_attr)
377         if host is None:
378             continue
379
380         target = context._get_server(target_attr)
381         if target is None:
382             return False
383
384         # Both host and target is not None, then they are in the
385         # same heat context.
386         return True
387
388     return False
389
390
391 def _is_background_scenario(scenario):
392     if "run_in_background" in scenario:
393         return scenario["run_in_background"]
394     else:
395         return False
396
397
398 def run_one_scenario(scenario_cfg, output_file):
399     '''run one scenario using context'''
400     runner_cfg = scenario_cfg["runner"]
401     runner_cfg['output_filename'] = output_file
402
403     # TODO support get multi hosts/vms info
404     context_cfg = {}
405     if "host" in scenario_cfg:
406         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
407
408     if "target" in scenario_cfg:
409         if is_ip_addr(scenario_cfg["target"]):
410             context_cfg['target'] = {}
411             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
412         else:
413             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
414             if _is_same_heat_context(scenario_cfg["host"],
415                                      scenario_cfg["target"]):
416                 context_cfg["target"]["ipaddr"] = \
417                     context_cfg["target"]["private_ip"]
418             else:
419                 context_cfg["target"]["ipaddr"] = \
420                     context_cfg["target"]["ip"]
421
422     if "targets" in scenario_cfg:
423         ip_list = []
424         for target in scenario_cfg["targets"]:
425             if is_ip_addr(target):
426                 ip_list.append(target)
427                 context_cfg['target'] = {}
428             else:
429                 context_cfg['target'] = Context.get_server(target)
430                 if _is_same_heat_context(scenario_cfg["host"], target):
431                     ip_list.append(context_cfg["target"]["private_ip"])
432                 else:
433                     ip_list.append(context_cfg["target"]["ip"])
434         context_cfg['target']['ipaddr'] = ','.join(ip_list)
435
436     if "nodes" in scenario_cfg:
437         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
438     runner = base_runner.Runner.get(runner_cfg)
439
440     print "Starting runner of type '%s'" % runner_cfg["type"]
441     runner.run(scenario_cfg, context_cfg)
442
443     return runner
444
445
446 def parse_nodes_with_context(scenario_cfg):
447     '''paras the 'nodes' fields in scenario '''
448     nodes = scenario_cfg["nodes"]
449
450     nodes_cfg = {}
451     for nodename in nodes:
452         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
453
454     return nodes_cfg
455
456
457 def runner_join(runner):
458     '''join (wait for) a runner, exit process at runner failure'''
459     status = runner.join()
460     base_runner.Runner.release(runner)
461     if status != 0:
462         sys.exit("Runner failed")
463
464
465 def print_invalid_header(source_name, args):
466     print(("Invalid %(source)s passed:\n\n %(args)s\n")
467           % {"source": source_name, "args": args})
468
469
470 def parse_task_args(src_name, args):
471     try:
472         kw = args and yaml.safe_load(args)
473         kw = {} if kw is None else kw
474     except yaml.parser.ParserError as e:
475         print_invalid_header(src_name, args)
476         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
477               % {"source": src_name, "err": e})
478         raise TypeError()
479
480     if not isinstance(kw, dict):
481         print_invalid_header(src_name, args)
482         print(("%(src)s had to be dict, actually %(src_type)s\n")
483               % {"src": src_name, "src_type": type(kw)})
484         raise TypeError()
485     return kw