Add initialization for ADMIN_ID and ROOT_EXTENSION_ID in core.py. 76/1076/1
authorasteroide <thomas.duval@orange.com>
Mon, 27 Jul 2015 09:36:27 +0000 (11:36 +0200)
committerasteroide <thomas.duval@orange.com>
Mon, 27 Jul 2015 09:36:27 +0000 (11:36 +0200)
Change-Id: I25e69dbc19e7c4d896c0a0430805198a76f06957

keystone-moon/keystone/contrib/moon/core.py
keystone-moon/keystone/contrib/moon/exception.py
keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py

index e7d606c..7476415 100644 (file)
@@ -25,9 +25,9 @@ from keystone.contrib.moon.algorithms import *
 
 CONF = config.CONF
 LOG = log.getLogger(__name__)
-# TODO: call functions to get these 2 variables
-ADMIN_ID = uuid4().hex  # default user_id for internal invocation
-ROOT_EXTENSION_ID = uuid4().hex
+ADMIN_ID = None  # default user_id for internal invocation
+ROOT_EXTENSION_ID = None
+ROOT_EXTENSION_MODEL = "policy_root"
 
 
 _OPTS = [
@@ -107,9 +107,31 @@ def enforce(action_names, object_name, **extra):
     _action_name_list = action_names
     _object_name = object_name
 
+    def get_root_extension(self, args, kwargs):
+        if not ROOT_EXTENSION_ID:
+            global ROOT_EXTENSION_MODEL, ROOT_EXTENSION_ID, ADMIN_ID
+            try:
+                # if it is the first time we passed here, the root extension may be not initialized
+                # specially during unittest. So we raise  RootExtensionNotInitialized to authorize the
+                # current creation process
+                if 'intra_extension_dict' in kwargs:
+                    intra_extension_dict = kwargs['intra_extension_dict']
+                else:
+                    intra_extension_dict = args[2]
+                print(intra_extension_dict)
+                if isinstance(intra_extension_dict, dict) and \
+                                "model" in intra_extension_dict and \
+                                intra_extension_dict["model"] == "policy_root":
+                    raise RootExtensionNotInitialized()
+            except KeyError:
+                pass
+        return ROOT_EXTENSION_ID
+
     def wrap(func):
 
         def wrapped(*args, **kwargs):
+            global ADMIN_ID, ROOT_EXTENSION_ID
+            returned_value_for_func = None
             self = args[0]
             try:
                 user_id = args[1]
@@ -118,64 +140,85 @@ def enforce(action_names, object_name, **extra):
             intra_extension_id = None
             intra_admin_extension_id = None
 
-            if user_id == ADMIN_ID:
-                # TODO: check if there is no security hole here
-                return func(*args, **kwargs)
-
+            try:
+                intra_admin_extension_id = get_root_extension(self, args, kwargs)
+            except RootExtensionNotInitialized:
+                returned_value_for_func = func(*args, **kwargs)
+                intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
+                for ext in intra_extensions_dict:
+                    if intra_extensions_dict[ext]["model"] == ROOT_EXTENSION_MODEL:
+                        ROOT_EXTENSION_ID = ext
+                        break
+                if not ROOT_EXTENSION_ID:
+                    raise RootExtensionUnknown()
+                print(returned_value_for_func)
+                subjects_dict = self.admin_api.driver.get_subjects_dict(returned_value_for_func['id'])
+                for subject_id in subjects_dict:
+                    if subjects_dict[subject_id]["name"] == "admin":
+                        ADMIN_ID = subject_id
+                        break
+                if not ADMIN_ID:
+                    raise RootExtensionUnknown()
+                return returned_value_for_func
             try:
                 intra_extension_id = args[2]
             except IndexError:
                 if 'intra_extension_id' in kwargs:
                     intra_extension_id = kwargs['intra_extension_id']
-                else:
-                    intra_admin_extension_id = ROOT_EXTENSION_ID
-
-            intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
-            if intra_extension_id not in intra_extensions_dict:
-                raise IntraExtensionUnknown()
-            tenants_dict = self.tenant_api.driver.get_tenants_dict(ADMIN_ID)
-            for _tenant_id in tenants_dict:
-                if tenants_dict[_tenant_id]['intra_authz_extension_id'] is intra_extension_id or \
-                                tenants_dict[_tenant_id]['intra_admin_extension_id'] is intra_extension_id:
-                    intra_admin_extension_id = tenants_dict[_tenant_id]['intra_admin_extension_id']
-            if not intra_admin_extension_id:
-                self.moonlog_api.driver.warning("No Intra_Admin_Extension found, authorization granted by default.")
-                return func(*args, **kwargs)
+                # else:
+                #     intra_admin_extension_id = get_root_extension(self)
+
+            if ADMIN_ID and user_id == ADMIN_ID:
+                # TODO: check if there is no security hole here
+                returned_value_for_func = func(*args, **kwargs)
             else:
-                objects_dict = self.admin_api.driver.get_objects_dict(ADMIN_ID, intra_admin_extension_id)
-                object_name = intra_extensions_dict[intra_extension_id]['genre'] + '.' + _object_name
-                object_id = None
-                for _object_id in objects_dict:
-                    if objects_dict[_object_id]['name'] is object_name:
-                        object_id = _object_id
-                        break
-                if type(_action_name_list) in (str, unicode):
-                    action_name_list = (_action_name_list, )
+                intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
+                if intra_extension_id not in intra_extensions_dict:
+                    raise IntraExtensionUnknown()
+                tenants_dict = self.tenant_api.driver.get_tenants_dict(ADMIN_ID)
+                for _tenant_id in tenants_dict:
+                    if tenants_dict[_tenant_id]['intra_authz_extension_id'] is intra_extension_id or \
+                                    tenants_dict[_tenant_id]['intra_admin_extension_id'] is intra_extension_id:
+                        intra_admin_extension_id = tenants_dict[_tenant_id]['intra_admin_extension_id']
+                if not intra_admin_extension_id:
+                    self.moonlog_api.driver.warning("No Intra_Admin_Extension found, authorization granted by default.")
+                    returned_value_for_func = func(*args, **kwargs)
                 else:
-                    action_name_list = _action_name_list
-                actions_dict = self.admin_api.driver.get_actions_dict(ADMIN_ID, intra_admin_extension_id)
-                action_id_list = list()
-                for _action_name in action_name_list:
-                    for _action_id in actions_dict:
-                        if actions_dict[_action_id]['name'] is _action_name:
-                            action_id_list.append(_action_id)
+                    objects_dict = self.admin_api.driver.get_objects_dict(ADMIN_ID, intra_admin_extension_id)
+                    object_name = intra_extensions_dict[intra_extension_id]['genre'] + '.' + _object_name
+                    object_id = None
+                    for _object_id in objects_dict:
+                        if objects_dict[_object_id]['name'] is object_name:
+                            object_id = _object_id
                             break
-
-                authz_result = False
-                for action_id in action_id_list:
-                    if self.driver.authz(intra_admin_extension_id, user_id, object_id, action_id):
-                        authz_result = True
+                    if type(_action_name_list) in (str, unicode):
+                        action_name_list = (_action_name_list, )
                     else:
-                        authz_result = False
-                        break
-                if authz_result:
-                    return func(*args, **kwargs)
+                        action_name_list = _action_name_list
+                    actions_dict = self.admin_api.driver.get_actions_dict(ADMIN_ID, intra_admin_extension_id)
+                    action_id_list = list()
+                    for _action_name in action_name_list:
+                        for _action_id in actions_dict:
+                            if actions_dict[_action_id]['name'] is _action_name:
+                                action_id_list.append(_action_id)
+                                break
+
+                    authz_result = False
+                    for action_id in action_id_list:
+                        if self.driver.authz(intra_admin_extension_id, user_id, object_id, action_id):
+                            authz_result = True
+                        else:
+                            authz_result = False
+                            break
+                    if authz_result:
+                        returned_value_for_func = func(*args, **kwargs)
+            return returned_value_for_func
         return wrapped
     return wrap
 
 
 @dependency.provider('configuration_api')
-@dependency.requires('moonlog_api')
+@dependency.requires('moonlog_api', 'admin_api')
 class ConfigurationManager(manager.Manager):
 
     def __init__(self):
@@ -230,7 +273,7 @@ class ConfigurationManager(manager.Manager):
         return None
 
 @dependency.provider('tenant_api')
-@dependency.requires('moonlog_api', 'admin_api')
+@dependency.requires('moonlog_api', 'admin_api', 'configuration_api')
 class TenantManager(manager.Manager):
 
     def __init__(self):
@@ -259,8 +302,6 @@ class TenantManager(manager.Manager):
     def add_tenant_dict(self, user_id, tenant_dict):
         tenants_dict = self.driver.get_tenants_dict()
         for tenant_id in tenants_dict:
-            print(tenants_dict[tenant_id])
-            print(tenant_dict)
             if tenants_dict[tenant_id]['name'] == tenant_dict['name']:
                 raise TenantAddedNameExisting()
 
@@ -1553,6 +1594,7 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
 
 
 @dependency.provider('admin_api')
+# @dependency.requires('configuration_api')
 class IntraExtensionAdminManager(IntraExtensionManager):
 
     def __init__(self):
index a53a339..75ccd18 100644 (file)
@@ -97,6 +97,12 @@ class RootExtensionUnknown(IntraExtensionUnknown):
     title = 'Root Extension Unknown'
     logger = "Error"
 
+class RootExtensionNotInitialized(IntraExtensionException):
+    message_format = _("The root_extension is not initialized.")
+    code = 400
+    title = 'Root Extension Not Initialized'
+    logger = "Error"
+
 
 class IntraExtensionCreationError(IntraExtensionException):
     message_format = _("The arguments for the creation of this Extension were malformed.")
index d122b25..6d65648 100644 (file)
@@ -13,6 +13,7 @@ from keystone.tests.unit.ksfixtures import database
 from keystone.contrib.moon.exception import *
 from keystone.tests.unit import default_fixtures
 from keystone.contrib.moon.core import LogManager
+from keystone.contrib.moon.core import ConfigurationManager
 from keystone.contrib.moon.core import ADMIN_ID
 from keystone.common import dependency
 
@@ -31,6 +32,7 @@ IE = {
 
 @dependency.requires('admin_api')
 class TestTenantManager(tests.TestCase):
+    ADMIN_ID = None
 
     def setUp(self):
         self.useFixture(database.Database())
@@ -39,8 +41,13 @@ class TestTenantManager(tests.TestCase):
         self.load_fixtures(default_fixtures)
         self.admin = self.create_user(username="admin")
         self.demo = self.create_user(username="demo")
-        self.manager = TenantManager()
         self.root_intra_extension = self.create_intra_extension(policy_model="policy_root")
+        # force re-initialization of the ADMIN_ID variable
+        from keystone.contrib.moon.core import ADMIN_ID
+        self.ADMIN_ID = ADMIN_ID
+        self.manager = TenantManager()
+        self.configuration_api = ConfigurationManager()
+        # self.configuration_api.init_default_variables()
 
     def load_extra_backends(self):
         return {
@@ -74,7 +81,10 @@ class TestTenantManager(tests.TestCase):
         if "authz" in policy_model:
             genre = "authz"
         IE["genre"] = genre
-        ref = self.admin_api.load_intra_extension_dict(ADMIN_ID, intra_extension_dict=IE)
+        # force re-initialization of the ADMIN_ID variable
+        from keystone.contrib.moon.core import ADMIN_ID
+        self.ADMIN_ID = ADMIN_ID
+        ref = self.admin_api.load_intra_extension_dict(self.ADMIN_ID, intra_extension_dict=IE)
         self.assertIsInstance(ref, dict)
         return ref
 
@@ -88,16 +98,16 @@ class TestTenantManager(tests.TestCase):
             "intra_authz_extension": authz_intra_extension['id'],
             "intra_admin_extension": admin_intra_extension['id'],
         }
-        data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+        data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
         self.assertEquals(new_tenant["id"], data["id"])
         self.assertEquals(new_tenant["name"], data['tenant']["name"])
         self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
-        data = self.manager.get_tenants_dict(ADMIN_ID)
+        data = self.manager.get_tenants_dict(self.ADMIN_ID)
         self.assertNotEqual(data, {})
-        data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_tenant["intra_authz_extension"])
+        data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_authz_extension"], data["id"])
-        data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_tenant["intra_admin_extension"])
+        data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_admin_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data["id"])
 
     def test_del_tenant(self):
@@ -110,15 +120,15 @@ class TestTenantManager(tests.TestCase):
             "intra_authz_extension": authz_intra_extension['id'],
             "intra_admin_extension": admin_intra_extension['id'],
         }
-        data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+        data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
         self.assertEquals(new_tenant["id"], data["id"])
         self.assertEquals(new_tenant["name"], data['tenant']["name"])
         self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
-        data = self.manager.get_tenants_dict(ADMIN_ID)
+        data = self.manager.get_tenants_dict(self.ADMIN_ID)
         self.assertNotEqual(data, {})
-        self.manager.del_tenant(ADMIN_ID, new_tenant["id"])
-        data = self.manager.get_tenants_dict(ADMIN_ID)
+        self.manager.del_tenant(self.ADMIN_ID, new_tenant["id"])
+        data = self.manager.get_tenants_dict(self.ADMIN_ID)
         self.assertEqual(data, {})
 
     def test_set_tenant(self):
@@ -131,25 +141,25 @@ class TestTenantManager(tests.TestCase):
             "intra_authz_extension": authz_intra_extension['id'],
             "intra_admin_extension": admin_intra_extension['id'],
         }
-        data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+        data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
         self.assertEquals(new_tenant["id"], data["id"])
         self.assertEquals(new_tenant["name"], data['tenant']["name"])
         self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
-        data = self.manager.get_tenants_dict(ADMIN_ID)
+        data = self.manager.get_tenants_dict(self.ADMIN_ID)
         self.assertNotEqual(data, {})
 
         new_tenant["name"] = "demo2"
-        data = self.manager.set_tenant_dict(user_id=ADMIN_ID, tenant_id=new_tenant["id"], tenant_dict=new_tenant)
+        data = self.manager.set_tenant_dict(user_id=self.ADMIN_ID, tenant_id=new_tenant["id"], tenant_dict=new_tenant)
         self.assertEquals(new_tenant["id"], data["id"])
         self.assertEquals(new_tenant["name"], data['tenant']["name"])
         self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
 
     def test_exception_tenant_unknown(self):
-        self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex)
-        self.assertRaises(TenantUnknown, self.manager.del_tenant, ADMIN_ID, uuid.uuid4().hex)
-        self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, ADMIN_ID, uuid.uuid4().hex, {})
+        self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
+        self.assertRaises(TenantUnknown, self.manager.del_tenant, self.ADMIN_ID, uuid.uuid4().hex)
+        self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex, {})
 
         authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
         admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
@@ -160,15 +170,15 @@ class TestTenantManager(tests.TestCase):
             "intra_authz_extension": authz_intra_extension['id'],
             "intra_admin_extension": admin_intra_extension['id'],
         }
-        data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+        data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
         self.assertEquals(new_tenant["id"], data["id"])
         self.assertEquals(new_tenant["name"], data['tenant']["name"])
         self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
-        data = self.manager.get_tenants_dict(ADMIN_ID)
+        data = self.manager.get_tenants_dict(self.ADMIN_ID)
         self.assertNotEqual(data, {})
 
-        self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex)
+        self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
 
     def test_exception_tenant_added_name_existing(self):
         authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
@@ -180,15 +190,15 @@ class TestTenantManager(tests.TestCase):
             "intra_authz_extension": authz_intra_extension['id'],
             "intra_admin_extension": admin_intra_extension['id'],
         }
-        data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+        data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
         self.assertEquals(new_tenant["id"], data["id"])
         self.assertEquals(new_tenant["name"], data['tenant']["name"])
         self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
-        data = self.manager.get_tenants_dict(ADMIN_ID)
+        data = self.manager.get_tenants_dict(self.ADMIN_ID)
         self.assertNotEqual(data, {})
 
-        self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, ADMIN_ID, new_tenant)
+        self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
 
     def test_exception_tenant_no_intra_extension(self):
         authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
@@ -201,16 +211,16 @@ class TestTenantManager(tests.TestCase):
             "intra_admin_extension": admin_intra_extension['id'],
         }
         new_tenant['intra_authz_extension'] = None
-        self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, ADMIN_ID, new_tenant)
+        self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
         new_tenant['intra_authz_extension'] = authz_intra_extension['id']
-        data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+        data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
         self.assertEquals(new_tenant["id"], data["id"])
         self.assertEquals(new_tenant["name"], data['tenant']["name"])
         self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
         self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
-        data = self.manager.get_tenants_dict(ADMIN_ID)
+        data = self.manager.get_tenants_dict(self.ADMIN_ID)
         self.assertNotEqual(data, {})
 
         new_tenant['intra_authz_extension'] = None
         new_tenant['name'] = "demo2"
-        self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, ADMIN_ID, new_tenant["id"], new_tenant)
+        self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, self.ADMIN_ID, new_tenant["id"], new_tenant)