Merge "ci in huawei-pod1 do not run test case after tc074"
[yardstick.git] / yardstick / benchmark / core / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys
15 import os
16 import yaml
17 import atexit
18 import ipaddress
19 import time
20 import logging
21 import uuid
22 import errno
23 from six.moves import filter
24
25 from yardstick.benchmark.contexts.base import Context
26 from yardstick.benchmark.runners import base as base_runner
27 from yardstick.common.task_template import TaskTemplate
28 from yardstick.common.utils import source_env
29 from yardstick.common import constants
30
31 output_file_default = "/tmp/yardstick.out"
32 test_cases_dir_default = "tests/opnfv/test_cases/"
33 LOG = logging.getLogger(__name__)
34
35
class Task(object):     # pragma: no cover
    """Task commands.

       Set of commands to manage benchmark tasks.
    """

    def start(self, args, **kwargs):
        """Start a benchmark scenario.

        Parses either a suite file (expanding to many task files) or a
        single task file, then for each task runs its scenarios and
        undeploys the contexts unless --keep-deploy was requested.
        """

        atexit.register(atexit_handler)

        self.task_id = kwargs.get('task_id', str(uuid.uuid4()))

        check_environment()

        total_start_time = time.time()
        parser = TaskParser(args.inputfile[0])

        if args.suite:
            # 1.parse suite, return suite_params info
            task_files, task_args, task_args_fnames = \
                parser.parse_suite()
        else:
            task_files = [parser.path]
            task_args = [args.task_args]
            task_args_fnames = [args.task_args_file]

        LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
                 task_files, task_args, task_args_fnames)

        if args.parse_only:
            sys.exit(0)

        # parse and run each task file in turn
        for task_file, one_task_args, one_task_args_fname in zip(
                task_files, task_args, task_args_fnames):
            one_task_start_time = time.time()
            parser.path = task_file
            scenarios, run_in_parallel, meet_precondition = parser.parse_task(
                self.task_id, one_task_args, one_task_args_fname)

            if not meet_precondition:
                LOG.info("meet_precondition is %s, please check environment",
                         meet_precondition)
                continue

            self._run(scenarios, run_in_parallel, args.output_file)

            if args.keep_deploy:
                # keep deployment, forget about stack
                # (hide it for exit handler)
                Context.list = []
            else:
                # undeploy in reverse order of deployment
                for context in Context.list[::-1]:
                    context.undeploy()
                Context.list = []
            one_task_end_time = time.time()
            LOG.info("task %s finished in %d secs", task_file,
                     one_task_end_time - one_task_start_time)

        total_end_time = time.time()
        LOG.info("total finished in %d secs",
                 total_end_time - total_start_time)

        print("Done, exiting")

    def _run(self, scenarios, run_in_parallel, output_file):
        """Deploy all contexts, then run the scenarios.

        Background scenarios are started first and kept running until all
        foreground scenarios (parallel or serial) have finished, after
        which they are aborted and reaped.
        """
        for context in Context.list:
            context.deploy()

        background_runners = []

        # Start all background scenarios; give them an effectively
        # infinite duration so they run until explicitly aborted.
        for scenario in filter(_is_background_scenario, scenarios):
            scenario["runner"] = dict(type="Duration", duration=1000000000)
            runner = run_one_scenario(scenario, output_file)
            background_runners.append(runner)

        runners = []
        if run_in_parallel:
            # launch every foreground scenario first, then join them all
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = run_one_scenario(scenario, output_file)
                    runners.append(runner)

            # Wait for runners to finish
            for runner in runners:
                runner_join(runner)
                print("Runner ended, output in", output_file)
        else:
            # run serially
            for scenario in scenarios:
                if not _is_background_scenario(scenario):
                    runner = run_one_scenario(scenario, output_file)
                    runner_join(runner)
                    print("Runner ended, output in", output_file)

        # Abort background runners
        for runner in background_runners:
            runner.abort()

        # Wait for background runners to finish
        for runner in background_runners:
            if runner.join(timeout=60) is None:
                # Nuke if it did not stop nicely
                base_runner.Runner.terminate(runner)
                runner_join(runner)
            else:
                base_runner.Runner.release(runner)
            print("Background task ended")
146
147
148 # TODO: Move stuff below into TaskCommands class !?
149
150
class TaskParser(object):       # pragma: no cover
    """Parser for task config files in yaml format"""

    def __init__(self, path):
        # path to the suite or task file currently being parsed
        self.path = path

    def _meet_constraint(self, task, cur_pod, cur_installer):
        """Return True if the current pod/installer satisfy the task's
        optional 'constraint' section (a missing constraint always matches).
        """
        if "constraint" in task:
            constraint = task.get('constraint', None)
            if constraint is not None:
                tc_fit_pod = constraint.get('pod', None)
                tc_fit_installer = constraint.get('installer', None)
                LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
                         cur_pod, cur_installer, constraint)
                if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
                    return False
                if cur_installer and tc_fit_installer and \
                        cur_installer not in tc_fit_installer:
                    return False
        return True

    def _get_task_para(self, task, cur_pod):
        """Return the (task_args, task_args_fnames) pair configured for
        cur_pod, with None for any value not defined in the task."""
        task_args = task.get('task_args', None)
        if task_args is not None:
            task_args = task_args.get(cur_pod, None)
        task_args_fnames = task.get('task_args_fnames', None)
        if task_args_fnames is not None:
            task_args_fnames = task_args_fnames.get(cur_pod, None)
        return task_args, task_args_fnames

    def parse_suite(self):
        """parse the suite file and return a list of task config file paths
           and lists of optional parameters if present"""
        LOG.info("\nParsing suite file:%s", self.path)

        try:
            with open(self.path) as stream:
                # NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary objects; suite files are local and
                # trusted, but consider yaml.safe_load.
                cfg = yaml.load(stream)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "suite")
        LOG.info("\nStarting scenario:%s", cfg["name"])

        test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
        test_cases_dir = os.path.join(constants.YARDSTICK_ROOT_PATH,
                                      test_cases_dir)
        # make sure the directory ends with a separator before concatenating
        if test_cases_dir[-1] != os.sep:
            test_cases_dir += os.sep

        cur_pod = os.environ.get('NODE_NAME', None)
        cur_installer = os.environ.get('INSTALLER_TYPE', None)

        valid_task_files = []
        valid_task_args = []
        valid_task_args_fnames = []

        for task in cfg["test_cases"]:
            # 1.check file_name (skip entries without one)
            task_fname = task.get('file_name', None)
            if task_fname is None:
                continue
            # 2.check constraint (skip tasks not meant for this pod/installer)
            if not self._meet_constraint(task, cur_pod, cur_installer):
                continue
            valid_task_files.append(test_cases_dir + task_fname)
            # 3.fetch task parameters
            task_args, task_args_fnames = self._get_task_para(task, cur_pod)
            valid_task_args.append(task_args)
            valid_task_args_fnames.append(task_args_fnames)

        return valid_task_files, valid_task_args, valid_task_args_fnames

    def parse_task(self, task_id, task_args=None, task_args_file=None):
        """parses the task file and return an context and scenario instances

        Renders the task file as a template with the merged task_args
        (file values overridden by inline values), initializes the
        contexts described in it and returns
        (scenarios, run_in_parallel, meet_precondition).
        """
        print("Parsing task config:", self.path)

        try:
            kw = {}
            if task_args_file:
                with open(task_args_file) as f:
                    kw.update(parse_task_args("task_args_file", f.read()))
            kw.update(parse_task_args("task_args", task_args))
        except TypeError:
            # re-raise as-is so the original message/traceback from
            # parse_task_args is preserved for the caller
            raise

        try:
            with open(self.path) as f:
                try:
                    input_task = f.read()
                    rendered_task = TaskTemplate.render(input_task, **kw)
                except Exception as e:
                    print("Failed to render template:\n%(task)s\n%(err)s\n"
                          % {"task": input_task, "err": e})
                    raise e
                print("Input task is:\n%s\n" % rendered_task)

                # NOTE(review): yaml.load on the rendered template; task
                # files are local and trusted, but consider yaml.safe_load.
                cfg = yaml.load(rendered_task)
        except IOError as ioerror:
            sys.exit(ioerror)

        self._check_schema(cfg["schema"], "task")
        meet_precondition = self._check_precondition(cfg)

        # TODO: support one or many contexts? Many would simpler and precise
        # TODO: support hybrid context type
        if "context" in cfg:
            context_cfgs = [cfg["context"]]
        elif "contexts" in cfg:
            context_cfgs = cfg["contexts"]
        else:
            context_cfgs = [{"type": "Dummy"}]

        # suffix context names with the task id so concurrent runs do not
        # collide on stack/context names
        name_suffix = '-{}'.format(task_id[:8])
        for cfg_attrs in context_cfgs:
            try:
                cfg_attrs['name'] = '{}{}'.format(cfg_attrs['name'],
                                                  name_suffix)
            except KeyError:
                pass
            context_type = cfg_attrs.get("type", "Heat")
            if "Heat" == context_type and "networks" in cfg_attrs:
                # bugfix: if there are more than one network,
                # only add "external_network" on first one.
                # the name of network should follow this rule:
                # test, test2, test3 ...
                # sort network with the length of network's name
                sorted_networks = sorted(cfg_attrs["networks"])
                # config external_network based on env var
                cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
                    = os.environ.get("EXTERNAL_NETWORK", "net04_ext")

            context = Context.get(context_type)
            context.init(cfg_attrs)

        run_in_parallel = cfg.get("run_in_parallel", False)

        # add tc and task id for influxdb extended tags
        for scenario in cfg["scenarios"]:
            task_name = os.path.splitext(os.path.basename(self.path))[0]
            scenario["tc"] = task_name
            scenario["task_id"] = task_id

            change_server_name(scenario, name_suffix)

            try:
                # node names must carry the same suffix as the contexts
                for node in scenario['nodes']:
                    scenario['nodes'][node] += name_suffix
            except KeyError:
                pass

        # TODO we need something better here, a class that represent the file
        return cfg["scenarios"], run_in_parallel, meet_precondition

    def _check_schema(self, cfg_schema, schema_type):
        """Check if config file is using the correct schema type"""

        if cfg_schema != "yardstick:" + schema_type + ":0.1":
            sys.exit("error: file %s has unknown schema %s" % (self.path,
                                                               cfg_schema))

    def _check_precondition(self, cfg):
        """Check if the environment meets the precondition"""

        if "precondition" in cfg:
            precondition = cfg["precondition"]
            installer_type = precondition.get("installer_type", None)
            deploy_scenarios = precondition.get("deploy_scenarios", None)
            tc_fit_pods = precondition.get("pod_name", None)
            installer_type_env = os.environ.get('INSTALL_TYPE', None)
            deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
            pod_name_env = os.environ.get('NODE_NAME', None)

            LOG.info("installer_type: %s, installer_type_env: %s",
                     installer_type, installer_type_env)
            LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
                     deploy_scenarios, deploy_scenario_env)
            LOG.info("tc_fit_pods: %s, pod_name_env: %s",
                     tc_fit_pods, pod_name_env)
            if installer_type and installer_type_env:
                if installer_type_env not in installer_type:
                    return False
            if deploy_scenarios and deploy_scenario_env:
                # match if the current deploy scenario starts with any of
                # the comma-separated prefixes
                deploy_scenarios_list = deploy_scenarios.split(',')
                for deploy_scenario in deploy_scenarios_list:
                    if deploy_scenario_env.startswith(deploy_scenario):
                        return True
                return False
            if tc_fit_pods and pod_name_env:
                if pod_name_env not in tc_fit_pods:
                    return False
        return True
347
348
def atexit_handler():
    """handler for process termination

    Terminates all runners, then undeploys any contexts that are still
    deployed, in reverse order of deployment.
    """
    base_runner.Runner.terminate_all()

    # truthiness check instead of the explicit len() comparison
    if Context.list:
        print("Undeploying all contexts")
        for context in Context.list[::-1]:
            context.undeploy()
357
358
def is_ip_addr(addr):
    """check if string addr is an IP address

    addr may also be a server-attributes dict, in which case its
    'public_ip_attr' (falling back to 'private_ip_attr') value is
    checked instead.
    """
    try:
        addr = addr.get('public_ip_attr', addr.get('private_ip_attr'))
    except AttributeError:
        pass

    try:
        # ipaddress requires text (unicode): round-tripping through UTF-8
        # yields unicode on Python 2 and str on Python 3. The previous
        # bytes argument (addr.encode('utf-8')) is rejected by Python 3's
        # ipaddress, which made every valid dotted string return False.
        ipaddress.ip_address(addr.encode('utf-8').decode('utf-8'))
    except (AttributeError, ValueError):
        # AttributeError: addr is not a string (e.g. None from the dict
        # lookup above) -> not an IP address.
        return False
    else:
        return True
372
373
def _is_same_heat_context(host_attr, target_attr):
    """check if two servers are in the same heat context
    host_attr: either a name for a server created by yardstick or a dict
    with attribute name mapping when using external heat templates
    target_attr: either a name for a server created by yardstick or a dict
    with attribute name mapping when using external heat templates
    """
    heat_contexts = (ctx for ctx in Context.list
                     if ctx.__context_type__ == "Heat")
    for ctx in heat_contexts:
        if ctx._get_server(host_attr) is None:
            # host does not live in this context; keep looking
            continue
        # Host found here: the servers share a heat context exactly when
        # the target resolves in this same context.
        return ctx._get_server(target_attr) is not None

    return False
400
401
402 def _is_background_scenario(scenario):
403     if "run_in_background" in scenario:
404         return scenario["run_in_background"]
405     else:
406         return False
407
408
def run_one_scenario(scenario_cfg, output_file):
    """run one scenario using context

    Resolves the scenario's host/target/targets/nodes entries into
    server attributes via the deployed contexts, instantiates the runner
    described by scenario_cfg["runner"] and starts it.

    Returns the started runner; the caller is expected to join it.
    """
    runner_cfg = scenario_cfg["runner"]
    runner_cfg['output_filename'] = output_file

    # TODO support get multi hosts/vms info
    context_cfg = {}
    if "host" in scenario_cfg:
        context_cfg['host'] = Context.get_server(scenario_cfg["host"])

    if "target" in scenario_cfg:
        if is_ip_addr(scenario_cfg["target"]):
            # target given as a raw IP address, not a context server name
            context_cfg['target'] = {}
            context_cfg['target']["ipaddr"] = scenario_cfg["target"]
        else:
            context_cfg['target'] = Context.get_server(scenario_cfg["target"])
            if _is_same_heat_context(scenario_cfg["host"],
                                     scenario_cfg["target"]):
                # same heat stack: reach the target over its private IP
                context_cfg["target"]["ipaddr"] = \
                    context_cfg["target"]["private_ip"]
            else:
                context_cfg["target"]["ipaddr"] = \
                    context_cfg["target"]["ip"]

    if "targets" in scenario_cfg:
        ip_list = []
        for target in scenario_cfg["targets"]:
            if is_ip_addr(target):
                ip_list.append(target)
                context_cfg['target'] = {}
            else:
                context_cfg['target'] = Context.get_server(target)
                if _is_same_heat_context(scenario_cfg["host"], target):
                    ip_list.append(context_cfg["target"]["private_ip"])
                else:
                    ip_list.append(context_cfg["target"]["ip"])
        # NOTE(review): context_cfg['target'] holds only the last resolved
        # server's attributes; 'ipaddr' aggregates the IPs of all targets
        context_cfg['target']['ipaddr'] = ','.join(ip_list)

    if "nodes" in scenario_cfg:
        context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
    runner = base_runner.Runner.get(runner_cfg)

    print("Starting runner of type '%s'" % runner_cfg["type"])
    runner.run(scenario_cfg, context_cfg)

    return runner
455
456
def parse_nodes_with_context(scenario_cfg):
    """Parse the 'nodes' field in a scenario.

    Returns a node-name -> server-attributes mapping, with each node
    resolved through the deployed contexts.
    """
    nodes = scenario_cfg["nodes"]
    return {nodename: Context.get_server(nodes[nodename])
            for nodename in nodes}
466
467
def runner_join(runner):
    """Wait for *runner* to finish and release it.

    Exits the whole process when the runner reports a non-zero status.
    """
    exit_status = runner.join()
    base_runner.Runner.release(runner)
    if exit_status == 0:
        return
    sys.exit("Runner failed")
474
475
def print_invalid_header(source_name, args):
    """Print a diagnostic header for an invalid task-argument source."""
    message = "Invalid {source} passed:\n\n {args}\n".format(
        source=source_name, args=args)
    print(message)
479
480
def parse_task_args(src_name, args):
    """Parse a YAML string of task arguments into a dict.

    Returns {} when args is empty/None. Prints a diagnostic and raises
    TypeError when args is not valid YAML or does not parse to a dict.
    """
    try:
        kw = args and yaml.safe_load(args)
        kw = {} if kw is None else kw
    except yaml.YAMLError as e:
        # Catch every YAML failure (ParserError, ScannerError, ...) so any
        # malformed input reaches the friendly diagnostic instead of
        # escaping as an unhandled exception.
        print_invalid_header(src_name, args)
        print("%(source)s has to be YAML. Details:\n\n%(err)s\n"
              % {"source": src_name, "err": e})
        raise TypeError()

    if not isinstance(kw, dict):
        print_invalid_header(src_name, args)
        print("%(src)s had to be dict, actually %(src_type)s\n"
              % {"src": src_name, "src_type": type(kw)})
        raise TypeError()
    return kw
497
498
def check_environment():
    """Make sure OpenStack credentials are available.

    If OS_AUTH_URL is not already set, try to source the openrc file.
    A missing openrc file is tolerated (logged at debug level); any
    other I/O failure propagates.
    """
    auth_url = os.environ.get('OS_AUTH_URL', None)
    if not auth_url:
        try:
            source_env(constants.OPENSTACK_RC_FILE)
        except IOError as e:
            # A missing file raises ENOENT; the previous comparison against
            # EEXIST never matched, so a missing openrc crashed instead of
            # being tolerated.
            if e.errno != errno.ENOENT:
                raise
            LOG.debug('OPENRC file not found')
508
509
def _append_suffix_to_server(scenario, key, suffix):
    """Append *suffix* to the server stored under scenario[key].

    The value is either a plain server-name string or a dict with a
    'name' entry; a missing key is ignored.
    """
    try:
        server = scenario[key]
    except KeyError:
        return
    try:
        server['name'] += suffix
    except TypeError:
        # server is a plain name string, not a dict
        scenario[key] += suffix


def change_server_name(scenario, suffix):
    """Append *suffix* to the scenario's host/target/targets server names
    so they match the context names suffixed with the task id."""
    _append_suffix_to_server(scenario, 'host', suffix)
    _append_suffix_to_server(scenario, 'target', suffix)

    try:
        key = 'targets'
        scenario[key] = ['{}{}'.format(a, suffix) for a in scenario[key]]
    except KeyError:
        pass