#
import functools
import json
+import logging
import os
+import pkg_resources
import re
import shutil
import subprocess
import sys
import time
-import urllib2
from datetime import datetime as dt
import dns.resolver
import requests
+from six.moves import urllib
import yaml
-from git import Repo
+from functest.utils import constants
from functest.utils import decorators
-import functest.utils.functest_logger as ft_logger
+from functest.utils.constants import CONST
-logger = ft_logger.Logger("functest_utils").getLogger()
+logger = logging.getLogger(__name__)
# ----------------------------------------------------------
Check if there is access to the internet
"""
try:
- urllib2.urlopen(url, timeout=5)
+ urllib.request.urlopen(url, timeout=5)
return True
- except urllib2.URLError:
+ except urllib.error.URLError:
return False
name = url.rsplit('/')[-1]
dest = dest_path + "/" + name
try:
- response = urllib2.urlopen(url)
- except (urllib2.HTTPError, urllib2.URLError):
+ response = urllib.request.urlopen(url)
+ except (urllib.error.HTTPError, urllib.error.URLError):
return False
with open(dest, 'wb') as f:
# CI UTILS
#
# -----------------------------------------------------------
-def get_git_branch(repo_path):
- """
- Get git branch name
- """
- repo = Repo(repo_path)
- branch = repo.active_branch
- return branch.name
-
-
def get_installer_type():
"""
Get installer type (fuel, apex, joid, compass)
# jenkins-functest-fuel-baremetal-weekly-master-8
# use regex to match branch info
rule = "(dai|week)ly-(.+?)-[0-9]*"
- build_tag = get_build_tag()
+ build_tag = CONST.__getattribute__('BUILD_TAG')
+ if not build_tag:
+ build_tag = 'none'
m = re.search(rule, build_tag)
if m:
return m.group(2)
return "unknown-pod"
-def get_build_tag():
- """
- Get build tag of jenkins jobs
- """
- try:
- build_tag = os.environ['BUILD_TAG']
- except KeyError:
- logger.info("Impossible to retrieve the build tag")
- build_tag = "none"
-
- return build_tag
-
-
-def get_db_url():
+def logger_test_results(project, case_name, status, details):
"""
- Returns DB URL
+ Format test case results for the logger
"""
- # TODO use CONST mechanism
- try:
- # if TEST_DB_URL declared in env variable, use it!
- db_url = os.environ['TEST_DB_URL']
- except KeyError:
- db_url = get_functest_config('results.test_db_url')
- return db_url
-
-
-def logger_test_results(project, case_name, status, details):
pod_name = get_pod_name()
scenario = get_scenario()
version = get_version()
- build_tag = get_build_tag()
+ build_tag = CONST.__getattribute__('BUILD_TAG')
+ db_url = CONST.__getattribute__("results_test_db_url")
logger.info(
"\n"
"details:\t%(d)s\n"
% {'p': project,
'n': case_name,
- 'db': get_db_url(),
+ 'db': db_url,
'pod': pod_name,
'v': version,
's': scenario,
@decorators.can_dump_request_to_file
def push_results_to_db(project, case_name,
- start_date, stop_date, criteria, details):
+ start_date, stop_date, result, details):
"""
POST results to the Result target DB
"""
# Retrieve params from CI and conf
- url = get_db_url()
+ url = CONST.__getattribute__("results_test_db_url")
try:
installer = os.environ['INSTALLER_TYPE']
params = {"project_name": project, "case_name": case_name,
"pod_name": pod_name, "installer": installer,
- "version": version, "scenario": scenario, "criteria": criteria,
+ "version": version, "scenario": scenario, "criteria": result,
"build_tag": build_tag, "start_date": test_start,
"stop_date": test_stop, "details": details}
error = None
headers = {'Content-Type': 'application/json'}
try:
- r = requests.post(url, data=json.dumps(params), headers=headers)
+ r = requests.post(url, data=json.dumps(params, sort_keys=True),
+ headers=headers)
logger.debug(r)
r.raise_for_status()
except requests.RequestException as exc:
'pod': pod_name,
'v': version,
's': scenario,
- 'c': criteria,
+ 'c': result,
't': build_tag,
'd': details,
'error': e
def execute_command_raise(cmd, info=False, error_msg="",
- verbose=True, output_file=None):
- ret = execute_command(cmd, info, error_msg, verbose, output_file)
+ verbose=True, output_file=None, env=None):
+ ret = execute_command(cmd, info, error_msg, verbose, output_file, env)
if ret != 0:
raise Exception(error_msg)
def execute_command(cmd, info=False, error_msg="",
- verbose=True, output_file=None):
+ verbose=True, output_file=None, env=None):
if not error_msg:
error_msg = ("The command '%s' failed." % cmd)
msg_exec = ("Executing command: '%s'" % cmd)
logger.info(msg_exec)
else:
logger.debug(msg_exec)
- p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
+ p = subprocess.Popen(cmd, env=env, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
if output_file:
f = open(output_file, "w")
f.write(line)
else:
line = line.replace('\n', '')
- print line
+ print(line)
sys.stdout.flush()
if output_file:
f.close()
def get_dict_by_test(testname):
- with open(get_testcases_file_dir()) as f:
+ with open(pkg_resources.resource_filename(
+ 'functest', 'ci/testcases.yaml')) as f:
testcases_yaml = yaml.safe_load(f)
for dic_tier in testcases_yaml.get("tiers"):
def get_functest_config(parameter):
- yaml_ = os.environ["CONFIG_FUNCTEST_YAML"]
+ yaml_ = constants.CONST.__getattribute__('CONFIG_FUNCTEST_YAML')
return get_parameter_from_yaml(parameter, yaml_)
-def check_success_rate(case_name, success_rate):
- success_rate = float(success_rate)
- criteria = get_criteria_by_test(case_name)
-
- def get_criteria_value(op):
- return float(criteria.split(op)[1].rstrip('%'))
-
- status = 'FAIL'
- ops = ['==', '>=']
- for op in ops:
- if op in criteria:
- c_value = get_criteria_value(op)
- if eval("%s %s %s" % (success_rate, op, c_value)):
- status = 'PASS'
- break
-
- return status
-
-
def merge_dicts(dict1, dict2):
for k in set(dict1.keys()).union(dict2.keys()):
if k in dict1 and k in dict2:
yield (k, dict2[k])
-def get_testcases_file_dir():
- return get_functest_config('general.functest.testcases_yaml')
-
-
def get_functest_yaml():
- with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f:
+ with open(constants.CONST.__getattribute__('CONFIG_FUNCTEST_YAML')) as f:
functest_yaml = yaml.safe_load(f)
f.close()
return functest_yaml