# http://www.apache.org/licenses/LICENSE-2.0
#
+""" global variables """
+
+from datetime import datetime as dt
import json
import os
import os.path
import re
-import requests
import shutil
-import socket
import subprocess
+import sys
import urllib2
+import dns.resolver
+
+import functest.ci.tier_builder as tb
from git import Repo
+import requests
+import yaml
+
+
+REPOS_DIR = os.getenv('repos_dir')
+FUNCTEST_REPO = ("%s/functest/" % REPOS_DIR)
# ----------------------------------------------------------
return build_tag
-def push_results_to_db(db_url, project, case_name, logger, pod_name,
- version, scenario, criteria, build_tag, payload):
+def get_db_url(logger=None):
+ """
+ Returns DB URL
+ """
+ with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f:
+ functest_yaml = yaml.safe_load(f)
+ f.close()
+ db_url = functest_yaml.get("results").get("test_db_url")
+ return db_url
+
+
+def logger_test_results(logger, project, case_name, status, details):
+ pod_name = get_pod_name(logger)
+ scenario = get_scenario(logger)
+ version = get_version(logger)
+ build_tag = get_build_tag(logger)
+
+ logger.info("\n"
+ "****************************************\n"
+ "\t %(p)s/%(n)s results \n\n"
+ "****************************************\n"
+ "DB:\t%(db)s\n"
+ "pod:\t%(pod)s\n"
+ "version:\t%(v)s\n"
+ "scenario:\t%(s)s\n"
+ "status:\t%(c)s\n"
+ "build tag:\t%(b)s\n"
+ "details:\t%(d)s\n"
+ % {
+ 'p': project,
+ 'n': case_name,
+ 'db': get_db_url(),
+ 'pod': pod_name,
+ 'v': version,
+ 's': scenario,
+ 'c': status,
+ 'b': build_tag,
+ 'd': details,
+ })
+
+
+def push_results_to_db(project, case_name, logger,
+ start_date, stop_date, criteria, details):
"""
POST results to the Result target DB
"""
- url = db_url + "/results"
+ # Retrieve params from CI and conf
+ url = get_db_url(logger) + "/results"
installer = get_installer_type(logger)
+ scenario = get_scenario(logger)
+ version = get_version(logger)
+ pod_name = get_pod_name(logger)
+ build_tag = get_build_tag(logger)
+ test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
+ test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
+
params = {"project_name": project, "case_name": case_name,
"pod_name": pod_name, "installer": installer,
"version": version, "scenario": scenario, "criteria": criteria,
- "build_tag": build_tag, "details": payload}
+ "build_tag": build_tag, "start_date": test_start,
+ "stop_date": test_stop, "details": details}
headers = {'Content-Type': 'application/json'}
try:
r = requests.post(url, data=json.dumps(params), headers=headers)
if logger:
logger.debug(r)
+ r.raise_for_status()
return True
except Exception, e:
- print ("Error [push_results_to_db('%s', '%s', '%s', " +
- "'%s', '%s', '%s', '%s', '%s', '%s')]:" %
- (db_url, project, case_name, pod_name, version,
- scenario, criteria, build_tag, payload), e)
+ print("Error [push_results_to_db('%s', '%s', '%s', '%s',"
+ "'%s', '%s', '%s', '%s', '%s')]:" %
+ (url, project, case_name, pod_name, version,
+ scenario, criteria, build_tag, details)), e
return False
nameservers = []
rconf = open("/etc/resolv.conf", "r")
line = rconf.readline()
+ resolver = dns.resolver.Resolver()
while line:
ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if ip:
- result = sock.connect_ex((ip.group(), 53))
- if result == 0:
- nameservers.append(ip.group())
+ resolver.nameservers = [str(ip)]
+ try:
+ result = resolver.query('opnfv.org')[0]
+ if result != "":
+ nameservers.append(ip.group())
+ except dns.exception.Timeout:
+ pass
line = rconf.readline()
return nameservers
-def getTestEnv(test, functest_yaml):
- """
- Get the config of the testcase based on functest_config.yaml
- 2 options
- - test = test project e.g; ovno
- - test = testcase e.g. functest/odl
- look for the / to see if it is a test project or a testcase
- """
- try:
- TEST_ENV = functest_yaml.get("test-dependencies")
-
- if test.find("/") < 0:
- config_test = TEST_ENV[test]
- else:
- test_split = test.split("/")
- testproject = test_split[0]
- testcase = test_split[1]
- config_test = TEST_ENV[testproject][testcase]
- except KeyError:
- # if not defined in dependencies => no dependencies
- config_test = ""
- except Exception, e:
- print "Error [getTestEnv]:", e
-
- return config_test
-
-
def get_ci_envvars():
"""
Get the CI env variables
return ci_env_var
-def isTestRunnable(test, functest_yaml):
- """
- Return True if the test is runnable in the current scenario
- """
- # By default we assume that all the tests are always runnable...
- is_runnable = True
- # Retrieve CI environment
- ci_env = get_ci_envvars()
- # Retrieve test environement from config file
- test_env = getTestEnv(test, functest_yaml)
-
- # if test_env not empty => dependencies to be checked
- if test_env is not None and len(test_env) > 0:
- # possible criteria = ["installer", "scenario"]
- # consider test criteria from config file
- # compare towards CI env through CI en variable
- for criteria in test_env:
- if re.search(test_env[criteria], ci_env[criteria]) is None:
- # print "Test "+ test + " cannot be run on the environment"
- is_runnable = False
- return is_runnable
-
-
-def generateTestcaseList(functest_yaml):
- """
- Generate a test file with the runnable test according to
- the current scenario
- """
- test_list = ""
- # get testcases
- testcase_list = functest_yaml.get("test-dependencies")
- projects = testcase_list.keys()
-
- for project in projects:
- testcases = testcase_list[project]
- # 1 or 2 levels for testcases project[/case]l
- # if only project name without controller or scenario
- # => shall be runnable on any controller/scenario
- if testcases is None:
- test_list += project + " "
+def execute_command(cmd, logger=None,
+ exit_on_error=True,
+ info=False,
+ error_msg="",
+ verbose=True):
+ if not error_msg:
+ error_msg = ("The command '%s' failed." % cmd)
+ msg_exec = ("Executing command: '%s'" % cmd)
+ if verbose:
+ if logger:
+ if info:
+ logger.info(msg_exec)
+ else:
+ logger.debug(msg_exec)
+ else:
+ print(msg_exec)
+ p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
+ for line in iter(p.stdout.readline, b''):
+ line = line.replace('\n', '')
+ if logger:
+ if info:
+ logger.info(line)
+ else:
+ logger.debug(line)
else:
- for testcase in testcases:
- if testcase == "installer" or testcase == "scenario":
- # project (1 level)
- if isTestRunnable(project, functest_yaml):
- test_list += project + " "
- else:
- # project/testcase (2 levels)
- thetest = project + "/" + testcase
- if isTestRunnable(thetest, functest_yaml):
- test_list += testcase + " "
-
- # sort the list to execute the test in the right order
- test_order_list = functest_yaml.get("test_exec_priority")
- test_sorted_list = ""
- for test in test_order_list:
- if test_order_list[test] in test_list:
- test_sorted_list += test_order_list[test] + " "
-
- # create a file that could be consumed by run-test.sh
- # this method is used only for CI
- # so it can be run only in container
- # reuse default conf directory to store the list of runnable tests
- file = open("/home/opnfv/functest/conf/testcase-list.txt", 'w')
- file.write(test_sorted_list)
- file.close()
-
- return test_sorted_list
-
-
-def execute_command(cmd, logger=None, exit_on_error=True):
+ print line
+ p.stdout.close()
+ returncode = p.wait()
+ if returncode != 0:
+ if verbose:
+ if logger:
+ logger.error(error_msg)
+ else:
+ print(error_msg)
+ if exit_on_error:
+ sys.exit(1)
+
+ return returncode
+
+
+def get_deployment_dir(logger=None):
"""
- Execute Linux command
- prints stdout to a file and depending on if there
- is a logger defined, it will print it or not.
+ Returns current Rally deployment directory
"""
- if logger:
- logger.debug('Executing command : {}'.format(cmd))
- output_file = "output.txt"
- f = open(output_file, 'w+')
- p = subprocess.call(cmd, shell=True, stdout=f, stderr=subprocess.STDOUT)
+ with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f:
+ functest_yaml = yaml.safe_load(f)
f.close()
- f = open(output_file, 'r')
- result = f.read()
- if result != "" and logger:
- logger.debug(result)
- if p == 0:
- return True
- else:
+ deployment_name = functest_yaml.get("rally").get("deployment_name")
+ rally_dir = functest_yaml.get("general").get("directories").get(
+ "dir_rally_inst")
+ cmd = ("rally deployment list | awk '/" + deployment_name +
+ "/ {print $2}'")
+ p = subprocess.Popen(cmd, shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ deployment_uuid = p.stdout.readline().rstrip()
+ if deployment_uuid == "":
if logger:
- logger.error("Error when executing command %s" % cmd)
- if exit_on_error:
- exit(-1)
- return False
+ logger.error("Rally deployment not found.")
+ exit(-1)
+ deployment_dir = (rally_dir + "/tempest/for-deployment-" +
+ deployment_uuid)
+ return deployment_dir
+
+
+def get_criteria_by_test(testname):
+ criteria = ""
+ file = FUNCTEST_REPO + "/ci/testcases.yaml"
+ tiers = tb.TierBuilder("", "", file)
+ for tier in tiers.get_tiers():
+ for test in tier.get_tests():
+ if test.get_name() == testname:
+ criteria = test.get_criteria()
+
+ return criteria
+
+
+# ----------------------------------------------------------
+#
+# YAML UTILS
+#
+# -----------------------------------------------------------
+def get_parameter_from_yaml(parameter, file=None):
+ """
+ Returns the value of a given parameter in config_functest.yaml
+ parameter must be given in string format with dots
+ Example: general.openstack.image_name
+ """
+ if file is None:
+ file = os.environ["CONFIG_FUNCTEST_YAML"]
+ with open(file) as f:
+ functest_yaml = yaml.safe_load(f)
+ f.close()
+ value = functest_yaml
+ for element in parameter.split("."):
+ value = value.get(element)
+ if value is None:
+ raise ValueError("The parameter %s is not defined in"
+ " config_functest.yaml" % parameter)
+ return value
+
+
+def check_success_rate(case_name, success_rate):
+ success_rate = float(success_rate)
+ criteria = get_criteria_by_test(case_name)
+
+ def get_criteria_value(op):
+ return float(criteria.split(op)[1].rstrip('%'))
+
+ status = 'FAIL'
+ ops = ['==', '>=']
+ for op in ops:
+ if op in criteria:
+ c_value = get_criteria_value(op)
+ if eval("%s %s %s" % (success_rate, op, c_value)):
+ status = 'PASS'
+ break
+
+ return status
+
+
+def merge_dicts(dict1, dict2):
+ for k in set(dict1.keys()).union(dict2.keys()):
+ if k in dict1 and k in dict2:
+ if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
+ yield (k, dict(merge_dicts(dict1[k], dict2[k])))
+ else:
+ yield (k, dict2[k])
+ elif k in dict1:
+ yield (k, dict1[k])
+ else:
+ yield (k, dict2[k])
+
+
+def check_test_result(test_name, ret, start_time, stop_time):
+ def get_criteria_value():
+ return get_criteria_by_test(test_name).split('==')[1].strip()
+
+ status = 'FAIL'
+ if str(ret) == get_criteria_value():
+ status = 'PASS'
+
+ details = {
+ 'timestart': start_time,
+ 'duration': round(stop_time - start_time, 1),
+ 'status': status,
+ }
+
+ return status, details