Set port_security: true in tempest_conf.yaml
[functest.git] / functest / utils / functest_utils.py
index 7ae7439..a1434ad 100644 (file)
 
 from __future__ import print_function
 import logging
+import os
 import subprocess
 import sys
 import yaml
 
+from shade import _utils
 import six
 
 LOGGER = logging.getLogger(__name__)
@@ -43,9 +45,9 @@ def execute_command(cmd, info=False, error_msg="",
         ofd = open(output_file, "w")
     for line in iter(popen.stdout.readline, b''):
         if output_file:
-            ofd.write(line)
+            ofd.write(line.decode("utf-8"))
         else:
-            line = line.replace('\n', '')
+            line = line.decode("utf-8").replace('\n', '')
             print(line)
             sys.stdout.flush()
     if output_file:
@@ -108,11 +110,16 @@ def get_openstack_version(cloud):
     - OpenStack release
     - Unknown on operation error
     """
+    # pylint: disable=too-many-branches
     version = get_nova_version(cloud)
     try:
         assert version
-        if version > (2, 65):
+        if version > (2, 79):
             osversion = "Master"
+        elif version > (2, 72):
+            osversion = "Train"
+        elif version > (2, 65):
+            osversion = "Stein"
         elif version > (2, 60):
             osversion = "Rocky"
         elif version > (2, 53):
@@ -138,6 +145,51 @@ def get_openstack_version(cloud):
         return "Unknown"
 
 
+def list_services(cloud):
+    # pylint: disable=protected-access
+    """Search Keystone services via $OS_INTERFACE.
+
+    It mainly conforms with `Shade
+    <https://docs.openstack.org/shade/latest>`_ but allows testing vs
+    public endpoints. It's worth mentioning that it doesn't support keystone
+    v2.
+
+    :returns: a list of ``munch.Munch`` containing the services description
+
+    :raises: ``OpenStackCloudException`` if something goes wrong during the
+        openstack API call.
+    """
+    url, key = '/services', 'services'
+    data = cloud._identity_client.get(
+        url, endpoint_filter={
+            'interface': os.environ.get('OS_INTERFACE', 'public')},
+        error_message="Failed to list services")
+    services = cloud._get_and_munchify(key, data)
+    return _utils.normalize_keystone_services(services)
+
+
+def search_services(cloud, name_or_id=None, filters=None):
+    # pylint: disable=protected-access
+    """Search Keystone services ia $OS_INTERFACE.
+
+    It mainly conforms with `Shade
+    <https://docs.openstack.org/shade/latest>`_ but allows testing vs
+    public endpoints. It's worth mentioning that it doesn't support keystone
+    v2.
+
+    :param name_or_id: Name or id of the desired service.
+    :param filters: a dict containing additional filters to use. e.g.
+                    {'type': 'network'}.
+
+    :returns: a list of ``munch.Munch`` containing the services description
+
+    :raises: ``OpenStackCloudException`` if something goes wrong during the
+        openstack API call.
+    """
+    services = list_services(cloud)
+    return _utils._filter_list(services, name_or_id, filters)
+
+
 def convert_dict_to_ini(value):
     "Convert dict to oslo.conf input"
     assert isinstance(value, dict)
@@ -155,7 +207,7 @@ def convert_ini_to_dict(value):
     "Convert oslo.conf input to dict"
     assert isinstance(value, str)
     try:
-        return {k: v for k, v in (x.split(':') for x in value.split(','))}
+        return {k: v for k, v in (x.rsplit(':', 1) for x in value.split(','))}
     except ValueError:
         return {}