Fix falsy links
[functest.git] / functest / utils / functest_utils.py
index dc20eea..ef5dac7 100644 (file)
 # are made available under the terms of the Apache License, Version 2.0
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
-#
-import functools
-import json
+
+# pylint: disable=missing-docstring
+
+from __future__ import print_function
 import logging
 import os
-import re
-import shutil
 import subprocess
 import sys
-import time
-from datetime import datetime as dt
-
-import dns.resolver
-import requests
-from six.moves import urllib
 import yaml
-from git import Repo
-
-from functest.utils import constants
-from functest.utils import decorators
-
-logger = logging.getLogger(__name__)
-
-
-# ----------------------------------------------------------
-#
-#               INTERNET UTILS
-#
-# -----------------------------------------------------------
-def check_internet_connectivity(url='http://www.opnfv.org/'):
-    """
-    Check if there is access to the internet
-    """
-    try:
-        urllib.request.urlopen(url, timeout=5)
-        return True
-    except urllib.error.URLError:
-        return False
-
-
-def download_url(url, dest_path):
-    """
-    Download a file to a destination path given a URL
-    """
-    name = url.rsplit('/')[-1]
-    dest = dest_path + "/" + name
-    try:
-        response = urllib.request.urlopen(url)
-    except (urllib.error.HTTPError, urllib.error.URLError):
-        return False
-
-    with open(dest, 'wb') as f:
-        shutil.copyfileobj(response, f)
-    return True
-
-
-# ----------------------------------------------------------
-#
-#               CI UTILS
-#
-# -----------------------------------------------------------
-def get_git_branch(repo_path):
-    """
-    Get git branch name
-    """
-    repo = Repo(repo_path)
-    branch = repo.active_branch
-    return branch.name
-
-
-def get_installer_type():
-    """
-    Get installer type (fuel, apex, joid, compass)
-    """
-    try:
-        installer = os.environ['INSTALLER_TYPE']
-    except KeyError:
-        logger.error("Impossible to retrieve the installer type")
-        installer = "Unknown_installer"
-
-    return installer
-
-
-def get_scenario():
-    """
-    Get scenario
-    """
-    try:
-        scenario = os.environ['DEPLOY_SCENARIO']
-    except KeyError:
-        logger.info("Impossible to retrieve the scenario."
-                    "Use default os-nosdn-nofeature-noha")
-        scenario = "os-nosdn-nofeature-noha"
-
-    return scenario
-
-
-def get_version():
-    """
-    Get version
-    """
-    # Use the build tag to retrieve the version
-    # By default version is unknown
-    # if launched through CI the build tag has the following format
-    # jenkins-<project>-<installer>-<pod>-<job>-<branch>-<id>
-    # e.g. jenkins-functest-fuel-opnfv-jump-2-daily-master-190
-    # jenkins-functest-fuel-baremetal-weekly-master-8
-    # use regex to match branch info
-    rule = "(dai|week)ly-(.+?)-[0-9]*"
-    build_tag = get_build_tag()
-    m = re.search(rule, build_tag)
-    if m:
-        return m.group(2)
-    else:
-        return "unknown"
-
-
-def get_pod_name():
-    """
-    Get PoD Name from env variable NODE_NAME
-    """
-    try:
-        return os.environ['NODE_NAME']
-    except KeyError:
-        logger.info(
-            "Unable to retrieve the POD name from environment. " +
-            "Using pod name 'unknown-pod'")
-        return "unknown-pod"
-
-
-def get_build_tag():
-    """
-    Get build tag of jenkins jobs
-    """
-    try:
-        build_tag = os.environ['BUILD_TAG']
-    except KeyError:
-        logger.info("Impossible to retrieve the build tag")
-        build_tag = "none"
-
-    return build_tag
 
+from shade import _utils
+import six
 
-def get_db_url():
-    """
-    Returns DB URL
-    """
-    # TODO use CONST mechanism
-    try:
-        # if TEST_DB_URL declared in env variable, use it!
-        db_url = os.environ['TEST_DB_URL']
-    except KeyError:
-        db_url = get_functest_config('results.test_db_url')
-    return db_url
-
-
-def logger_test_results(project, case_name, status, details):
-    pod_name = get_pod_name()
-    scenario = get_scenario()
-    version = get_version()
-    build_tag = get_build_tag()
-
-    logger.info(
-        "\n"
-        "****************************************\n"
-        "\t %(p)s/%(n)s results \n\n"
-        "****************************************\n"
-        "DB:\t%(db)s\n"
-        "pod:\t%(pod)s\n"
-        "version:\t%(v)s\n"
-        "scenario:\t%(s)s\n"
-        "status:\t%(c)s\n"
-        "build tag:\t%(b)s\n"
-        "details:\t%(d)s\n"
-        % {'p': project,
-            'n': case_name,
-            'db': get_db_url(),
-            'pod': pod_name,
-            'v': version,
-            's': scenario,
-            'c': status,
-            'b': build_tag,
-            'd': details})
-
-
-@decorators.can_dump_request_to_file
-def push_results_to_db(project, case_name,
-                       start_date, stop_date, result, details):
-    """
-    POST results to the Result target DB
-    """
-    # Retrieve params from CI and conf
-    url = get_db_url()
-
-    try:
-        installer = os.environ['INSTALLER_TYPE']
-        scenario = os.environ['DEPLOY_SCENARIO']
-        pod_name = os.environ['NODE_NAME']
-        build_tag = os.environ['BUILD_TAG']
-    except KeyError as e:
-        logger.error("Please set env var: " + str(e))
-        return False
-    version = get_version()
-    test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
-    test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
-
-    params = {"project_name": project, "case_name": case_name,
-              "pod_name": pod_name, "installer": installer,
-              "version": version, "scenario": scenario, "criteria": result,
-              "build_tag": build_tag, "start_date": test_start,
-              "stop_date": test_stop, "details": details}
-
-    error = None
-    headers = {'Content-Type': 'application/json'}
-    try:
-        r = requests.post(url, data=json.dumps(params, sort_keys=True),
-                          headers=headers)
-        logger.debug(r)
-        r.raise_for_status()
-    except requests.RequestException as exc:
-        if 'r' in locals():
-            error = ("Pushing Result to DB(%s) failed: %s" %
-                     (r.url, r.content))
-        else:
-            error = ("Pushing Result to DB(%s) failed: %s" % (url, exc))
-    except Exception as e:
-        error = ("Error [push_results_to_db("
-                 "DB: '%(db)s', "
-                 "project: '%(project)s', "
-                 "case: '%(case)s', "
-                 "pod: '%(pod)s', "
-                 "version: '%(v)s', "
-                 "scenario: '%(s)s', "
-                 "criteria: '%(c)s', "
-                 "build_tag: '%(t)s', "
-                 "details: '%(d)s')]: "
-                 "%(error)s" %
-                 {
-                     'db': url,
-                     'project': project,
-                     'case': case_name,
-                     'pod': pod_name,
-                     'v': version,
-                     's': scenario,
-                     'c': result,
-                     't': build_tag,
-                     'd': details,
-                     'error': e
-                 })
-    finally:
-        if error:
-            logger.error(error)
-            return False
-        return True
-
-
-def get_resolvconf_ns():
-    """
-    Get nameservers from current resolv.conf
-    """
-    nameservers = []
-    rconf = open("/etc/resolv.conf", "r")
-    line = rconf.readline()
-    resolver = dns.resolver.Resolver()
-    while line:
-        ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
-        if ip:
-            resolver.nameservers = [ip.group(0)]
-            try:
-                result = resolver.query('opnfv.org')[0]
-                if result != "":
-                    nameservers.append(ip.group())
-            except dns.exception.Timeout:
-                pass
-        line = rconf.readline()
-    return nameservers
-
-
-def get_ci_envvars():
-    """
-    Get the CI env variables
-    """
-    ci_env_var = {
-        "installer": os.environ.get('INSTALLER_TYPE'),
-        "scenario": os.environ.get('DEPLOY_SCENARIO')}
-    return ci_env_var
+LOGGER = logging.getLogger(__name__)
 
 
 def execute_command_raise(cmd, info=False, error_msg="",
@@ -308,116 +36,182 @@ def execute_command(cmd, info=False, error_msg="",
     msg_exec = ("Executing command: '%s'" % cmd)
     if verbose:
         if info:
-            logger.info(msg_exec)
+            LOGGER.info(msg_exec)
         else:
-            logger.debug(msg_exec)
-    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
+            LOGGER.debug(msg_exec)
+    popen = subprocess.Popen(
+        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     if output_file:
-        f = open(output_file, "w")
-    for line in iter(p.stdout.readline, b''):
+        ofd = open(output_file, "w")
+    for line in iter(popen.stdout.readline, b''):
         if output_file:
-            f.write(line)
+            ofd.write(line.decode("utf-8"))
         else:
-            line = line.replace('\n', '')
+            line = line.decode("utf-8").replace('\n', '')
             print(line)
             sys.stdout.flush()
     if output_file:
-        f.close()
-    p.stdout.close()
-    returncode = p.wait()
+        ofd.close()
+    popen.stdout.close()
+    returncode = popen.wait()
     if returncode != 0:
         if verbose:
-            logger.error(error_msg)
+            LOGGER.error(error_msg)
 
     return returncode
 
 
-def get_dict_by_test(testname):
-    with open(get_testcases_file_dir()) as f:
-        testcases_yaml = yaml.safe_load(f)
-
-    for dic_tier in testcases_yaml.get("tiers"):
-        for dic_testcase in dic_tier['testcases']:
-            if dic_testcase['case_name'] == testname:
-                return dic_testcase
-
-    logger.error('Project %s is not defined in testcases.yaml' % testname)
-    return None
-
-
-def get_criteria_by_test(testname):
-    dict = get_dict_by_test(testname)
-    if dict:
-        return dict['criteria']
-    return None
-
-
-# ----------------------------------------------------------
-#
-#               YAML UTILS
-#
-# -----------------------------------------------------------
-def get_parameter_from_yaml(parameter, file):
+def get_parameter_from_yaml(parameter, yfile):
     """
     Returns the value of a given parameter in file.yaml
     parameter must be given in string format with dots
     Example: general.openstack.image_name
     """
-    with open(file) as f:
-        file_yaml = yaml.safe_load(f)
-    f.close()
+    with open(yfile) as yfd:
+        file_yaml = yaml.safe_load(yfd)
     value = file_yaml
     for element in parameter.split("."):
         value = value.get(element)
         if value is None:
             raise ValueError("The parameter %s is not defined in"
-                             " %s" % (parameter, file))
+                             " %s" % (parameter, yfile))
     return value
 
 
-def get_functest_config(parameter):
-    yaml_ = constants.CONST.__getattribute__('CONFIG_FUNCTEST_YAML')
-    return get_parameter_from_yaml(parameter, yaml_)
+def get_nova_version(cloud):
+    """ Get Nova API microversion
 
+    Returns:
 
-def merge_dicts(dict1, dict2):
-    for k in set(dict1.keys()).union(dict2.keys()):
-        if k in dict1 and k in dict2:
-            if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
-                yield (k, dict(merge_dicts(dict1[k], dict2[k])))
-            else:
-                yield (k, dict2[k])
-        elif k in dict1:
-            yield (k, dict1[k])
+    - Nova API microversion
+    - None on operation error
+    """
+    # pylint: disable=protected-access
+    try:
+        request = cloud._compute_client.request("/", "GET")
+        LOGGER.debug('cloud._compute_client.request: %s', request)
+        version = request["version"]["version"]
+        major, minor = version.split('.')
+        LOGGER.debug('nova version: %s', (int(major), int(minor)))
+        return (int(major), int(minor))
+    except Exception:  # pylint: disable=broad-except
+        LOGGER.exception("Cannot detect Nova version")
+        return None
+
+
+def get_openstack_version(cloud):
+    """ Detect OpenStack version via Nova API microversion
+
+    It follows `MicroversionHistory
+    <https://docs.openstack.org/nova/latest/reference/api-microversion-history.html>`_.
+
+    Returns:
+
+    - OpenStack release
+    - Unknown on operation error
+    """
+    version = get_nova_version(cloud)
+    try:
+        assert version
+        if version > (2, 72):
+            osversion = "Master"
+        elif version > (2, 65):
+            osversion = "Stein"
+        elif version > (2, 60):
+            osversion = "Rocky"
+        elif version > (2, 53):
+            osversion = "Queens"
+        elif version > (2, 42):
+            osversion = "Pike"
+        elif version > (2, 38):
+            osversion = "Ocata"
+        elif version > (2, 25):
+            osversion = "Newton"
+        elif version > (2, 12):
+            osversion = "Mitaka"
+        elif version > (2, 3):
+            osversion = "Liberty"
+        elif version >= (2, 1):
+            osversion = "Kilo"
         else:
-            yield (k, dict2[k])
+            osversion = "Unknown"
+        LOGGER.info('Detect OpenStack version: %s', osversion)
+        return osversion
+    except AssertionError:
+        LOGGER.exception("Cannot detect OpenStack version")
+        return "Unknown"
 
 
-def get_testcases_file_dir():
-    return get_functest_config('general.functest.testcases_yaml')
+def list_services(cloud):
+    # pylint: disable=protected-access
+    """Search Keystone services via $OS_INTERFACE.
 
+    It mainly conforms with `Shade
+    <https://docs.openstack.org/shade/latest>`_ but allows testing vs
+    public endpoints. It's worth mentioning that it doesn't support keystone
+    v2.
+
+    :returns: a list of ``munch.Munch`` containing the services description
+
+    :raises: ``OpenStackCloudException`` if something goes wrong during the
+        openstack API call.
+    """
+    url, key = '/services', 'services'
+    data = cloud._identity_client.get(
+        url, endpoint_filter={
+            'interface': os.environ.get('OS_INTERFACE', 'public')},
+        error_message="Failed to list services")
+    services = cloud._get_and_munchify(key, data)
+    return _utils.normalize_keystone_services(services)
 
-def get_functest_yaml():
-    with open(constants.CONST.__getattribute__('CONFIG_FUNCTEST_YAML')) as f:
-        functest_yaml = yaml.safe_load(f)
-    f.close()
-    return functest_yaml
 
+def search_services(cloud, name_or_id=None, filters=None):
+    # pylint: disable=protected-access
+    """Search Keystone services ia $OS_INTERFACE.
 
-def print_separator():
-    logger.info("==============================================")
+    It mainly conforms with `Shade
+    <https://docs.openstack.org/shade/latest>`_ but allows testing vs
+    public endpoints. It's worth mentioning that it doesn't support keystone
+    v2.
+
+    :param name_or_id: Name or id of the desired service.
+    :param filters: a dict containing additional filters to use. e.g.
+                    {'type': 'network'}.
+
+    :returns: a list of ``munch.Munch`` containing the services description
+
+    :raises: ``OpenStackCloudException`` if something goes wrong during the
+        openstack API call.
+    """
+    services = list_services(cloud)
+    return _utils._filter_list(services, name_or_id, filters)
+
+
+def convert_dict_to_ini(value):
+    "Convert dict to oslo.conf input"
+    assert isinstance(value, dict)
+    return ",".join("{}:{}".format(
+        key, val) for (key, val) in six.iteritems(value))
+
+
+def convert_list_to_ini(value):
+    "Convert list to oslo.conf input"
+    assert isinstance(value, list)
+    return ",".join("{}".format(val) for val in value)
+
+
+def convert_ini_to_dict(value):
+    "Convert oslo.conf input to dict"
+    assert isinstance(value, str)
+    try:
+        return {k: v for k, v in (x.rsplit(':', 1) for x in value.split(','))}
+    except ValueError:
+        return {}
 
 
-def timethis(func):
-    """Measure the time it takes for a function to complete"""
-    @functools.wraps(func)
-    def timed(*args, **kwargs):
-        ts = time.time()
-        result = func(*args, **kwargs)
-        te = time.time()
-        elapsed = '{0}'.format(te - ts)
-        logger.info('{f}(*{a}, **{kw}) took: {t} sec'.format(
-            f=func.__name__, a=args, kw=kwargs, t=elapsed))
-        return result, elapsed
-    return timed
+def convert_ini_to_list(value):
+    "Convert list to oslo.conf input"
+    assert isinstance(value, str)
+    if not value:
+        return []
+    return [x for x in value.split(',')]