# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
#
+import functools
import json
import os
import re
import shutil
import subprocess
import sys
+import time
import urllib2
from datetime import datetime as dt
import yaml
from git import Repo
+from functest.utils.constants import CONST
import functest.utils.functest_logger as ft_logger
+
logger = ft_logger.Logger("functest_utils").getLogger()
'd': details})
+def write_results_to_file(project, case_name, start_date,
+ stop_date, criteria, details):
+ file_path = re.split(r'://', CONST.results_test_db_url)[1]
+
+ try:
+ installer = os.environ['INSTALLER_TYPE']
+ scenario = os.environ['DEPLOY_SCENARIO']
+ pod_name = os.environ['NODE_NAME']
+ except KeyError as e:
+ logger.error("Please set env var: " + str(e))
+ return False
+
+ test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
+ test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
+
+ params = {"project_name": project, "case_name": case_name,
+ "pod_name": pod_name, "installer": installer,
+ "scenario": scenario, "criteria": criteria,
+ "start_date": test_start, "stop_date": test_stop,
+ "details": details}
+ try:
+ with open(file_path, "a+w") as outfile:
+ json.dump(params, outfile)
+ outfile.write("\n")
+ return True
+ except Exception as e:
+ logger.error("write result data into a file failed: %s" % e)
+ return False
+
+
def push_results_to_db(project, case_name,
start_date, stop_date, criteria, details):
"""
POST results to the Result target DB
"""
# Retrieve params from CI and conf
- url = get_db_url() + "/results"
+ url = CONST.results_test_db_url + "/results"
try:
installer = os.environ['INSTALLER_TYPE']
return returncode
-def get_deployment_dir():
- """
- Returns current Rally deployment directory
- """
- deployment_name = get_functest_config('rally.deployment_name')
- rally_dir = get_functest_config('general.directories.dir_rally_inst')
- cmd = ("rally deployment list | awk '/" + deployment_name +
- "/ {print $2}'")
- p = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- deployment_uuid = p.stdout.readline().rstrip()
- if deployment_uuid == "":
- logger.error("Rally deployment not found.")
- exit(-1)
- deployment_dir = (rally_dir + "/tempest/for-deployment-" +
- deployment_uuid)
- return deployment_dir
-
-
def get_dict_by_test(testname):
with open(get_testcases_file_dir()) as f:
testcases_yaml = yaml.safe_load(f)
yield (k, dict2[k])
-def check_test_result(test_name, ret, start_time, stop_time):
- def get_criteria_value():
- return get_criteria_by_test(test_name).split('==')[1].strip()
-
- status = 'FAIL'
- if str(ret) == get_criteria_value():
- status = 'PASS'
-
- details = {
- 'timestart': start_time,
- 'duration': round(stop_time - start_time, 1),
- 'status': status,
- }
-
- return status, details
-
-
def get_testcases_file_dir():
return get_functest_config('general.functest.testcases_yaml')
def print_separator():
logger.info("==============================================")
+
+
+def timethis(func):
+ """Measure the time it takes for a function to complete"""
+ @functools.wraps(func)
+ def timed(*args, **kwargs):
+ ts = time.time()
+ result = func(*args, **kwargs)
+ te = time.time()
+ elapsed = '{0}'.format(te - ts)
+ logger.info('{f}(*{a}, **{kw}) took: {t} sec'.format(
+ f=func.__name__, a=args, kw=kwargs, t=elapsed))
+ return result, elapsed
+ return timed