Merge "Modify grafana config for TC037_Network Latency, Throughput, Packet Loss and...
[yardstick.git] / yardstick / cmd / commands / task.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 """ Handler for yardstick command 'task' """
11
12 import sys
13 import os
14 import yaml
15 import atexit
16 import ipaddress
17 import time
18 import logging
19 import uuid
20 from itertools import ifilter
21
22 from yardstick.benchmark.contexts.base import Context
23 from yardstick.benchmark.runners import base as base_runner
24 from yardstick.common.task_template import TaskTemplate
25 from yardstick.common.utils import cliargs
26
27 output_file_default = "/tmp/yardstick.out"
28 test_cases_dir_default = "tests/opnfv/test_cases/"
29 LOG = logging.getLogger(__name__)
30
31
32 class TaskCommands(object):
33     '''Task commands.
34
35        Set of commands to manage benchmark tasks.
36     '''
37
38     @cliargs("inputfile", type=str, help="path to task or suite file", nargs=1)
39     @cliargs("--task-args", dest="task_args",
40              help="Input task args (dict in json). These args are used"
41              "to render input task that is jinja2 template.")
42     @cliargs("--task-args-file", dest="task_args_file",
43              help="Path to the file with input task args (dict in "
44              "json/yaml). These args are used to render input"
45              "task that is jinja2 template.")
46     @cliargs("--keep-deploy", help="keep context deployed in cloud",
47              action="store_true")
48     @cliargs("--parse-only", help="parse the config file and exit",
49              action="store_true")
50     @cliargs("--output-file", help="file where output is stored, default %s" %
51              output_file_default, default=output_file_default)
52     @cliargs("--suite", help="process test suite file instead of a task file",
53              action="store_true")
54     def do_start(self, args):
55         '''Start a benchmark scenario.'''
56
57         atexit.register(atexit_handler)
58
59         total_start_time = time.time()
60         parser = TaskParser(args.inputfile[0])
61
62         if args.suite:
63             # 1.parse suite, return suite_params info
64             task_files, task_args, task_args_fnames = \
65                 parser.parse_suite()
66         else:
67             task_files = [parser.path]
68             task_args = [args.task_args]
69             task_args_fnames = [args.task_args_file]
70
71         LOG.info("\ntask_files:%s, \ntask_args:%s, \ntask_args_fnames:%s",
72                  task_files, task_args, task_args_fnames)
73
74         if args.parse_only:
75             sys.exit(0)
76
77         if os.path.isfile(args.output_file):
78             os.remove(args.output_file)
79         # parse task_files
80         for i in range(0, len(task_files)):
81             one_task_start_time = time.time()
82             parser.path = task_files[i]
83             scenarios, run_in_parallel, meet_precondition = parser.parse_task(
84                  task_args[i], task_args_fnames[i])
85
86             if not meet_precondition:
87                 LOG.info("meet_precondition is %s, please check envrionment",
88                          meet_precondition)
89                 continue
90
91             self._run(scenarios, run_in_parallel, args.output_file)
92
93             if args.keep_deploy:
94                 # keep deployment, forget about stack
95                 # (hide it for exit handler)
96                 Context.list = []
97             else:
98                 for context in Context.list:
99                     context.undeploy()
100                 Context.list = []
101             one_task_end_time = time.time()
102             LOG.info("task %s finished in %d secs", task_files[i],
103                      one_task_end_time - one_task_start_time)
104
105         total_end_time = time.time()
106         LOG.info("total finished in %d secs",
107                  total_end_time - total_start_time)
108
109         print "Done, exiting"
110
111     def _run(self, scenarios, run_in_parallel, output_file):
112         '''Deploys context and calls runners'''
113         for context in Context.list:
114             context.deploy()
115
116         background_runners = []
117
118         # Start all background scenarios
119         for scenario in ifilter(_is_background_scenario, scenarios):
120             scenario["runner"] = dict(type="Duration", duration=1000000000)
121             runner = run_one_scenario(scenario, output_file)
122             background_runners.append(runner)
123
124         runners = []
125         if run_in_parallel:
126             for scenario in scenarios:
127                 if not _is_background_scenario(scenario):
128                     runner = run_one_scenario(scenario, output_file)
129                     runners.append(runner)
130
131             # Wait for runners to finish
132             for runner in runners:
133                 runner_join(runner)
134                 print "Runner ended, output in", output_file
135         else:
136             # run serially
137             for scenario in scenarios:
138                 if not _is_background_scenario(scenario):
139                     runner = run_one_scenario(scenario, output_file)
140                     runner_join(runner)
141                     print "Runner ended, output in", output_file
142
143         # Abort background runners
144         for runner in background_runners:
145             runner.abort()
146
147         # Wait for background runners to finish
148         for runner in background_runners:
149             if runner.join(timeout=60) is None:
150                 # Nuke if it did not stop nicely
151                 base_runner.Runner.terminate(runner)
152                 runner_join(runner)
153             else:
154                 base_runner.Runner.release(runner)
155             print "Background task ended"
156
157
158 # TODO: Move stuff below into TaskCommands class !?
159
160
161 class TaskParser(object):
162     '''Parser for task config files in yaml format'''
163     def __init__(self, path):
164         self.path = path
165
166     def _meet_constraint(self, task, cur_pod, cur_installer):
167         if "constraint" in task:
168             constraint = task.get('constraint', None)
169             if constraint is not None:
170                 tc_fit_pod = constraint.get('pod', None)
171                 tc_fit_installer = constraint.get('installer', None)
172                 LOG.info("cur_pod:%s, cur_installer:%s,tc_constraints:%s",
173                          cur_pod, cur_installer, constraint)
174                 if cur_pod and tc_fit_pod and cur_pod not in tc_fit_pod:
175                     return False
176                 if cur_installer and tc_fit_installer and \
177                         cur_installer not in tc_fit_installer:
178                     return False
179         return True
180
181     def _get_task_para(self, task, cur_pod):
182         task_args = task.get('task_args', None)
183         if task_args is not None:
184             task_args = task_args.get(cur_pod, None)
185         task_args_fnames = task.get('task_args_fnames', None)
186         if task_args_fnames is not None:
187             task_args_fnames = task_args_fnames.get(cur_pod, None)
188         return task_args, task_args_fnames
189
190     def parse_suite(self):
191         '''parse the suite file and return a list of task config file paths
192            and lists of optional parameters if present'''
193         LOG.info("\nParsing suite file:%s", self.path)
194
195         try:
196             with open(self.path) as stream:
197                 cfg = yaml.load(stream)
198         except IOError as ioerror:
199             sys.exit(ioerror)
200
201         self._check_schema(cfg["schema"], "suite")
202         LOG.info("\nStarting scenario:%s", cfg["name"])
203
204         test_cases_dir = cfg.get("test_cases_dir", test_cases_dir_default)
205         if test_cases_dir[-1] != os.sep:
206             test_cases_dir += os.sep
207
208         cur_pod = os.environ.get('NODE_NAME', None)
209         cur_installer = os.environ.get('INSTALLER_TYPE', None)
210
211         valid_task_files = []
212         valid_task_args = []
213         valid_task_args_fnames = []
214
215         for task in cfg["test_cases"]:
216             # 1.check file_name
217             if "file_name" in task:
218                 task_fname = task.get('file_name', None)
219                 if task_fname is None:
220                     continue
221             else:
222                 continue
223             # 2.check constraint
224             if self._meet_constraint(task, cur_pod, cur_installer):
225                 valid_task_files.append(test_cases_dir + task_fname)
226             else:
227                 continue
228             # 3.fetch task parameters
229             task_args, task_args_fnames = self._get_task_para(task, cur_pod)
230             valid_task_args.append(task_args)
231             valid_task_args_fnames.append(task_args_fnames)
232
233         return valid_task_files, valid_task_args, valid_task_args_fnames
234
235     def parse_task(self, task_args=None, task_args_file=None):
236         '''parses the task file and return an context and scenario instances'''
237         print "Parsing task config:", self.path
238
239         try:
240             kw = {}
241             if task_args_file:
242                 with open(task_args_file) as f:
243                     kw.update(parse_task_args("task_args_file", f.read()))
244             kw.update(parse_task_args("task_args", task_args))
245         except TypeError:
246             raise TypeError()
247
248         try:
249             with open(self.path) as f:
250                 try:
251                     input_task = f.read()
252                     rendered_task = TaskTemplate.render(input_task, **kw)
253                 except Exception as e:
254                     print(("Failed to render template:\n%(task)s\n%(err)s\n")
255                           % {"task": input_task, "err": e})
256                     raise e
257                 print(("Input task is:\n%s\n") % rendered_task)
258
259                 cfg = yaml.load(rendered_task)
260         except IOError as ioerror:
261             sys.exit(ioerror)
262
263         self._check_schema(cfg["schema"], "task")
264         meet_precondition = self._check_precondition(cfg)
265
266         # TODO: support one or many contexts? Many would simpler and precise
267         # TODO: support hybrid context type
268         if "context" in cfg:
269             context_cfgs = [cfg["context"]]
270         elif "contexts" in cfg:
271             context_cfgs = cfg["contexts"]
272         else:
273             context_cfgs = [{"type": "Dummy"}]
274
275         for cfg_attrs in context_cfgs:
276             context_type = cfg_attrs.get("type", "Heat")
277             if "Heat" == context_type and "networks" in cfg_attrs:
278                 # bugfix: if there are more than one network,
279                 # only add "external_network" on first one.
280                 # the name of netwrok should follow this rule:
281                 # test, test2, test3 ...
282                 # sort network with the length of network's name
283                 sorted_networks = sorted(cfg_attrs["networks"].keys())
284                 # config external_network based on env var
285                 cfg_attrs["networks"][sorted_networks[0]]["external_network"] \
286                     = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
287
288             context = Context.get(context_type)
289             context.init(cfg_attrs)
290
291         run_in_parallel = cfg.get("run_in_parallel", False)
292
293         # add tc and task id for influxdb extended tags
294         task_id = str(uuid.uuid4())
295         for scenario in cfg["scenarios"]:
296             task_name = os.path.splitext(os.path.basename(self.path))[0]
297             scenario["tc"] = task_name
298             scenario["task_id"] = task_id
299
300         # TODO we need something better here, a class that represent the file
301         return cfg["scenarios"], run_in_parallel, meet_precondition
302
303     def _check_schema(self, cfg_schema, schema_type):
304         '''Check if config file is using the correct schema type'''
305
306         if cfg_schema != "yardstick:" + schema_type + ":0.1":
307             sys.exit("error: file %s has unknown schema %s" % (self.path,
308                                                                cfg_schema))
309
310     def _check_precondition(self, cfg):
311         '''Check if the envrionment meet the preconditon'''
312
313         if "precondition" in cfg:
314             precondition = cfg["precondition"]
315             installer_type = precondition.get("installer_type", None)
316             deploy_scenarios = precondition.get("deploy_scenarios", None)
317             tc_fit_pods = precondition.get("pod_name", None)
318             installer_type_env = os.environ.get('INSTALL_TYPE', None)
319             deploy_scenario_env = os.environ.get('DEPLOY_SCENARIO', None)
320             pod_name_env = os.environ.get('NODE_NAME', None)
321
322             LOG.info("installer_type: %s, installer_type_env: %s",
323                      installer_type, installer_type_env)
324             LOG.info("deploy_scenarios: %s, deploy_scenario_env: %s",
325                      deploy_scenarios, deploy_scenario_env)
326             LOG.info("tc_fit_pods: %s, pod_name_env: %s",
327                      tc_fit_pods, pod_name_env)
328             if installer_type and installer_type_env:
329                 if installer_type_env not in installer_type:
330                     return False
331             if deploy_scenarios and deploy_scenario_env:
332                 deploy_scenarios_list = deploy_scenarios.split(',')
333                 for deploy_scenario in deploy_scenarios_list:
334                     if deploy_scenario_env.startswith(deploy_scenario):
335                         return True
336                 return False
337             if tc_fit_pods and pod_name_env:
338                 if pod_name_env not in tc_fit_pods:
339                     return False
340         return True
341
342
343 def atexit_handler():
344     '''handler for process termination'''
345     base_runner.Runner.terminate_all()
346
347     if len(Context.list) > 0:
348         print "Undeploying all contexts"
349         for context in Context.list:
350             context.undeploy()
351
352
353 def is_ip_addr(addr):
354     '''check if string addr is an IP address'''
355     try:
356         ipaddress.ip_address(unicode(addr))
357         return True
358     except ValueError:
359         return False
360
361
362 def _is_same_heat_context(host_attr, target_attr):
363     '''check if two servers are in the same heat context
364     host_attr: either a name for a server created by yardstick or a dict
365     with attribute name mapping when using external heat templates
366     target_attr: either a name for a server created by yardstick or a dict
367     with attribute name mapping when using external heat templates
368     '''
369     host = None
370     target = None
371     for context in Context.list:
372         if context.__context_type__ != "Heat":
373             continue
374
375         host = context._get_server(host_attr)
376         if host is None:
377             continue
378
379         target = context._get_server(target_attr)
380         if target is None:
381             return False
382
383         # Both host and target is not None, then they are in the
384         # same heat context.
385         return True
386
387     return False
388
389
390 def _is_background_scenario(scenario):
391     if "run_in_background" in scenario:
392         return scenario["run_in_background"]
393     else:
394         return False
395
396
397 def run_one_scenario(scenario_cfg, output_file):
398     '''run one scenario using context'''
399     runner_cfg = scenario_cfg["runner"]
400     runner_cfg['output_filename'] = output_file
401
402     # TODO support get multi hosts/vms info
403     context_cfg = {}
404     if "host" in scenario_cfg:
405         context_cfg['host'] = Context.get_server(scenario_cfg["host"])
406
407     if "target" in scenario_cfg:
408         if is_ip_addr(scenario_cfg["target"]):
409             context_cfg['target'] = {}
410             context_cfg['target']["ipaddr"] = scenario_cfg["target"]
411         else:
412             context_cfg['target'] = Context.get_server(scenario_cfg["target"])
413             if _is_same_heat_context(scenario_cfg["host"],
414                                      scenario_cfg["target"]):
415                 context_cfg["target"]["ipaddr"] = \
416                     context_cfg["target"]["private_ip"]
417             else:
418                 context_cfg["target"]["ipaddr"] = \
419                     context_cfg["target"]["ip"]
420
421     if "targets" in scenario_cfg:
422         ip_list = []
423         for target in scenario_cfg["targets"]:
424             if is_ip_addr(target):
425                 ip_list.append(target)
426                 context_cfg['target'] = {}
427             else:
428                 context_cfg['target'] = Context.get_server(target)
429                 if _is_same_heat_context(scenario_cfg["host"], target):
430                     ip_list.append(context_cfg["target"]["private_ip"])
431                 else:
432                     ip_list.append(context_cfg["target"]["ip"])
433         context_cfg['target']['ipaddr'] = ','.join(ip_list)
434
435     if "nodes" in scenario_cfg:
436         context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
437     runner = base_runner.Runner.get(runner_cfg)
438
439     print "Starting runner of type '%s'" % runner_cfg["type"]
440     runner.run(scenario_cfg, context_cfg)
441
442     return runner
443
444
445 def parse_nodes_with_context(scenario_cfg):
446     '''paras the 'nodes' fields in scenario '''
447     nodes = scenario_cfg["nodes"]
448
449     nodes_cfg = {}
450     for nodename in nodes:
451         nodes_cfg[nodename] = Context.get_server(nodes[nodename])
452
453     return nodes_cfg
454
455
456 def runner_join(runner):
457     '''join (wait for) a runner, exit process at runner failure'''
458     status = runner.join()
459     base_runner.Runner.release(runner)
460     if status != 0:
461         sys.exit("Runner failed")
462
463
464 def print_invalid_header(source_name, args):
465     print(("Invalid %(source)s passed:\n\n %(args)s\n")
466           % {"source": source_name, "args": args})
467
468
469 def parse_task_args(src_name, args):
470     try:
471         kw = args and yaml.safe_load(args)
472         kw = {} if kw is None else kw
473     except yaml.parser.ParserError as e:
474         print_invalid_header(src_name, args)
475         print(("%(source)s has to be YAML. Details:\n\n%(err)s\n")
476               % {"source": src_name, "err": e})
477         raise TypeError()
478
479     if not isinstance(kw, dict):
480         print_invalid_header(src_name, args)
481         print(("%(src)s had to be dict, actually %(src_type)s\n")
482               % {"src": src_name, "src_type": type(kw)})
483         raise TypeError()
484     return kw