return branch.name
-def get_installer_type(logger=None):
+def get_installer_type():
"""
Get installer type (fuel, apex, joid, compass)
"""
try:
installer = os.environ['INSTALLER_TYPE']
except KeyError:
- globals()['logger'].error("Impossible to retrieve the installer type")
+ logger.error("Impossible to retrieve the installer type")
installer = "Unknown_installer"
return installer
-def get_scenario(logger=None):
+def get_scenario():
"""
Get scenario
"""
try:
scenario = os.environ['DEPLOY_SCENARIO']
except KeyError:
- globals()['logger'].error("Impossible to retrieve the scenario")
+ logger.error("Impossible to retrieve the scenario")
scenario = "Unknown_scenario"
return scenario
-def get_version(logger=None):
+def get_version():
"""
Get version
"""
# e.g. jenkins-functest-fuel-opnfv-jump-2-daily-master-190
# use regex to match branch info
rule = "daily-(.+?)-[0-9]*"
- build_tag = get_build_tag(logger)
+ build_tag = get_build_tag()
m = re.search(rule, build_tag)
if m:
return m.group(1)
return "unknown"
-def get_pod_name(logger=None):
+def get_pod_name():
"""
Get PoD Name from env variable NODE_NAME
"""
try:
return os.environ['NODE_NAME']
except KeyError:
- globals()['logger'].error(
+ logger.error(
"Unable to retrieve the POD name from environment. " +
"Using pod name 'unknown-pod'")
return "unknown-pod"
-def get_build_tag(logger=None):
+def get_build_tag():
"""
Get build tag of jenkins jobs
"""
try:
build_tag = os.environ['BUILD_TAG']
except KeyError:
- globals()['logger'].error("Impossible to retrieve the build tag")
+ logger.error("Impossible to retrieve the build tag")
build_tag = "unknown_build_tag"
return build_tag
-def get_db_url(logger=None):
+def get_db_url():
"""
Returns DB URL
"""
- functest_yaml = get_functest_yaml()
- db_url = functest_yaml.get("results").get("test_db_url")
- return db_url
+ return get_functest_config('results.test_db_url')
-def logger_test_results(logger, project, case_name, status, details):
- pod_name = get_pod_name(logger)
- scenario = get_scenario(logger)
- version = get_version(logger)
- build_tag = get_build_tag(logger)
+def logger_test_results(project, case_name, status, details):
+ pod_name = get_pod_name()
+ scenario = get_scenario()
+ version = get_version()
+ build_tag = get_build_tag()
- globals()['logger'].info(
+ logger.info(
"\n"
"****************************************\n"
"\t %(p)s/%(n)s results \n\n"
'd': details})
-def push_results_to_db(project, case_name, logger,
+def push_results_to_db(project, case_name,
start_date, stop_date, criteria, details):
"""
POST results to the Result target DB
"""
# Retrieve params from CI and conf
- url = get_db_url(logger) + "/results"
+ url = get_db_url() + "/results"
try:
installer = os.environ['INSTALLER_TYPE']
pod_name = os.environ['NODE_NAME']
build_tag = os.environ['BUILD_TAG']
except KeyError as e:
- globals()['logger'].error("Please set env var: " + str(e))
+ logger.error("Please set env var: " + str(e))
return False
rule = "daily-(.+?)-[0-9]*"
m = re.search(rule, build_tag)
if m:
version = m.group(1)
else:
- globals()['logger'].error("Please fix BUILD_TAG env var: " + build_tag)
+ logger.error("Please fix BUILD_TAG env var: " + build_tag)
return False
test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
headers = {'Content-Type': 'application/json'}
try:
r = requests.post(url, data=json.dumps(params), headers=headers)
- globals()['logger'].debug(r)
+ logger.debug(r)
r.raise_for_status()
except requests.RequestException as exc:
if 'r' in locals():
})
finally:
if error:
- globals()['logger'].error(error)
+ logger.error(error)
return False
return True
return ci_env_var
-def execute_command(cmd, logger=None,
- exit_on_error=True,
- info=False,
- error_msg="",
- verbose=True):
+def execute_command(cmd, exit_on_error=True, info=False, error_msg="",
+ verbose=True, output_file=None):
if not error_msg:
error_msg = ("The command '%s' failed." % cmd)
msg_exec = ("Executing command: '%s'" % cmd)
if verbose:
if info:
- globals()['logger'].info(msg_exec)
+ logger.info(msg_exec)
else:
- globals()['logger'].debug(msg_exec)
+ logger.debug(msg_exec)
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
+ if output_file:
+ f = open(output_file, "w")
for line in iter(p.stdout.readline, b''):
- line = line.replace('\n', '')
- print line
- sys.stdout.flush()
+ if output_file:
+ f.write(line)
+ else:
+ line = line.replace('\n', '')
+ print line
+ sys.stdout.flush()
+ if output_file:
+ f.close()
p.stdout.close()
returncode = p.wait()
if returncode != 0:
if verbose:
- globals()['logger'].error(error_msg)
+ logger.error(error_msg)
if exit_on_error:
sys.exit(1)
return returncode
-def get_deployment_dir(logger=None):
+def get_deployment_dir():
"""
Returns current Rally deployment directory
"""
- functest_yaml = get_functest_yaml()
- deployment_name = functest_yaml.get("rally").get("deployment_name")
- rally_dir = functest_yaml.get("general").get("directories").get(
- "dir_rally_inst")
+ deployment_name = get_functest_config('rally.deployment_name')
+ rally_dir = get_functest_config('general.directories.dir_rally_inst')
cmd = ("rally deployment list | awk '/" + deployment_name +
"/ {print $2}'")
p = subprocess.Popen(cmd, shell=True,
stderr=subprocess.STDOUT)
deployment_uuid = p.stdout.readline().rstrip()
if deployment_uuid == "":
- globals()['logger'].error("Rally deployment not found.")
+ logger.error("Rally deployment not found.")
exit(-1)
deployment_dir = (rally_dir + "/tempest/for-deployment-" +
deployment_uuid)
# YAML UTILS
#
# -----------------------------------------------------------
-def get_parameter_from_yaml(parameter, file=None):
+def get_parameter_from_yaml(parameter, file):
"""
- Returns the value of a given parameter in config_functest.yaml
+ Returns the value of a given parameter in file.yaml
parameter must be given in string format with dots
Example: general.openstack.image_name
"""
- if file is None:
- file = os.environ["CONFIG_FUNCTEST_YAML"]
with open(file) as f:
- functest_yaml = yaml.safe_load(f)
+ file_yaml = yaml.safe_load(f)
f.close()
- value = functest_yaml
+ value = file_yaml
for element in parameter.split("."):
value = value.get(element)
if value is None:
return value
+def get_functest_config(parameter):
+ yaml_ = os.environ["CONFIG_FUNCTEST_YAML"]
+ return get_parameter_from_yaml(parameter, yaml_)
+
+
def check_success_rate(case_name, success_rate):
success_rate = float(success_rate)
criteria = get_criteria_by_test(case_name)