3 # jose.lausuch@ericsson.com
4 # valentin.boucher@orange.com
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
19 from datetime import datetime as dt
26 from functest.utils.constants import CONST
27 import functest.utils.functest_logger as ft_logger
30 logger = ft_logger.Logger("functest_utils").getLogger()
33 # ----------------------------------------------------------
37 # -----------------------------------------------------------
38 def check_internet_connectivity(url='http://www.opnfv.org/'):
40 Check if there is access to the internet
43 urllib2.urlopen(url, timeout=5)
45 except urllib2.URLError:
49 def download_url(url, dest_path):
51 Download a file to a destination path given a URL
53 name = url.rsplit('/')[-1]
54 dest = dest_path + "/" + name
56 response = urllib2.urlopen(url)
57 except (urllib2.HTTPError, urllib2.URLError):
60 with open(dest, 'wb') as f:
61 shutil.copyfileobj(response, f)
65 # ----------------------------------------------------------
69 # -----------------------------------------------------------
70 def get_git_branch(repo_path):
74 repo = Repo(repo_path)
75 branch = repo.active_branch
79 def get_installer_type():
81 Get installer type (fuel, apex, joid, compass)
84 installer = os.environ['INSTALLER_TYPE']
86 logger.error("Impossible to retrieve the installer type")
87 installer = "Unknown_installer"
97 scenario = os.environ['DEPLOY_SCENARIO']
99 logger.info("Impossible to retrieve the scenario."
100 "Use default os-nosdn-nofeature-noha")
101 scenario = "os-nosdn-nofeature-noha"
110 # Use the build tag to retrieve the version
111 # By default version is unknown
112 # if launched through CI the build tag has the following format
113 # jenkins-<project>-<installer>-<pod>-<job>-<branch>-<id>
114 # e.g. jenkins-functest-fuel-opnfv-jump-2-daily-master-190
115 # use regex to match branch info
116 rule = "daily-(.+?)-[0-9]*"
117 build_tag = get_build_tag()
118 m = re.search(rule, build_tag)
127 Get PoD Name from env variable NODE_NAME
130 return os.environ['NODE_NAME']
133 "Unable to retrieve the POD name from environment. " +
134 "Using pod name 'unknown-pod'")
140 Get build tag of jenkins jobs
143 build_tag = os.environ['BUILD_TAG']
145 logger.info("Impossible to retrieve the build tag")
155 return get_functest_config('results.test_db_url')
158 def logger_test_results(project, case_name, status, details):
159 pod_name = get_pod_name()
160 scenario = get_scenario()
161 version = get_version()
162 build_tag = get_build_tag()
166 "****************************************\n"
167 "\t %(p)s/%(n)s results \n\n"
168 "****************************************\n"
174 "build tag:\t%(b)s\n"
187 def write_results_to_file(project, case_name, start_date,
188 stop_date, criteria, details):
189 file_path = re.split(r'://', CONST.results_test_db_url)[1]
192 installer = os.environ['INSTALLER_TYPE']
193 scenario = os.environ['DEPLOY_SCENARIO']
194 pod_name = os.environ['NODE_NAME']
195 except KeyError as e:
196 logger.error("Please set env var: " + str(e))
199 test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
200 test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
202 params = {"project_name": project, "case_name": case_name,
203 "pod_name": pod_name, "installer": installer,
204 "scenario": scenario, "criteria": criteria,
205 "start_date": test_start, "stop_date": test_stop,
208 with open(file_path, "a+w") as outfile:
209 json.dump(params, outfile)
212 except Exception as e:
213 logger.error("write result data into a file failed: %s" % e)
217 def push_results_to_db(project, case_name,
218 start_date, stop_date, criteria, details):
220 POST results to the Result target DB
222 # Retrieve params from CI and conf
223 url = CONST.results_test_db_url + "/results"
226 installer = os.environ['INSTALLER_TYPE']
227 scenario = os.environ['DEPLOY_SCENARIO']
228 pod_name = os.environ['NODE_NAME']
229 build_tag = os.environ['BUILD_TAG']
230 except KeyError as e:
231 logger.error("Please set env var: " + str(e))
233 rule = "daily-(.+?)-[0-9]*"
234 m = re.search(rule, build_tag)
238 logger.error("Please fix BUILD_TAG env var: " + build_tag)
240 test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
241 test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
243 params = {"project_name": project, "case_name": case_name,
244 "pod_name": pod_name, "installer": installer,
245 "version": version, "scenario": scenario, "criteria": criteria,
246 "build_tag": build_tag, "start_date": test_start,
247 "stop_date": test_stop, "details": details}
250 headers = {'Content-Type': 'application/json'}
252 r = requests.post(url, data=json.dumps(params), headers=headers)
255 except requests.RequestException as exc:
257 error = ("Pushing Result to DB(%s) failed: %s" %
260 error = ("Pushing Result to DB(%s) failed: %s" % (url, exc))
261 except Exception as e:
262 error = ("Error [push_results_to_db("
264 "project: '%(project)s', "
268 "scenario: '%(s)s', "
269 "criteria: '%(c)s', "
270 "build_tag: '%(t)s', "
271 "details: '%(d)s')]: "
292 def get_resolvconf_ns():
294 Get nameservers from current resolv.conf
297 rconf = open("/etc/resolv.conf", "r")
298 line = rconf.readline()
299 resolver = dns.resolver.Resolver()
301 ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
303 resolver.nameservers = [str(ip)]
305 result = resolver.query('opnfv.org')[0]
307 nameservers.append(ip.group())
308 except dns.exception.Timeout:
310 line = rconf.readline()
314 def get_ci_envvars():
316 Get the CI env variables
319 "installer": os.environ.get('INSTALLER_TYPE'),
320 "scenario": os.environ.get('DEPLOY_SCENARIO')}
324 def execute_command(cmd, info=False, error_msg="",
325 verbose=True, output_file=None):
327 error_msg = ("The command '%s' failed." % cmd)
328 msg_exec = ("Executing command: '%s'" % cmd)
331 logger.info(msg_exec)
333 logger.debug(msg_exec)
334 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
335 stderr=subprocess.STDOUT)
337 f = open(output_file, "w")
338 for line in iter(p.stdout.readline, b''):
342 line = line.replace('\n', '')
348 returncode = p.wait()
351 logger.error(error_msg)
356 def get_dict_by_test(testname):
357 with open(get_testcases_file_dir()) as f:
358 testcases_yaml = yaml.safe_load(f)
360 for dic_tier in testcases_yaml.get("tiers"):
361 for dic_testcase in dic_tier['testcases']:
362 if dic_testcase['name'] == testname:
365 logger.error('Project %s is not defined in testcases.yaml' % testname)
369 def get_criteria_by_test(testname):
370 dict = get_dict_by_test(testname)
372 return dict['criteria']
376 # ----------------------------------------------------------
380 # -----------------------------------------------------------
381 def get_parameter_from_yaml(parameter, file):
383 Returns the value of a given parameter in file.yaml
384 parameter must be given in string format with dots
385 Example: general.openstack.image_name
387 with open(file) as f:
388 file_yaml = yaml.safe_load(f)
391 for element in parameter.split("."):
392 value = value.get(element)
394 raise ValueError("The parameter %s is not defined in"
395 " config_functest.yaml" % parameter)
399 def get_functest_config(parameter):
400 yaml_ = os.environ["CONFIG_FUNCTEST_YAML"]
401 return get_parameter_from_yaml(parameter, yaml_)
404 def check_success_rate(case_name, success_rate):
405 success_rate = float(success_rate)
406 criteria = get_criteria_by_test(case_name)
408 def get_criteria_value(op):
409 return float(criteria.split(op)[1].rstrip('%'))
415 c_value = get_criteria_value(op)
416 if eval("%s %s %s" % (success_rate, op, c_value)):
423 def merge_dicts(dict1, dict2):
424 for k in set(dict1.keys()).union(dict2.keys()):
425 if k in dict1 and k in dict2:
426 if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
427 yield (k, dict(merge_dicts(dict1[k], dict2[k])))
436 def get_testcases_file_dir():
437 return get_functest_config('general.functest.testcases_yaml')
440 def get_functest_yaml():
441 with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f:
442 functest_yaml = yaml.safe_load(f)
447 def print_separator():
448 logger.info("==============================================")
452 """Measure the time it takes for a function to complete"""
453 @functools.wraps(func)
454 def timed(*args, **kwargs):
456 result = func(*args, **kwargs)
458 elapsed = '{0}'.format(te - ts)
459 logger.info('{f}(*{a}, **{kw}) took: {t} sec'.format(
460 f=func.__name__, a=args, kw=kwargs, t=elapsed))
461 return result, elapsed