Yardstick framework concurrent support
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 import errno
21 from itertools import ifilter
22
23 from yardstick.benchmark.contexts.base import Context
24 from yardstick.benchmark.runners import base as base_runner
25 from yardstick.common.task_template import TaskTemplate
26 from yardstick.common.utils import source_env
27 from yardstick.common import constants
28
29 output_file_default = "/tmp/yardstick.out"
30 test_cases_dir_default = "tests/opnfv/test_cases/"
31 LOG = logging.getLogger(__name__)
32
33
34 class Task(object):     # pragma: no cover
35     '''Task commands.
36
37        Set of commands to manage benchmark tasks.
38     '''
39
40     def start(self, args, **kwargs):
41         '''Start a benchmark scenario.'''
42
43         atexit.register(atexit_handler)
44
45         self.task_id = kwargs.get('task_id', str(uuid.uuid4()))
46
47         check_environment()
48
49         total_start_time = time.time()
50         parser = TaskParser(args.inputfile[0])
51
52         if args.suite:
53             # 1.parse suite, return suite_params info
54             task_files, task_args, task_args_fnames = \
55                 parser.parse_suite()
56         else:
57             task_files = [parser.path]
58             task_args = [args.task_args]
59             task_args_fnames = [args.task_args_file]
60
61         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
62                  task_files, task_args, task_args_fnames)
63
64         if args.parse_only:
65             sys.exit(0)
66
67         if os.path.isfile(args.output_file):
68             os.remove(args.output_file)
69         # parse task_files
70         for i in range(0, len(task_files)):
71             one_task_start_time = time.time()
72             parser.path = task_files[i]
73             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
74                  self.task_id, task_args[i], task_args_fnames[i])
75
76             if not meet_precondition:
77                 LOG.info("meet_precondition is %s, please check envrionment",
78                          meet_precondition)
79                 continue
80
81             self._run(scenarios, run_in_parallel, args.output_file)
82
83             if args.keep_deploy:
84                 # keep deployment, forget about stack
85                 # (hide it for exit handler)
86                 Context.list = []
87             else:
88                 for context in Context.list:
89                     context.undeploy()
90                 Context.list = []
91             one_task_end_time = time.time()
92             LOG.info("task %s finished in %d secs", task_files[i],
93                      one_task_end_time - one_task_start_time)
94
95         total_end_time = time.time()
96         LOG.info("total finished in %d secs",
97                  total_end_time - total_start_time)
98
99         print "Done, exiting"
100
101     def _run(self, scenarios, run_in_parallel, output_file):
102         '''Deploys context and calls runners'''
103         for context in Context.list:
104             context.deploy()
105
106         background_runners = []
107
108         # Start all background scenarios
109         for scenario in ifilter(_is_background_scenario, scenarios):
110             scenario["runner"] = dict(type="Duration", duration=1000000000)
111             runner = run_one_scenario(scenario, output_file)
112             background_runners.append(runner)
113
114         runners = []
115         if run_in_parallel:
116             for scenario in scenarios:
117                 if not _is_background_scenario(scenario):
118                     runner = run_one_scenario(scenario, output_file)
119                     runners.append(runner)
120
121             # Wait for runners to finish
122             for runner in runners:
123                 runner_join(runner)
124                 print "Runner ended, output in", output_file
125         else:
126             # run serially
127             for scenario in scenarios:
128                 if not _is_background_scenario(scenario):
129                     runner = run_one_scenario(scenario, output_file)
130                     runner_join(runner)
131                     print "Runner ended, output in", output_file
132
133         # Abort background runners
134         for runner in background_runners:
135             runner.abort()
136
137         # Wait for background runners to finish
138         for runner in background_runners:
139             if runner.join(timeout=60) is None:
140                 # Nuke if it did not stop nicely
141                 base_runner.Runner.terminate(runner)
142                 runner_join(runner)
143             else:
144                 base_runner.Runner.release(runner)
145             print "Background task ended"
146
147
148 # TODO: Move stuff below into TaskCommands class !?
149
150
151 class TaskParser(object):       # pragma: no cover
152     '''Parser for task config files in yaml format'''
153     def __init__(self, path):
154         self.path = path
155
156     def _meet_constraint(self, task, cur_pod, cur_installer):
157         if "constraint" in task:
158             constraint = task.get('constraint', None)
159             if constraint is not None:
160                 tc_fit_pod = constraint.get('pod', None)
161                 tc_fit_installer = constraint.get('installer', None)
162                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
163                          cur_pod, cur_installer, constraint)
164                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
165                     return False
166                 if cur_installer and tc_fit_installer and \
167                         cur_installer not in tc_fit_installer:
168                     return False
169         return True
170
171     def _get_task_para(self, task, cur_pod):
172         task_args = task.get('task_args', None)
173         if task_args is not None:
174             task_args = task_args.get(cur_pod, None)
175         task_args_fnames = task.get('task_args_fnames', None)
176         if task_args_fnames is not None:
177             task_args_fnames = task_args_fnames.get(cur_pod, None)
178         return task_args, task_args_fnames
179
180     def parse_suite(self):
181         '''parse the suite file and return a list of task config file paths
182            and lists of optional parameters if present'''
183         LOG.info("\nParsing suite file:%s", self.path)
184
185         try:
186             with open(self.path) as stream:
187                 cfg = yaml.load(stream)
188         except IOError as ioerror:
189             sys.exit(ioerror)
190
191         self._check_schema(cfg["schema"], "suite")
192         LOG.info("\nStarting scenario:%s", cfg["name"])
193
194         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
195         if test_cases_dir[-1] != os.sep:
196             test_cases_dir += os.sep
197
198         cur_pod = os.environ.get('NODE_NAME', None)
199         cur_installer = os.environ.get('INSTALLER_TYPE', None)
200
201         valid_task_files = []
202         valid_task_args = []
203         valid_task_args_fnames = []
204
205         for task in cfg["test_cases"]:
206             # 1.check file_name
207             if "file_name" in task:
208                 task_fname = task.get('file_name', None)
209                 if task_fname is None:
210                     continue
211             else:
212                 continue
213             # 2.check constraint
214             if self._meet_constraint(task, cur_pod, cur_installer):
215                 valid_task_files.append(test_cases_dir + task_fname)
216             else:
217                 continue
218             # 3.fetch task parameters
219             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
220             valid_task_args.append(task_args)
221             valid_task_args_fnames.append(task_args_fnames)
222
223         return valid_task_files, valid_task_args, valid_task_args_fnames
224
225     def parse_task(self, task_id, task_args=None, task_args_file=None):
226         '''parses the task file and return an context and scenario instances'''
227         print "Parsing task config:", self.path
228
229         try:
230             kw = {}
231             if task_args_file:
232                 with open(task_args_file) as f:
233                     kw.update(parse_task_args("task_args_file", f.read()))
234             kw.update(parse_task_args("task_args", task_args))
235         except TypeError:
236             raise TypeError()
237
238         try:
239             with open(self.path) as f:
240                 try:
241                     input_task = f.read()
242                     rendered_task = TaskTemplate.render(input_task, **kw)
243                 except Exception as e:
244                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
245                           % {"task": input_task, "err": e})
246                     raise e
247                 print(("Input task is:\n%s\n") % rendered_task)
248
249                 cfg = yaml.load(rendered_task)
250         except IOError as ioerror:
251             sys.exit(ioerror)
252
253         self._check_schema(cfg["schema"], "task")
254         meet_precondition = self._check_precondition(cfg)
255
256         # TODO: support one or many contexts? Many would simpler and precise
257         # TODO: support hybrid context type
258         if "context" in cfg:
259             context_cfgs = [cfg["context"]]
260         elif "contexts" in cfg:
261             context_cfgs = cfg["contexts"]
262         else:
263             context_cfgs = [{"type": "Dummy"}]
264
265         name_suffix = '-{}'.format(task_id[:8])
266         for cfg_attrs in context_cfgs:
267             cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'], name_suffix)
268             context_type = cfg_attrs.get("type", "Heat")
269             if "Heat" == context_type and "networks" in cfg_attrs:
270                 # bugfix: if there are more than one network,
271                 # only add "external_network" on first one.
272                 # the name of netwrok should follow this rule:
273                 # test, test2, test3 ...
274                 # sort network with the length of network's name
275                 sorted_networks = sorted(cfg_attrs["networks"])
276                 # config external_network based on env var
277                 cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
278                     = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
279
280             context = Context.get(context_type)
281             context.init(cfg_attrs)
282
283         run_in_parallel = cfg.get("run_in_parallel", False)
284
285         # add tc and task id for influxdb extended tags
286         for scenario in cfg["scenarios"]:
287             task_name = os.path.splitext(os.path.basename(self.path))[0]
288             scenario["tc"] = task_name
289             scenario["task_id"] = task_id
290
291             change_server_name(scenario, name_suffix)
292
293             try:
294                 change_server_name(scenario['nodes'], name_suffix)
295             except KeyError:
296                 pass
297
298         # TODO we need something better here, a class that represent the file
299         return cfg["scenarios"], run_in_parallel, meet_precondition
300
301     def _check_schema(self, cfg_schema, schema_type):
302         '''Check if config file is using the correct schema type'''
303
304         if cfg_schema != "yardstick:" + schema_type + ":0.1":
305             sys.exit("error: file %s has unknown schema %s" % (self.path,
306                                                                cfg_schema))
307
308     def _check_precondition(self, cfg):
309         '''Check if the envrionment meet the preconditon'''
310
311         if "precondition" in cfg:
312             precondition = cfg["precondition"]
313             installer_type = precondition.get("installer_type", None)
314             deploy_scenarios = precondition.get("deploy_scenarios", None)
315             tc_fit_pods = precondition.get("pod_name", None)
316             installer_type_env = os.environ.get('INSTALL_TYPE', None)
317             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
318             pod_name_env = os.environ.get('NODE_NAME', None)
319
320             LOG.info("installer_type: %s, installer_type_env: %s",
321                      installer_type, installer_type_env)
322             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
323                      deploy_scenarios, deploy_scenario_env)
324             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
325                      tc_fit_pods, pod_name_env)
326             if installer_type and installer_type_env:
327                 if installer_type_env not in installer_type:
328                     return False
329             if deploy_scenarios and deploy_scenario_env:
330                 deploy_scenarios_list = deploy_scenarios.split(',')
331                 for deploy_scenario in deploy_scenarios_list:
332                     if deploy_scenario_env.startswith(deploy_scenario):
333                         return True
334                 return False
335             if tc_fit_pods and pod_name_env:
336                 if pod_name_env not in tc_fit_pods:
337                     return False
338         return True
339
340
341 def atexit_handler():
342     '''handler for process termination'''
343     base_runner.Runner.terminate_all()
344
345     if len(Context.list) > 0:
346         print "Undeploying all contexts"
347         for context in Context.list:
348             context.undeploy()
349
350
351 def is_ip_addr(addr):
352     '''check if string addr is an IP address'''
353     try:
354         ipaddress.ip_address(unicode(addr))
355         return True
356     except ValueError:
357         return False
358
359
360 def _is_same_heat_context(host_attr, target_attr):
361     '''check if two servers are in the same heat context
362     host_attr: either a name for a server created by yardstick or a dict
363     with attribute name mapping when using external heat templates
364     target_attr: either a name for a server created by yardstick or a dict
365     with attribute name mapping when using external heat templates
366     '''
367     host = None
368     target = None
369     for context in Context.list:
370         if context.__context_type__ != "Heat":
371             continue
372
373         host = context._get_server(host_attr)
374         if host is None:
375             continue
376
377         target = context._get_server(target_attr)
378         if target is None:
379             return False
380
381         # Both host and target is not None, then they are in the
382         # same heat context.
383         return True
384
385     return False
386
387
388 def _is_background_scenario(scenario):
389     if "run_in_background" in scenario:
390         return scenario["run_in_background"]
391     else:
392         return False
393
394
395 def run_one_scenario(scenario_cfg, output_file):
396     '''run one scenario using context'''
397     runner_cfg = scenario_cfg["runner"]
398     runner_cfg['output_filename'] = output_file
399
400     # TODO support get multi hosts/vms info
401     context_cfg = {}
402     if "host" in scenario_cfg:
403         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
404
405     if "target" in scenario_cfg:
406         if is_ip_addr(scenario_cfg["target"]):
407             context_cfg['target'] = {}
408             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
409         else:
410             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
411             if _is_same_heat_context(scenario_cfg["host"],
412                                      scenario_cfg["target"]):
413                 context_cfg["target"]["ipaddr"] = \
414                     context_cfg["target"]["private_ip"]
415             else:
416                 context_cfg["target"]["ipaddr"] = \
417                     context_cfg["target"]["ip"]
418
419     if "targets" in scenario_cfg:
420         ip_list = []
421         for target in scenario_cfg["targets"]:
422             if is_ip_addr(target):
423                 ip_list.append(target)
424                 context_cfg['target'] = {}
425             else:
426                 context_cfg['target'] = Context.get_server(target)
427                 if _is_same_heat_context(scenario_cfg["host"], target):
428                     ip_list.append(context_cfg["target"]["private_ip"])
429                 else:
430                     ip_list.append(context_cfg["target"]["ip"])
431         context_cfg['target']['ipaddr'] = ','.join(ip_list)
432
433     if "nodes" in scenario_cfg:
434         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
435     runner = base_runner.Runner.get(runner_cfg)
436
437     print "Starting runner of type '%s'" % runner_cfg["type"]
438     runner.run(scenario_cfg, context_cfg)
439
440     return runner
441
442
443 def parse_nodes_with_context(scenario_cfg):
444     '''paras the 'nodes' fields in scenario '''
445     nodes = scenario_cfg["nodes"]
446
447     nodes_cfg = {}
448     for nodename in nodes:
449         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
450
451     return nodes_cfg
452
453
454 def runner_join(runner):
455     '''join (wait for) a runner, exit process at runner failure'''
456     status = runner.join()
457     base_runner.Runner.release(runner)
458     if status != 0:
459         sys.exit("Runner failed")
460
461
462 def print_invalid_header(source_name, args):
463     print(("Invalid %(source)s passed:\n\n %(args)s\n")
464           % {"source": source_name, "args": args})
465
466
467 def parse_task_args(src_name, args):
468     try:
469         kw = args and yaml.safe_load(args)
470         kw = {} if kw is None else kw
471     except yaml.parser.ParserError as e:
472         print_invalid_header(src_name, args)
473         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
474               % {"source": src_name, "err": e})
475         raise TypeError()
476
477     if not isinstance(kw, dict):
478         print_invalid_header(src_name, args)
479         print(("%(src)s had to be dict, actually %(src_type)s\n")
480               % {"src": src_name, "src_type": type(kw)})
481         raise TypeError()
482     return kw
483
484
485 def check_environment():
486     auth_url = os.environ.get('OS_AUTH_URL', None)
487     if not auth_url:
488         try:
489             source_env(constants.OPENSTACK_RC_FILE)
490         except IOError as e:
491             if e.errno != errno.EEXIST:
492                 raise
493             LOG.debug('OPENRC file not found')
494
495
496 def change_server_name(scenario, suffix):
497     try:
498         scenario['host'] += suffix
499     except KeyError:
500         pass
501
502     try:
503         scenario['target'] += suffix
504     except KeyError:
505         pass
506
507     try:
508         key = 'targets'
509         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
510     except KeyError:
511         pass