397ba00b087fc42178070093644c38e48f714609
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 import errno
21 from itertools import ifilter
22
23 from yardstick.benchmark.contexts.base import Context
24 from yardstick.benchmark.runners import base as base_runner
25 from yardstick.common.task_template import TaskTemplate
26 from yardstick.common.utils import source_env
27 from yardstick.common import constants
28
29 output_file_default = "/tmp/yardstick.out"
30 test_cases_dir_default = "tests/opnfv/test_cases/"
31 LOG = logging.getLogger(__name__)
32
33
34 class Task(object):     # pragma: no cover
35     '''Task commands.
36
37        Set of commands to manage benchmark tasks.
38     '''
39
40     def start(self, args, **kwargs):
41         '''Start a benchmark scenario.'''
42
43         atexit.register(atexit_handler)
44
45         self.task_id = kwargs.get('task_id', str(uuid.uuid4()))
46
47         check_environment()
48
49         total_start_time = time.time()
50         parser = TaskParser(args.inputfile[0])
51
52         if args.suite:
53             # 1.parse suite, return suite_params info
54             task_files, task_args, task_args_fnames = \
55                 parser.parse_suite()
56         else:
57             task_files = [parser.path]
58             task_args = [args.task_args]
59             task_args_fnames = [args.task_args_file]
60
61         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
62                  task_files, task_args, task_args_fnames)
63
64         if args.parse_only:
65             sys.exit(0)
66
67         if os.path.isfile(args.output_file):
68             os.remove(args.output_file)
69         # parse task_files
70         for i in range(0, len(task_files)):
71             one_task_start_time = time.time()
72             parser.path = task_files[i]
73             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
74                  self.task_id, task_args[i], task_args_fnames[i])
75
76             if not meet_precondition:
77                 LOG.info("meet_precondition is %s, please check envrionment",
78                          meet_precondition)
79                 continue
80
81             self._run(scenarios, run_in_parallel, args.output_file)
82
83             if args.keep_deploy:
84                 # keep deployment, forget about stack
85                 # (hide it for exit handler)
86                 Context.list = []
87             else:
88                 for context in Context.list:
89                     context.undeploy()
90                 Context.list = []
91             one_task_end_time = time.time()
92             LOG.info("task %s finished in %d secs", task_files[i],
93                      one_task_end_time - one_task_start_time)
94
95         total_end_time = time.time()
96         LOG.info("total finished in %d secs",
97                  total_end_time - total_start_time)
98
99         print "Done, exiting"
100
101     def _run(self, scenarios, run_in_parallel, output_file):
102         '''Deploys context and calls runners'''
103         for context in Context.list:
104             context.deploy()
105
106         background_runners = []
107
108         # Start all background scenarios
109         for scenario in ifilter(_is_background_scenario, scenarios):
110             scenario["runner"] = dict(type="Duration", duration=1000000000)
111             runner = run_one_scenario(scenario, output_file)
112             background_runners.append(runner)
113
114         runners = []
115         if run_in_parallel:
116             for scenario in scenarios:
117                 if not _is_background_scenario(scenario):
118                     runner = run_one_scenario(scenario, output_file)
119                     runners.append(runner)
120
121             # Wait for runners to finish
122             for runner in runners:
123                 runner_join(runner)
124                 print "Runner ended, output in", output_file
125         else:
126             # run serially
127             for scenario in scenarios:
128                 if not _is_background_scenario(scenario):
129                     runner = run_one_scenario(scenario, output_file)
130                     runner_join(runner)
131                     print "Runner ended, output in", output_file
132
133         # Abort background runners
134         for runner in background_runners:
135             runner.abort()
136
137         # Wait for background runners to finish
138         for runner in background_runners:
139             if runner.join(timeout=60) is None:
140                 # Nuke if it did not stop nicely
141                 base_runner.Runner.terminate(runner)
142                 runner_join(runner)
143             else:
144                 base_runner.Runner.release(runner)
145             print "Background task ended"
146
147
148 # TODO: Move stuff below into TaskCommands class !?
149
150
151 class TaskParser(object):       # pragma: no cover
152     '''Parser for task config files in yaml format'''
153     def __init__(self, path):
154         self.path = path
155
156     def _meet_constraint(self, task, cur_pod, cur_installer):
157         if "constraint" in task:
158             constraint = task.get('constraint', None)
159             if constraint is not None:
160                 tc_fit_pod = constraint.get('pod', None)
161                 tc_fit_installer = constraint.get('installer', None)
162                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
163                          cur_pod, cur_installer, constraint)
164                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
165                     return False
166                 if cur_installer and tc_fit_installer and \
167                         cur_installer not in tc_fit_installer:
168                     return False
169         return True
170
171     def _get_task_para(self, task, cur_pod):
172         task_args = task.get('task_args', None)
173         if task_args is not None:
174             task_args = task_args.get(cur_pod, None)
175         task_args_fnames = task.get('task_args_fnames', None)
176         if task_args_fnames is not None:
177             task_args_fnames = task_args_fnames.get(cur_pod, None)
178         return task_args, task_args_fnames
179
180     def parse_suite(self):
181         '''parse the suite file and return a list of task config file paths
182            and lists of optional parameters if present'''
183         LOG.info("\nParsing suite file:%s", self.path)
184
185         try:
186             with open(self.path) as stream:
187                 cfg = yaml.load(stream)
188         except IOError as ioerror:
189             sys.exit(ioerror)
190
191         self._check_schema(cfg["schema"], "suite")
192         LOG.info("\nStarting scenario:%s", cfg["name"])
193
194         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
195         if test_cases_dir[-1] != os.sep:
196             test_cases_dir += os.sep
197
198         cur_pod = os.environ.get('NODE_NAME', None)
199         cur_installer = os.environ.get('INSTALLER_TYPE', None)
200
201         valid_task_files = []
202         valid_task_args = []
203         valid_task_args_fnames = []
204
205         for task in cfg["test_cases"]:
206             # 1.check file_name
207             if "file_name" in task:
208                 task_fname = task.get('file_name', None)
209                 if task_fname is None:
210                     continue
211             else:
212                 continue
213             # 2.check constraint
214             if self._meet_constraint(task, cur_pod, cur_installer):
215                 valid_task_files.append(test_cases_dir + task_fname)
216             else:
217                 continue
218             # 3.fetch task parameters
219             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
220             valid_task_args.append(task_args)
221             valid_task_args_fnames.append(task_args_fnames)
222
223         return valid_task_files, valid_task_args, valid_task_args_fnames
224
225     def parse_task(self, task_id, task_args=None, task_args_file=None):
226         '''parses the task file and return an context and scenario instances'''
227         print "Parsing task config:", self.path
228
229         try:
230             kw = {}
231             if task_args_file:
232                 with open(task_args_file) as f:
233                     kw.update(parse_task_args("task_args_file", f.read()))
234             kw.update(parse_task_args("task_args", task_args))
235         except TypeError:
236             raise TypeError()
237
238         try:
239             with open(self.path) as f:
240                 try:
241                     input_task = f.read()
242                     rendered_task = TaskTemplate.render(input_task, **kw)
243                 except Exception as e:
244                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
245                           % {"task": input_task, "err": e})
246                     raise e
247                 print(("Input task is:\n%s\n") % rendered_task)
248
249                 cfg = yaml.load(rendered_task)
250         except IOError as ioerror:
251             sys.exit(ioerror)
252
253         self._check_schema(cfg["schema"], "task")
254         meet_precondition = self._check_precondition(cfg)
255
256         # TODO: support one or many contexts? Many would simpler and precise
257         # TODO: support hybrid context type
258         if "context" in cfg:
259             context_cfgs = [cfg["context"]]
260         elif "contexts" in cfg:
261             context_cfgs = cfg["contexts"]
262         else:
263             context_cfgs = [{"type": "Dummy"}]
264
265         for cfg_attrs in context_cfgs:
266             context_type = cfg_attrs.get("type", "Heat")
267             if "Heat" == context_type and "networks" in cfg_attrs:
268                 # bugfix: if there are more than one network,
269                 # only add "external_network" on first one.
270                 # the name of netwrok should follow this rule:
271                 # test, test2, test3 ...
272                 # sort network with the length of network's name
273                 sorted_networks = sorted(cfg_attrs["networks"].keys())
274                 # config external_network based on env var
275                 cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
276                     = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
277
278             context = Context.get(context_type)
279             context.init(cfg_attrs)
280
281         run_in_parallel = cfg.get("run_in_parallel", False)
282
283         # add tc and task id for influxdb extended tags
284         for scenario in cfg["scenarios"]:
285             task_name = os.path.splitext(os.path.basename(self.path))[0]
286             scenario["tc"] = task_name
287             scenario["task_id"] = task_id
288
289         # TODO we need something better here, a class that represent the file
290         return cfg["scenarios"], run_in_parallel, meet_precondition
291
292     def _check_schema(self, cfg_schema, schema_type):
293         '''Check if config file is using the correct schema type'''
294
295         if cfg_schema != "yardstick:" + schema_type + ":0.1":
296             sys.exit("error: file %s has unknown schema %s" % (self.path,
297                                                                cfg_schema))
298
299     def _check_precondition(self, cfg):
300         '''Check if the envrionment meet the preconditon'''
301
302         if "precondition" in cfg:
303             precondition = cfg["precondition"]
304             installer_type = precondition.get("installer_type", None)
305             deploy_scenarios = precondition.get("deploy_scenarios", None)
306             tc_fit_pods = precondition.get("pod_name", None)
307             installer_type_env = os.environ.get('INSTALL_TYPE', None)
308             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
309             pod_name_env = os.environ.get('NODE_NAME', None)
310
311             LOG.info("installer_type: %s, installer_type_env: %s",
312                      installer_type, installer_type_env)
313             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
314                      deploy_scenarios, deploy_scenario_env)
315             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
316                      tc_fit_pods, pod_name_env)
317             if installer_type and installer_type_env:
318                 if installer_type_env not in installer_type:
319                     return False
320             if deploy_scenarios and deploy_scenario_env:
321                 deploy_scenarios_list = deploy_scenarios.split(',')
322                 for deploy_scenario in deploy_scenarios_list:
323                     if deploy_scenario_env.startswith(deploy_scenario):
324                         return True
325                 return False
326             if tc_fit_pods and pod_name_env:
327                 if pod_name_env not in tc_fit_pods:
328                     return False
329         return True
330
331
332 def atexit_handler():
333     '''handler for process termination'''
334     base_runner.Runner.terminate_all()
335
336     if len(Context.list) > 0:
337         print "Undeploying all contexts"
338         for context in Context.list:
339             context.undeploy()
340
341
342 def is_ip_addr(addr):
343     '''check if string addr is an IP address'''
344     try:
345         ipaddress.ip_address(unicode(addr))
346         return True
347     except ValueError:
348         return False
349
350
351 def _is_same_heat_context(host_attr, target_attr):
352     '''check if two servers are in the same heat context
353     host_attr: either a name for a server created by yardstick or a dict
354     with attribute name mapping when using external heat templates
355     target_attr: either a name for a server created by yardstick or a dict
356     with attribute name mapping when using external heat templates
357     '''
358     host = None
359     target = None
360     for context in Context.list:
361         if context.__context_type__ != "Heat":
362             continue
363
364         host = context._get_server(host_attr)
365         if host is None:
366             continue
367
368         target = context._get_server(target_attr)
369         if target is None:
370             return False
371
372         # Both host and target is not None, then they are in the
373         # same heat context.
374         return True
375
376     return False
377
378
379 def _is_background_scenario(scenario):
380     if "run_in_background" in scenario:
381         return scenario["run_in_background"]
382     else:
383         return False
384
385
386 def run_one_scenario(scenario_cfg, output_file):
387     '''run one scenario using context'''
388     runner_cfg = scenario_cfg["runner"]
389     runner_cfg['output_filename'] = output_file
390
391     # TODO support get multi hosts/vms info
392     context_cfg = {}
393     if "host" in scenario_cfg:
394         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
395
396     if "target" in scenario_cfg:
397         if is_ip_addr(scenario_cfg["target"]):
398             context_cfg['target'] = {}
399             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
400         else:
401             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
402             if _is_same_heat_context(scenario_cfg["host"],
403                                      scenario_cfg["target"]):
404                 context_cfg["target"]["ipaddr"] = \
405                     context_cfg["target"]["private_ip"]
406             else:
407                 context_cfg["target"]["ipaddr"] = \
408                     context_cfg["target"]["ip"]
409
410     if "targets" in scenario_cfg:
411         ip_list = []
412         for target in scenario_cfg["targets"]:
413             if is_ip_addr(target):
414                 ip_list.append(target)
415                 context_cfg['target'] = {}
416             else:
417                 context_cfg['target'] = Context.get_server(target)
418                 if _is_same_heat_context(scenario_cfg["host"], target):
419                     ip_list.append(context_cfg["target"]["private_ip"])
420                 else:
421                     ip_list.append(context_cfg["target"]["ip"])
422         context_cfg['target']['ipaddr'] = ','.join(ip_list)
423
424     if "nodes" in scenario_cfg:
425         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
426     runner = base_runner.Runner.get(runner_cfg)
427
428     print "Starting runner of type '%s'" % runner_cfg["type"]
429     runner.run(scenario_cfg, context_cfg)
430
431     return runner
432
433
434 def parse_nodes_with_context(scenario_cfg):
435     '''paras the 'nodes' fields in scenario '''
436     nodes = scenario_cfg["nodes"]
437
438     nodes_cfg = {}
439     for nodename in nodes:
440         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
441
442     return nodes_cfg
443
444
445 def runner_join(runner):
446     '''join (wait for) a runner, exit process at runner failure'''
447     status = runner.join()
448     base_runner.Runner.release(runner)
449     if status != 0:
450         sys.exit("Runner failed")
451
452
453 def print_invalid_header(source_name, args):
454     print(("Invalid %(source)s passed:\n\n %(args)s\n")
455           % {"source": source_name, "args": args})
456
457
458 def parse_task_args(src_name, args):
459     try:
460         kw = args and yaml.safe_load(args)
461         kw = {} if kw is None else kw
462     except yaml.parser.ParserError as e:
463         print_invalid_header(src_name, args)
464         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
465               % {"source": src_name, "err": e})
466         raise TypeError()
467
468     if not isinstance(kw, dict):
469         print_invalid_header(src_name, args)
470         print(("%(src)s had to be dict, actually %(src_type)s\n")
471               % {"src": src_name, "src_type": type(kw)})
472         raise TypeError()
473     return kw
474
475
476 def check_environment():
477     auth_url = os.environ.get('OS_AUTH_URL', None)
478     if not auth_url:
479         try:
480             source_env(constants.OPENSTACK_RC_FILE)
481         except IOError as e:
482             if e.errno != errno.EEXIST:
483                 raise
484             LOG.debug('OPENRC file not found')