1e88dadb7920affd81face25fcffeaec758aea3c
[functest.git] / functest / utils / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # valentin.boucher@orange.com
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 import functools
11 import logging
12 import os
13 import pkg_resources
14 import re
15 import shutil
16 import subprocess
17 import sys
18 import time
19
20 import dns.resolver
21 from six.moves import urllib
22 import yaml
23
24 from functest.utils import constants
25
26 logger = logging.getLogger(__name__)
27
28
29 # ----------------------------------------------------------
30 #
31 #               INTERNET UTILS
32 #
33 # -----------------------------------------------------------
34 def check_internet_connectivity(url='http://www.opnfv.org/'):
35     """
36     Check if there is access to the internet
37     """
38     try:
39         urllib.request.urlopen(url, timeout=5)
40         return True
41     except urllib.error.URLError:
42         return False
43
44
45 def download_url(url, dest_path):
46     """
47     Download a file to a destination path given a URL
48     """
49     name = url.rsplit('/')[-1]
50     dest = dest_path + "/" + name
51     try:
52         response = urllib.request.urlopen(url)
53     except (urllib.error.HTTPError, urllib.error.URLError):
54         return False
55
56     with open(dest, 'wb') as f:
57         shutil.copyfileobj(response, f)
58     return True
59
60
61 # ----------------------------------------------------------
62 #
63 #               CI UTILS
64 #
65 # -----------------------------------------------------------
66 def get_resolvconf_ns():
67     """
68     Get nameservers from current resolv.conf
69     """
70     nameservers = []
71     rconf = open("/etc/resolv.conf", "r")
72     line = rconf.readline()
73     resolver = dns.resolver.Resolver()
74     while line:
75         ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
76         if ip:
77             resolver.nameservers = [ip.group(0)]
78             try:
79                 result = resolver.query('opnfv.org')[0]
80                 if result != "":
81                     nameservers.append(ip.group())
82             except dns.exception.Timeout:
83                 pass
84         line = rconf.readline()
85     return nameservers
86
87
88 def get_ci_envvars():
89     """
90     Get the CI env variables
91     """
92     ci_env_var = {
93         "installer": os.environ.get('INSTALLER_TYPE'),
94         "scenario": os.environ.get('DEPLOY_SCENARIO')}
95     return ci_env_var
96
97
98 def execute_command_raise(cmd, info=False, error_msg="",
99                           verbose=True, output_file=None):
100     ret = execute_command(cmd, info, error_msg, verbose, output_file)
101     if ret != 0:
102         raise Exception(error_msg)
103
104
105 def execute_command(cmd, info=False, error_msg="",
106                     verbose=True, output_file=None):
107     if not error_msg:
108         error_msg = ("The command '%s' failed." % cmd)
109     msg_exec = ("Executing command: '%s'" % cmd)
110     if verbose:
111         if info:
112             logger.info(msg_exec)
113         else:
114             logger.debug(msg_exec)
115     p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
116                          stderr=subprocess.STDOUT)
117     if output_file:
118         f = open(output_file, "w")
119     for line in iter(p.stdout.readline, b''):
120         if output_file:
121             f.write(line)
122         else:
123             line = line.replace('\n', '')
124             print(line)
125             sys.stdout.flush()
126     if output_file:
127         f.close()
128     p.stdout.close()
129     returncode = p.wait()
130     if returncode != 0:
131         if verbose:
132             logger.error(error_msg)
133
134     return returncode
135
136
137 def get_dict_by_test(testname):
138     with open(pkg_resources.resource_filename(
139             'functest', 'ci/testcases.yaml')) as f:
140         testcases_yaml = yaml.safe_load(f)
141
142     for dic_tier in testcases_yaml.get("tiers"):
143         for dic_testcase in dic_tier['testcases']:
144             if dic_testcase['case_name'] == testname:
145                 return dic_testcase
146
147     logger.error('Project %s is not defined in testcases.yaml' % testname)
148     return None
149
150
151 def get_criteria_by_test(testname):
152     dict = get_dict_by_test(testname)
153     if dict:
154         return dict['criteria']
155     return None
156
157
158 # ----------------------------------------------------------
159 #
160 #               YAML UTILS
161 #
162 # -----------------------------------------------------------
163 def get_parameter_from_yaml(parameter, file):
164     """
165     Returns the value of a given parameter in file.yaml
166     parameter must be given in string format with dots
167     Example: general.openstack.image_name
168     """
169     with open(file) as f:
170         file_yaml = yaml.safe_load(f)
171     f.close()
172     value = file_yaml
173     for element in parameter.split("."):
174         value = value.get(element)
175         if value is None:
176             raise ValueError("The parameter %s is not defined in"
177                              " %s" % (parameter, file))
178     return value
179
180
181 def get_functest_config(parameter):
182     yaml_ = constants.CONST.__getattribute__('CONFIG_FUNCTEST_YAML')
183     return get_parameter_from_yaml(parameter, yaml_)
184
185
186 def get_functest_yaml():
187     with open(constants.CONST.__getattribute__('CONFIG_FUNCTEST_YAML')) as f:
188         functest_yaml = yaml.safe_load(f)
189     f.close()
190     return functest_yaml
191
192
193 def print_separator():
194     logger.info("==============================================")
195
196
197 def timethis(func):
198     """Measure the time it takes for a function to complete"""
199     @functools.wraps(func)
200     def timed(*args, **kwargs):
201         ts = time.time()
202         result = func(*args, **kwargs)
203         te = time.time()
204         elapsed = '{0}'.format(te - ts)
205         logger.info('{f}(*{a}, **{kw}) took: {t} sec'.format(
206             f=func.__name__, a=args, kw=kwargs, t=elapsed))
207         return result, elapsed
208     return timed