Merge "update trex version for constants.so import bug"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import utils
30 from yardstick.common import constants
31
# Default path for scenario output records.
output_file_default = "/tmp/yardstick.out"
# Yardstick service configuration (INI); parsed in Task.start().
config_file = '/etc/yardstick/yardstick.conf'
# Default suite test-case directory, joined to YARDSTICK_ROOT_PATH
# in TaskParser.parse_suite().
test_cases_dir_default = "tests/opnfv/test_cases/"
# Module-level logger.
LOG = logging.getLogger(__name__)
36
37
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def __init__(self):
        # Parsed yardstick.conf contents (filled in by start()).
        self.config = {}
        # Contexts deployed for the current task; undeployed on exit
        # unless --keep-deploy was requested.
        self.contexts = []

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        Parses either a suite file (many task files) or a single task
        file, deploys the declared contexts, runs the scenarios and
        tears the contexts down again.
        """

        # Make sure deployed contexts are torn down even on abnormal exit.
        atexit.register(self.atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        self.config['yardstick'] = utils.parse_ini_file(config_file)

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            # Single task file: wrap the CLI arguments in one-element
            # lists so the loop below handles both modes uniformly.
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        # parse task_files
        for i in range(0, len(task_files)):
            one_task_start_time = time.time()
            parser.path = task_files[i]
            scenarios, run_in_parallel, meet_precondition, contexts = \
                parser.parse_task(self.task_id, task_args[i],
                                  task_args_fnames[i])

            self.contexts.extend(contexts)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check envrionment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                self.contexts = []
            else:
                # Undeploy in reverse deployment order.
                for context in self.contexts[::-1]:
                    context.undeploy()
                self.contexts = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_files[i],
                     one_task_end_time - one_task_start_time)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploys context and calls runners"""
        for context in self.contexts:
            context.deploy()

        background_runners = []

        # Start all background scenarios
        for scenario in filter(_is_background_scenario, scenarios):
            # Background scenarios run "forever" (huge Duration runner);
            # they are aborted explicitly once foreground scenarios finish.
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = self.run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # Start every foreground scenario first, then join them all.
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = self.run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")

    def atexit_handler(self):
        """handler for process termination"""
        base_runner.Runner.terminate_all()

        if self.contexts:
            print("Undeploying all contexts")
            # Undeploy in reverse deployment order.
            for context in self.contexts[::-1]:
                context.undeploy()

    def run_one_scenario(self, scenario_cfg, output_file):
        """run one scenario using context

        Builds the context_cfg (host/target/targets/nodes server info)
        for the scenario, starts a runner for it and returns the runner.
        """
        runner_cfg = scenario_cfg["runner"]
        runner_cfg['output_filename'] = output_file

        # TODO support get multi hosts/vms info
        context_cfg = {}
        if "host" in scenario_cfg:
            context_cfg['host'] = Context.get_server(scenario_cfg["host"])

        if "target" in scenario_cfg:
            if is_ip_addr(scenario_cfg["target"]):
                # Target given as a bare IP address, not a server name.
                context_cfg['target'] = {}
                context_cfg['target']["ipaddr"] = scenario_cfg["target"]
            else:
                context_cfg['target'] = Context.get_server(
                    scenario_cfg["target"])
                if self._is_same_heat_context(scenario_cfg["host"],
                                              scenario_cfg["target"]):
                    # Same heat stack: reach target via its private address.
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["private_ip"]
                else:
                    context_cfg["target"]["ipaddr"] = \
                        context_cfg["target"]["ip"]

        if "targets" in scenario_cfg:
            # Multiple targets: collect one address per target into a
            # comma-separated "ipaddr" string.
            ip_list = []
            for target in scenario_cfg["targets"]:
                if is_ip_addr(target):
                    ip_list.append(target)
                    context_cfg['target'] = {}
                else:
                    context_cfg['target'] = Context.get_server(target)
                    if self._is_same_heat_context(scenario_cfg["host"],
                                                  target):
                        ip_list.append(context_cfg["target"]["private_ip"])
                    else:
                        ip_list.append(context_cfg["target"]["ip"])
            context_cfg['target']['ipaddr'] = ','.join(ip_list)

        if "nodes" in scenario_cfg:
            context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
        runner = base_runner.Runner.get(runner_cfg, self.config)

        print("Starting runner of type '%s'" % runner_cfg["type"])
        runner.run(scenario_cfg, context_cfg)

        return runner

    def _is_same_heat_context(self, host_attr, target_attr):
        """check if two servers are in the same heat context
        host_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        target_attr: either a name for a server created by yardstick or a dict
        with attribute name mapping when using external heat templates
        """
        # NOTE(review): this unconditional early return makes everything
        # below unreachable dead code -- presumably a deliberate
        # short-circuit/workaround; confirm intent before removing either
        # the return or the dead lookup logic.
        return True
        host = None
        target = None
        for context in self.contexts:
            if context.__context_type__ != "Heat":
                continue

            host = context._get_server(host_attr)
            if host is None:
                continue

            target = context._get_server(target_attr)
            if target is None:
                return False

            # Both host and target is not None, then they are in the
            # same heat context.
            return True

        return False
243
244
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # Path to the task (or suite) file to parse; Task.start() reuses
        # one parser instance and reassigns this per task file.
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return True when the suite entry's optional pod/installer
        constraints match the current environment (or are absent)."""
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
                    return False
                if cur_installer and tc_fit_installer and \
                        cur_installer not in tc_fit_installer:
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return the (task_args, task_args_fnames) values declared for
        cur_pod in a suite entry; either element may be None."""
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary objects; consider yaml.safe_load if
                # suite files may ever come from an untrusted source.
                cfg = yaml.load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        # Ensure a trailing separator so "dir + fname" below forms a path.
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name
            if "file_name" in task:
                task_fname = task.get('file_name', None)
                if task_fname is None:
                    continue
            else:
                continue
            # 2.check constraint
            if self._meet_constraint(task, cur_pod, cur_installer):
                valid_task_files.append(test_cases_dir + task_fname)
            else:
                continue
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances

        Returns (scenarios, run_in_parallel, meet_precondition, contexts).
        Context names and server names get a '-<task_id[:8]>' suffix so
        concurrent tasks do not collide.
        """
        print("Parsing task config:", self.path)

        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args("task_args_file", f.read()))
            # CLI task_args override values from the args file.
            kw.update(parse_task_args("task_args", task_args))
        except TypeError:
            # NOTE(review): re-raising a fresh TypeError() discards the
            # original message/traceback; a bare `raise` would keep it.
            raise TypeError()

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    # Render the task file as a template with the parsed
                    # task args before YAML-loading it.
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise e
                print("Input task is:\n%s\n" % rendered_task)

                # NOTE(review): yaml.load without an explicit Loader -- see
                # parse_suite(); safe_load would be the conservative choice.
                cfg = yaml.load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        contexts = []
        # Per-task suffix keeps stack/server names unique across tasks.
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                # Context config without a name: leave it to the context.
                pass
            # default to Heat context because we are testing OpenStack
            context_type = cfg_attrs.get("type", "Heat")
            context = Context.get(context_type)
            context.init(cfg_attrs)
            contexts.append(context)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id

            change_server_name(scenario, name_suffix)

            try:
                # Suffix node names to match the renamed context servers.
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition, contexts

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                # Match if the env scenario starts with any listed prefix.
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
433
434
def is_ip_addr(addr):
    """Check whether *addr* is a valid IPv4/IPv6 address.

    *addr* may be an address string, or a server dict from an external
    heat template carrying the address under 'public_ip_attr' /
    'private_ip_attr'.

    Returns True for a valid IP address, False otherwise.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        # Not a dict: treat addr itself as the candidate address.
        pass

    # ipaddress.ip_address() requires text. The previous code passed
    # addr.encode('utf-8'); ip_address() rejects such byte strings, so
    # every textual IP raised ValueError and the function returned False.
    # Decode bytes input instead; text input has no decode() and is
    # passed through unchanged.
    try:
        addr = addr.decode('utf-8')
    except AttributeError:
        pass

    try:
        ipaddress.ip_address(addr)
    except ValueError:
        return False
    else:
        return True
448
449
450 def _is_background_scenario(scenario):
451     if "run_in_background" in scenario:
452         return scenario["run_in_background"]
453     else:
454         return False
455
456
def parse_nodes_with_context(scenario_cfg):
    """Resolve every entry of the scenario's 'nodes' section to its
    server info via the context layer."""
    return {name: Context.get_server(attrs)
            for name, attrs in scenario_cfg["nodes"].items()}
466
467
def runner_join(runner):
    """Wait for *runner* to complete and release it; terminate the
    whole process if the runner reported failure."""
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status != 0:
        sys.exit("Runner failed")
474
475
def print_invalid_header(source_name, args):
    """Print a header naming the offending input source and its raw value."""
    header = "Invalid %(source)s passed:\n\n %(args)s\n"
    print(header % dict(source=source_name, args=args))
479
480
def parse_task_args(src_name, args):
    """Parse *args* (a YAML mapping string, or None/empty) into a dict.

    Empty or None input yields {}. Raises TypeError when the input is
    not valid YAML or does not parse to a dict -- the contract callers
    (TaskParser.parse_task) rely on.
    """
    try:
        kw = args and yaml.safe_load(args)
        kw = {} if kw is None else kw
    except yaml.YAMLError as e:
        # Catch the whole PyYAML error hierarchy, not only ParserError:
        # e.g. tab-indented input raises ScannerError, which previously
        # escaped as an unhandled exception instead of TypeError.
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
497
498
def check_environment():
    """Ensure OpenStack credentials are present in the environment.

    If OS_AUTH_URL is not set, try to source the openrc file. A missing
    openrc file is tolerated (debug-logged); any other I/O error
    propagates.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENSTACK_RC_FILE)
        except IOError as e:
            # A missing file raises ENOENT. The previous check compared
            # against EEXIST, so the intended "file not found" tolerance
            # path was unreachable and the IOError always re-raised.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
508
509
def change_server_name(scenario, suffix):
    """Append *suffix* to the scenario's host/target/targets names.

    Each of 'host' and 'target' may be either a plain name string or a
    server dict whose 'name' entry gets the suffix; 'targets' is a list
    of name strings. Missing keys are skipped.
    """
    for key in ('host', 'target'):
        if key not in scenario:
            continue
        server = scenario[key]
        try:
            server['name'] += suffix
        except TypeError:
            # Plain string name rather than a server dict.
            scenario[key] += suffix

    if 'targets' in scenario:
        scenario['targets'] = ['{}{}'.format(name, suffix)
                               for name in scenario['targets']]