# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
#
+import functools
import json
import os
import re
import shutil
import subprocess
import sys
+import time
import urllib2
from datetime import datetime as dt
import yaml
from git import Repo
+from functest.utils import decorators
import functest.utils.functest_logger as ft_logger
logger = ft_logger.Logger("functest_utils").getLogger()
-REPOS_DIR = os.getenv('repos_dir')
-FUNCTEST_REPO = ("%s/functest" % REPOS_DIR)
-
# ----------------------------------------------------------
#
try:
scenario = os.environ['DEPLOY_SCENARIO']
except KeyError:
- logger.error("Impossible to retrieve the scenario")
- scenario = "Unknown_scenario"
+ logger.info("Impossible to retrieve the scenario."
+ "Use default os-nosdn-nofeature-noha")
+ scenario = "os-nosdn-nofeature-noha"
return scenario
# if launched through CI the build tag has the following format
# jenkins-<project>-<installer>-<pod>-<job>-<branch>-<id>
# e.g. jenkins-functest-fuel-opnfv-jump-2-daily-master-190
+ # jenkins-functest-fuel-baremetal-weekly-master-8
# use regex to match branch info
- rule = "daily-(.+?)-[0-9]*"
+ rule = "(dai|week)ly-(.+?)-[0-9]*"
build_tag = get_build_tag()
m = re.search(rule, build_tag)
if m:
- return m.group(1)
+ return m.group(2)
else:
return "unknown"
try:
return os.environ['NODE_NAME']
except KeyError:
- logger.error(
+ logger.info(
"Unable to retrieve the POD name from environment. " +
"Using pod name 'unknown-pod'")
return "unknown-pod"
try:
build_tag = os.environ['BUILD_TAG']
except KeyError:
- logger.error("Impossible to retrieve the build tag")
- build_tag = "unknown_build_tag"
+ logger.info("Impossible to retrieve the build tag")
+ build_tag = "none"
return build_tag
"""
Returns DB URL
"""
- return get_functest_config('results.test_db_url')
+ # TODO use CONST mechanism
+ try:
+ # if TEST_DB_URL declared in env variable, use it!
+ db_url = os.environ['TEST_DB_URL']
+ except KeyError:
+ db_url = get_functest_config('results.test_db_url')
+ return db_url
def logger_test_results(project, case_name, status, details):
'd': details})
+@decorators.can_dump_request_to_file
def push_results_to_db(project, case_name,
start_date, stop_date, criteria, details):
"""
except KeyError as e:
logger.error("Please set env var: " + str(e))
return False
- rule = "daily-(.+?)-[0-9]*"
- m = re.search(rule, build_tag)
- if m:
- version = m.group(1)
- else:
- logger.error("Please fix BUILD_TAG env var: " + build_tag)
- return False
+ version = get_version()
test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
while line:
ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
if ip:
- resolver.nameservers = [str(ip)]
+ resolver.nameservers = [ip.group(0)]
try:
result = resolver.query('opnfv.org')[0]
if result != "":
return ci_env_var
+def execute_command_raise(cmd, info=False, error_msg="",
+ verbose=True, output_file=None):
+ ret = execute_command(cmd, info, error_msg, verbose, output_file)
+ if ret != 0:
+ raise Exception(error_msg)
+
+
def execute_command(cmd, info=False, error_msg="",
verbose=True, output_file=None):
if not error_msg:
return returncode
-def get_deployment_dir():
- """
- Returns current Rally deployment directory
- """
- deployment_name = get_functest_config('rally.deployment_name')
- rally_dir = get_functest_config('general.directories.dir_rally_inst')
- cmd = ("rally deployment list | awk '/" + deployment_name +
- "/ {print $2}'")
- p = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- deployment_uuid = p.stdout.readline().rstrip()
- if deployment_uuid == "":
- logger.error("Rally deployment not found.")
- exit(-1)
- deployment_dir = (rally_dir + "/tempest/for-deployment-" +
- deployment_uuid)
- return deployment_dir
-
-
def get_dict_by_test(testname):
- with open(get_testcases_file()) as f:
+ with open(get_testcases_file_dir()) as f:
testcases_yaml = yaml.safe_load(f)
for dic_tier in testcases_yaml.get("tiers"):
value = value.get(element)
if value is None:
raise ValueError("The parameter %s is not defined in"
- " config_functest.yaml" % parameter)
+ " %s" % (parameter, file))
return value
yield (k, dict2[k])
-def check_test_result(test_name, ret, start_time, stop_time):
- def get_criteria_value():
- return get_criteria_by_test(test_name).split('==')[1].strip()
-
- status = 'FAIL'
- if str(ret) == get_criteria_value():
- status = 'PASS'
-
- details = {
- 'timestart': start_time,
- 'duration': round(stop_time - start_time, 1),
- 'status': status,
- }
-
- return status, details
-
-
-def get_testcases_file():
- return FUNCTEST_REPO + "/functest/ci/testcases.yaml"
+def get_testcases_file_dir():
+ return get_functest_config('general.functest.testcases_yaml')
def get_functest_yaml():
functest_yaml = yaml.safe_load(f)
f.close()
return functest_yaml
+
+
+def print_separator():
+ logger.info("==============================================")
+
+
+def timethis(func):
+ """Measure the time it takes for a function to complete"""
+ @functools.wraps(func)
+ def timed(*args, **kwargs):
+ ts = time.time()
+ result = func(*args, **kwargs)
+ te = time.time()
+ elapsed = '{0}'.format(te - ts)
+ logger.info('{f}(*{a}, **{kw}) took: {t} sec'.format(
+ f=func.__name__, a=args, kw=kwargs, t=elapsed))
+ return result, elapsed
+ return timed