Add support for Python 3
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import constants
30
31 output_file_default = "/tmp/yardstick.out"
32 test_cases_dir_default = "tests/opnfv/test_cases/"
33 LOG = logging.getLogger(__name__)
34
35
36 class Task(object):     # pragma: no cover
37     '''Task commands.
38
39        Set of commands to manage benchmark tasks.
40     '''
41
42     def start(self, args, **kwargs):
43         '''Start a benchmark scenario.'''
44
45         atexit.register(atexit_handler)
46
47         self.task_id = kwargs.get('task_id', str(uuid.uuid4()))
48
49         check_environment()
50
51         total_start_time = time.time()
52         parser = TaskParser(args.inputfile[0])
53
54         if args.suite:
55             # 1.parse suite, return suite_params info
56             task_files, task_args, task_args_fnames = \
57                 parser.parse_suite()
58         else:
59             task_files = [parser.path]
60             task_args = [args.task_args]
61             task_args_fnames = [args.task_args_file]
62
63         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
64                  task_files, task_args, task_args_fnames)
65
66         if args.parse_only:
67             sys.exit(0)
68
69         if os.path.isfile(args.output_file):
70             os.remove(args.output_file)
71         # parse task_files
72         for i in range(0, len(task_files)):
73             one_task_start_time = time.time()
74             parser.path = task_files[i]
75             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
76                 self.task_id, task_args[i], task_args_fnames[i])
77
78             if not meet_precondition:
79                 LOG.info("meet_precondition is %s, please check envrionment",
80                          meet_precondition)
81                 continue
82
83             self._run(scenarios, run_in_parallel, args.output_file)
84
85             if args.keep_deploy:
86                 # keep deployment, forget about stack
87                 # (hide it for exit handler)
88                 Context.list = []
89             else:
90                 for context in Context.list:
91                     context.undeploy()
92                 Context.list = []
93             one_task_end_time = time.time()
94             LOG.info("task %s finished in %d secs", task_files[i],
95                      one_task_end_time - one_task_start_time)
96
97         total_end_time = time.time()
98         LOG.info("total finished in %d secs",
99                  total_end_time - total_start_time)
100
101         print("Done, exiting")
102
103     def _run(self, scenarios, run_in_parallel, output_file):
104         '''Deploys context and calls runners'''
105         for context in Context.list:
106             context.deploy()
107
108         background_runners = []
109
110         # Start all background scenarios
111         for scenario in filter(_is_background_scenario, scenarios):
112             scenario["runner"] = dict(type="Duration", duration=1000000000)
113             runner = run_one_scenario(scenario, output_file)
114             background_runners.append(runner)
115
116         runners = []
117         if run_in_parallel:
118             for scenario in scenarios:
119                 if not _is_background_scenario(scenario):
120                     runner = run_one_scenario(scenario, output_file)
121                     runners.append(runner)
122
123             # Wait for runners to finish
124             for runner in runners:
125                 runner_join(runner)
126                 print("Runner ended, output in", output_file)
127         else:
128             # run serially
129             for scenario in scenarios:
130                 if not _is_background_scenario(scenario):
131                     runner = run_one_scenario(scenario, output_file)
132                     runner_join(runner)
133                     print("Runner ended, output in", output_file)
134
135         # Abort background runners
136         for runner in background_runners:
137             runner.abort()
138
139         # Wait for background runners to finish
140         for runner in background_runners:
141             if runner.join(timeout=60) is None:
142                 # Nuke if it did not stop nicely
143                 base_runner.Runner.terminate(runner)
144                 runner_join(runner)
145             else:
146                 base_runner.Runner.release(runner)
147             print("Background task ended")
148
149
150 # TODO: Move stuff below into TaskCommands class !?
151
152
153 class TaskParser(object):       # pragma: no cover
154     '''Parser for task config files in yaml format'''
155
156     def __init__(self, path):
157         self.path = path
158
159     def _meet_constraint(self, task, cur_pod, cur_installer):
160         if "constraint" in task:
161             constraint = task.get('constraint', None)
162             if constraint is not None:
163                 tc_fit_pod = constraint.get('pod', None)
164                 tc_fit_installer = constraint.get('installer', None)
165                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
166                          cur_pod, cur_installer, constraint)
167                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
168                     return False
169                 if cur_installer and tc_fit_installer and \
170                         cur_installer not in tc_fit_installer:
171                     return False
172         return True
173
174     def _get_task_para(self, task, cur_pod):
175         task_args = task.get('task_args', None)
176         if task_args is not None:
177             task_args = task_args.get(cur_pod, None)
178         task_args_fnames = task.get('task_args_fnames', None)
179         if task_args_fnames is not None:
180             task_args_fnames = task_args_fnames.get(cur_pod, None)
181         return task_args, task_args_fnames
182
183     def parse_suite(self):
184         '''parse the suite file and return a list of task config file paths
185            and lists of optional parameters if present'''
186         LOG.info("\nParsing suite file:%s", self.path)
187
188         try:
189             with open(self.path) as stream:
190                 cfg = yaml.load(stream)
191         except IOError as ioerror:
192             sys.exit(ioerror)
193
194         self._check_schema(cfg["schema"], "suite")
195         LOG.info("\nStarting scenario:%s", cfg["name"])
196
197         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
198         if test_cases_dir[-1] != os.sep:
199             test_cases_dir += os.sep
200
201         cur_pod = os.environ.get('NODE_NAME', None)
202         cur_installer = os.environ.get('INSTALLER_TYPE', None)
203
204         valid_task_files = []
205         valid_task_args = []
206         valid_task_args_fnames = []
207
208         for task in cfg["test_cases"]:
209             # 1.check file_name
210             if "file_name" in task:
211                 task_fname = task.get('file_name', None)
212                 if task_fname is None:
213                     continue
214             else:
215                 continue
216             # 2.check constraint
217             if self._meet_constraint(task, cur_pod, cur_installer):
218                 valid_task_files.append(test_cases_dir + task_fname)
219             else:
220                 continue
221             # 3.fetch task parameters
222             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
223             valid_task_args.append(task_args)
224             valid_task_args_fnames.append(task_args_fnames)
225
226         return valid_task_files, valid_task_args, valid_task_args_fnames
227
228     def parse_task(self, task_id, task_args=None, task_args_file=None):
229         '''parses the task file and return an context and scenario instances'''
230         print("Parsing task config:", self.path)
231
232         try:
233             kw = {}
234             if task_args_file:
235                 with open(task_args_file) as f:
236                     kw.update(parse_task_args("task_args_file", f.read()))
237             kw.update(parse_task_args("task_args", task_args))
238         except TypeError:
239             raise TypeError()
240
241         try:
242             with open(self.path) as f:
243                 try:
244                     input_task = f.read()
245                     rendered_task = TaskTemplate.render(input_task, **kw)
246                 except Exception as e:
247                     print("Failed to render template:\n%(task)s\n%(err)s\n"
248                           % {"task": input_task, "err": e})
249                     raise e
250                 print("Input task is:\n%s\n" % rendered_task)
251
252                 cfg = yaml.load(rendered_task)
253         except IOError as ioerror:
254             sys.exit(ioerror)
255
256         self._check_schema(cfg["schema"], "task")
257         meet_precondition = self._check_precondition(cfg)
258
259         # TODO: support one or many contexts? Many would simpler and precise
260         # TODO: support hybrid context type
261         if "context" in cfg:
262             context_cfgs = [cfg["context"]]
263         elif "contexts" in cfg:
264             context_cfgs = cfg["contexts"]
265         else:
266             context_cfgs = [{"type": "Dummy"}]
267
268         name_suffix = '-{}'.format(task_id[:8])
269         for cfg_attrs in context_cfgs:
270             cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'], name_suffix)
271             context_type = cfg_attrs.get("type", "Heat")
272             if "Heat" == context_type and "networks" in cfg_attrs:
273                 # bugfix: if there are more than one network,
274                 # only add "external_network" on first one.
275                 # the name of netwrok should follow this rule:
276                 # test, test2, test3 ...
277                 # sort network with the length of network's name
278                 sorted_networks = sorted(cfg_attrs["networks"])
279                 # config external_network based on env var
280                 cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
281                     = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
282
283             context = Context.get(context_type)
284             context.init(cfg_attrs)
285
286         run_in_parallel = cfg.get("run_in_parallel", False)
287
288         # add tc and task id for influxdb extended tags
289         for scenario in cfg["scenarios"]:
290             task_name = os.path.splitext(os.path.basename(self.path))[0]
291             scenario["tc"] = task_name
292             scenario["task_id"] = task_id
293
294             change_server_name(scenario, name_suffix)
295
296             try:
297                 change_server_name(scenario['nodes'], name_suffix)
298             except KeyError:
299                 pass
300
301         # TODO we need something better here, a class that represent the file
302         return cfg["scenarios"], run_in_parallel, meet_precondition
303
304     def _check_schema(self, cfg_schema, schema_type):
305         '''Check if config file is using the correct schema type'''
306
307         if cfg_schema != "yardstick:" + schema_type + ":0.1":
308             sys.exit("error: file %s has unknown schema %s" % (self.path,
309                                                                cfg_schema))
310
311     def _check_precondition(self, cfg):
312         '''Check if the envrionment meet the preconditon'''
313
314         if "precondition" in cfg:
315             precondition = cfg["precondition"]
316             installer_type = precondition.get("installer_type", None)
317             deploy_scenarios = precondition.get("deploy_scenarios", None)
318             tc_fit_pods = precondition.get("pod_name", None)
319             installer_type_env = os.environ.get('INSTALL_TYPE', None)
320             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
321             pod_name_env = os.environ.get('NODE_NAME', None)
322
323             LOG.info("installer_type: %s, installer_type_env: %s",
324                      installer_type, installer_type_env)
325             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
326                      deploy_scenarios, deploy_scenario_env)
327             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
328                      tc_fit_pods, pod_name_env)
329             if installer_type and installer_type_env:
330                 if installer_type_env not in installer_type:
331                     return False
332             if deploy_scenarios and deploy_scenario_env:
333                 deploy_scenarios_list = deploy_scenarios.split(',')
334                 for deploy_scenario in deploy_scenarios_list:
335                     if deploy_scenario_env.startswith(deploy_scenario):
336                         return True
337                 return False
338             if tc_fit_pods and pod_name_env:
339                 if pod_name_env not in tc_fit_pods:
340                     return False
341         return True
342
343
344 def atexit_handler():
345     '''handler for process termination'''
346     base_runner.Runner.terminate_all()
347
348     if len(Context.list) > 0:
349         print("Undeploying all contexts")
350         for context in Context.list:
351             context.undeploy()
352
353
354 def is_ip_addr(addr):
355     '''check if string addr is an IP address'''
356     try:
357         ipaddress.ip_address(addr.encode('utf-8'))
358         return True
359     except ValueError:
360         return False
361
362
363 def _is_same_heat_context(host_attr, target_attr):
364     '''check if two servers are in the same heat context
365     host_attr: either a name for a server created by yardstick or a dict
366     with attribute name mapping when using external heat templates
367     target_attr: either a name for a server created by yardstick or a dict
368     with attribute name mapping when using external heat templates
369     '''
370     host = None
371     target = None
372     for context in Context.list:
373         if context.__context_type__ != "Heat":
374             continue
375
376         host = context._get_server(host_attr)
377         if host is None:
378             continue
379
380         target = context._get_server(target_attr)
381         if target is None:
382             return False
383
384         # Both host and target is not None, then they are in the
385         # same heat context.
386         return True
387
388     return False
389
390
391 def _is_background_scenario(scenario):
392     if "run_in_background" in scenario:
393         return scenario["run_in_background"]
394     else:
395         return False
396
397
398 def run_one_scenario(scenario_cfg, output_file):
399     '''run one scenario using context'''
400     runner_cfg = scenario_cfg["runner"]
401     runner_cfg['output_filename'] = output_file
402
403     # TODO support get multi hosts/vms info
404     context_cfg = {}
405     if "host" in scenario_cfg:
406         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
407
408     if "target" in scenario_cfg:
409         if is_ip_addr(scenario_cfg["target"]):
410             context_cfg['target'] = {}
411             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
412         else:
413             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
414             if _is_same_heat_context(scenario_cfg["host"],
415                                      scenario_cfg["target"]):
416                 context_cfg["target"]["ipaddr"] = \
417                     context_cfg["target"]["private_ip"]
418             else:
419                 context_cfg["target"]["ipaddr"] = \
420                     context_cfg["target"]["ip"]
421
422     if "targets" in scenario_cfg:
423         ip_list = []
424         for target in scenario_cfg["targets"]:
425             if is_ip_addr(target):
426                 ip_list.append(target)
427                 context_cfg['target'] = {}
428             else:
429                 context_cfg['target'] = Context.get_server(target)
430                 if _is_same_heat_context(scenario_cfg["host"], target):
431                     ip_list.append(context_cfg["target"]["private_ip"])
432                 else:
433                     ip_list.append(context_cfg["target"]["ip"])
434         context_cfg['target']['ipaddr'] = ','.join(ip_list)
435
436     if "nodes" in scenario_cfg:
437         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
438     runner = base_runner.Runner.get(runner_cfg)
439
440     print("Starting runner of type '%s'" % runner_cfg["type"])
441     runner.run(scenario_cfg, context_cfg)
442
443     return runner
444
445
446 def parse_nodes_with_context(scenario_cfg):
447     '''paras the 'nodes' fields in scenario '''
448     nodes = scenario_cfg["nodes"]
449
450     nodes_cfg = {}
451     for nodename in nodes:
452         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
453
454     return nodes_cfg
455
456
457 def runner_join(runner):
458     '''join (wait for) a runner, exit process at runner failure'''
459     status = runner.join()
460     base_runner.Runner.release(runner)
461     if status != 0:
462         sys.exit("Runner failed")
463
464
465 def print_invalid_header(source_name, args):
466     print("Invalid %(source)s passed:\n\n %(args)s\n"
467           % {"source": source_name, "args": args})
468
469
470 def parse_task_args(src_name, args):
471     try:
472         kw = args and yaml.safe_load(args)
473         kw = {} if kw is None else kw
474     except yaml.parser.ParserError as e:
475         print_invalid_header(src_name, args)
476         print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
477               % {"source": src_name, "err": e})
478         raise TypeError()
479
480     if not isinstance(kw, dict):
481         print_invalid_header(src_name, args)
482         print("%(src)s had to be dict, actually %(src_type)s\n"
483               % {"src": src_name, "src_type": type(kw)})
484         raise TypeError()
485     return kw
486
487
488 def check_environment():
489     auth_url = os.environ.get('OS_AUTH_URL', None)
490     if not auth_url:
491         try:
492             source_env(constants.OPENSTACK_RC_FILE)
493         except IOError as e:
494             if e.errno != errno.EEXIST:
495                 raise
496             LOG.debug('OPENRC file not found')
497
498
499 def change_server_name(scenario, suffix):
500     try:
501         scenario['host'] += suffix
502     except KeyError:
503         pass
504
505     try:
506         scenario['target'] += suffix
507     except KeyError:
508         pass
509
510     try:
511         key = 'targets'
512         scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
513     except KeyError:
514         pass