import os.path
import re
import shutil
-import socket
import subprocess
import sys
import urllib2
+import dns.resolver
import functest.ci.tier_builder as tb
from git import Repo
return db_url
+def logger_test_results(logger, project, case_name, status, details):
+ pod_name = get_pod_name(logger)
+ scenario = get_scenario(logger)
+ version = get_version(logger)
+ build_tag = get_build_tag(logger)
+
+ logger.info("\n"
+ "****************************************\n"
+ "\t %(p)s/%(n)s results \n\n"
+ "****************************************\n"
+ "DB:\t%(db)s\n"
+ "pod:\t%(pod)s\n"
+ "version:\t%(v)s\n"
+ "scenario:\t%(s)s\n"
+ "status:\t%(c)s\n"
+ "build tag:\t%(b)s\n"
+ "details:\t%(d)s\n"
+ % {
+ 'p': project,
+ 'n': case_name,
+ 'db': get_db_url(),
+ 'pod': pod_name,
+ 'v': version,
+ 's': scenario,
+ 'c': status,
+ 'b': build_tag,
+ 'd': details,
+ })
+
+
def push_results_to_db(project, case_name, logger,
start_date, stop_date, criteria, details):
"""
r = requests.post(url, data=json.dumps(params), headers=headers)
if logger:
logger.debug(r)
+ r.raise_for_status()
return True
except Exception, e:
- print ("Error [push_results_to_db('%s', '%s', '%s', " +
- "'%s', '%s', '%s', '%s', '%s', '%s')]:" %
- (url, project, case_name, pod_name, version,
- scenario, criteria, build_tag, details)), e
+ print("Error [push_results_to_db('%s', '%s', '%s', '%s',"
+ "'%s', '%s', '%s', '%s', '%s')]:" %
+ (url, project, case_name, pod_name, version,
+ scenario, criteria, build_tag, details)), e
return False
nameservers = []
rconf = open("/etc/resolv.conf", "r")
line = rconf.readline()
+ resolver = dns.resolver.Resolver()
while line:
ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if ip:
- result = sock.connect_ex((ip.group(), 53))
- if result == 0:
- nameservers.append(ip.group())
+ resolver.nameservers = [str(ip)]
+ try:
+ result = resolver.query('opnfv.org')[0]
+ if result != "":
+ nameservers.append(ip.group())
+ except dns.exception.Timeout:
+ pass
line = rconf.readline()
return nameservers
else:
print(msg_exec)
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
- while True:
- line = p.stdout.readline().replace('\n', '')
- if not line:
- break
+ for line in iter(p.stdout.readline, b''):
+ line = line.replace('\n', '')
if logger:
if info:
logger.info(line)
logger.debug(line)
else:
print line
- p.communicate()
- if p.returncode != 0:
+ p.stdout.close()
+ returncode = p.wait()
+ if returncode != 0:
if verbose:
if logger:
logger.error(error_msg)
if exit_on_error:
sys.exit(1)
- return p.returncode
+ return returncode
def get_deployment_dir(logger=None):
# YAML UTILS
#
# -----------------------------------------------------------
-def get_parameter_from_yaml(parameter):
+def get_parameter_from_yaml(parameter, file=None):
"""
Returns the value of a given parameter in config_functest.yaml
parameter must be given in string format with dots
Example: general.openstack.image_name
"""
- with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f:
+ if file is None:
+ file = os.environ["CONFIG_FUNCTEST_YAML"]
+ with open(file) as f:
functest_yaml = yaml.safe_load(f)
f.close()
value = functest_yaml
raise ValueError("The parameter %s is not defined in"
" config_functest.yaml" % parameter)
return value
+
+
+def check_success_rate(case_name, success_rate):
+ success_rate = float(success_rate)
+ criteria = get_criteria_by_test(case_name)
+
+ def get_criteria_value(op):
+ return float(criteria.split(op)[1].rstrip('%'))
+
+ status = 'FAIL'
+ ops = ['==', '>=']
+ for op in ops:
+ if op in criteria:
+ c_value = get_criteria_value(op)
+ if eval("%s %s %s" % (success_rate, op, c_value)):
+ status = 'PASS'
+ break
+
+ return status
+
+
+def merge_dicts(dict1, dict2):
+ for k in set(dict1.keys()).union(dict2.keys()):
+ if k in dict1 and k in dict2:
+ if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
+ yield (k, dict(merge_dicts(dict1[k], dict2[k])))
+ else:
+ yield (k, dict2[k])
+ elif k in dict1:
+ yield (k, dict1[k])
+ else:
+ yield (k, dict2[k])
+
+
+def check_test_result(test_name, ret, start_time, stop_time):
+ def get_criteria_value():
+ return get_criteria_by_test(test_name).split('==')[1].strip()
+
+ status = 'FAIL'
+ if str(ret) == get_criteria_value():
+ status = 'PASS'
+
+ details = {
+ 'timestart': start_time,
+ 'duration': round(stop_time - start_time, 1),
+ 'status': status,
+ }
+
+ return status, details