# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
#
+import functools
import json
import os
import re
import shutil
import subprocess
import sys
+import time
import urllib2
from datetime import datetime as dt
import yaml
from git import Repo
+from functest.utils import decorators
import functest.utils.functest_logger as ft_logger
logger = ft_logger.Logger("functest_utils").getLogger()
-REPOS_DIR = os.getenv('repos_dir')
-FUNCTEST_REPO = ("%s/functest" % REPOS_DIR)
-
# ----------------------------------------------------------
#
try:
scenario = os.environ['DEPLOY_SCENARIO']
except KeyError:
- logger.error("Impossible to retrieve the scenario")
- scenario = "Unknown_scenario"
+ logger.info("Impossible to retrieve the scenario."
+ "Use default os-nosdn-nofeature-noha")
+ scenario = "os-nosdn-nofeature-noha"
return scenario
try:
return os.environ['NODE_NAME']
except KeyError:
- logger.error(
+ logger.info(
"Unable to retrieve the POD name from environment. " +
"Using pod name 'unknown-pod'")
return "unknown-pod"
try:
build_tag = os.environ['BUILD_TAG']
except KeyError:
- logger.error("Impossible to retrieve the build tag")
- build_tag = "unknown_build_tag"
+ logger.info("Impossible to retrieve the build tag")
+ build_tag = "none"
return build_tag
'd': details})
+@decorators.can_dump_request_to_file
def push_results_to_db(project, case_name,
start_date, stop_date, criteria, details):
"""
while line:
ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
if ip:
- resolver.nameservers = [str(ip)]
+ resolver.nameservers = [ip.group(0)]
try:
result = resolver.query('opnfv.org')[0]
if result != "":
return returncode
-def get_deployment_dir():
- """
- Returns current Rally deployment directory
- """
- deployment_name = get_functest_config('rally.deployment_name')
- rally_dir = get_functest_config('general.directories.dir_rally_inst')
- cmd = ("rally deployment list | awk '/" + deployment_name +
- "/ {print $2}'")
- p = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- deployment_uuid = p.stdout.readline().rstrip()
- if deployment_uuid == "":
- logger.error("Rally deployment not found.")
- exit(-1)
- deployment_dir = (rally_dir + "/tempest/for-deployment-" +
- deployment_uuid)
- return deployment_dir
-
-
def get_dict_by_test(testname):
- with open(get_testcases_file()) as f:
+ with open(get_testcases_file_dir()) as f:
testcases_yaml = yaml.safe_load(f)
for dic_tier in testcases_yaml.get("tiers"):
yield (k, dict2[k])
-def check_test_result(test_name, ret, start_time, stop_time):
- def get_criteria_value():
- return get_criteria_by_test(test_name).split('==')[1].strip()
-
- status = 'FAIL'
- if str(ret) == get_criteria_value():
- status = 'PASS'
-
- details = {
- 'timestart': start_time,
- 'duration': round(stop_time - start_time, 1),
- 'status': status,
- }
-
- return status, details
-
-
-def get_testcases_file():
- return FUNCTEST_REPO + "/functest/ci/testcases.yaml"
+def get_testcases_file_dir():
+ return get_functest_config('general.functest.testcases_yaml')
def get_functest_yaml():
functest_yaml = yaml.safe_load(f)
f.close()
return functest_yaml
+
+
+def print_separator():
+ logger.info("==============================================")
+
+
+def timethis(func):
+ """Measure the time it takes for a function to complete"""
+ @functools.wraps(func)
+ def timed(*args, **kwargs):
+ ts = time.time()
+ result = func(*args, **kwargs)
+ te = time.time()
+ elapsed = '{0}'.format(te - ts)
+ logger.info('{f}(*{a}, **{kw}) took: {t} sec'.format(
+ f=func.__name__, a=args, kw=kwargs, t=elapsed))
+ return result, elapsed
+ return timed