]
},
"object_scopes": {
- "action_id": [
+ "object_id": [
"authz.subjects",
"authz.objects",
"authz.actions",
"object_assignments": {
"object_id": {
"templates": ["templates"],
- "sub_meta_rule_algorithm": ["sub_meta_rule_relations"],
+ "sub_meta_rule_algorithms": ["sub_meta_rule_algorithms"],
"aggregation_algorithms": ["aggregation_algorithms"],
"tenants": ["tenants"],
"intra_extensions": ["intra_extensions"],
"""Override parent from_dict() method with a different implementation.
"""
new_d = d.copy()
- uuid = new_d.keys()[0]
- return cls(id=uuid, **new_d[uuid])
+ return cls(**new_d)
def to_dict(self):
"""
"""
- tenant_dict = {}
- for key in ("id", "name", "authz", "admin"):
- tenant_dict[key] = getattr(self, key)
- return tenant_dict
+ return dict(six.iteritems(self))
class SubjectCategory(sql.ModelBase, sql.DictBase):
with sql.transaction() as session:
query = session.query(Tenant)
tenants = query.all()
- return {tenant.id: Tenant.to_dict(tenant) for tenant in tenants}
+ return {tenant.id: tenant.tenant for tenant in tenants}
def add_tenant_dict(self, tenant_id, tenant_dict):
with sql.transaction() as session:
def get_intra_extensions_dict(self):
with sql.transaction() as session:
- query = session.query(IntraExtension.id)
+ query = session.query(IntraExtension)
ref_list = query.all()
- return {_ref.id: _ref.intraextension for _ref in ref_list}
+ return {_ref.id: _ref.intra_extension for _ref in ref_list}
def del_intra_extension(self, intra_extension_id):
with sql.transaction() as session:
def get_rules_dict(self, intra_extension_id, sub_meta_rule_id):
with sql.transaction() as session:
query = session.query(Rule)
- query = query.filter_by(intra_extension_id=intra_extension_id, id=sub_meta_rule_id)
+ query = query.filter_by(intra_extension_id=intra_extension_id, sub_meta_rule_id=sub_meta_rule_id)
ref_list = query.all()
return {_ref.id: _ref.rule for _ref in ref_list}
@controller.protected()
def add_tenant(self, context, **kw):
user_id = self._get_user_id_from_token(context.get("token_id"))
- # TODO: get tenant name from keystone
+ # Next line will raise an error if tenant doesn't exist
+ k_tenant_dict = self.resource_api.get_project_by_name(kw.get("name", None))
tenant_dict = dict()
+ tenant_dict['id'] = k_tenant_dict['id']
tenant_dict['name'] = kw.get("name", None)
tenant_dict['description'] = kw.get("description", None)
tenant_dict['intra_authz_ext_id'] = kw.get("intra_authz_ext_id", None)
@controller.protected()
def set_tenant(self, context, **kw):
user_id = self._get_user_id_from_token(context.get('token_id'))
+ # Next line will raise an error if tenant doesn't exist
+ k_tenant_dict = self.resource_api.get_project(kw.get('id', None))
tenant_id = kw.get('id', None)
tenant_dict = dict()
- tenant_dict['name'] = kw.get("name", None)
+ tenant_dict['name'] = k_tenant_dict.get("name", None)
tenant_dict['description'] = kw.get("description", None)
tenant_dict['intra_authz_ext_id'] = kw.get("intra_authz_ext_id", None)
tenant_dict['intra_admin_ext_id'] = kw.get("intra_admin_ext_id", None)
intra_extension_dict["aggregation_algorithm"] = kw.get("intra_extension_aggregation_algorithm", dict())
intra_extension_dict["sub_meta_rules"] = kw.get("intra_extension_sub_meta_rules", dict())
intra_extension_dict["rules"] = kw.get("intra_extension_rules", dict())
- return self.admin_api.load_intra_extension_dict(user_id, intra_extension_dict)
+ return self.admin_api.load_intra_extension_dict(user_id, intra_extension_dict=intra_extension_dict)
@controller.protected()
def get_intra_extension(self, context, **kw):
def filter_input(func_or_str):
def __filter(string):
- return "".join(re.findall("[\w\-+]*", string))
+ if string:
+ return "".join(re.findall("[\w\-+]*", string))
def wrapped(*args, **kwargs):
_args = []
if isinstance(arg, str) or isinstance(arg, unicode):
arg = __filter(arg)
elif isinstance(arg, list):
- arg = [__filter(item) for item in arg]
+ arg = [__filter(item) for item in arg]
elif isinstance(arg, tuple):
- arg = (__filter(item) for item in arg)
+ arg = (__filter(item) for item in arg)
elif isinstance(arg, dict):
- arg = {item: __filter(arg[item]) for item in arg.keys()}
+ arg = {item: __filter(arg[item]) for item in arg.keys()}
_args.append(arg)
for arg in kwargs:
if type(kwargs[arg]) in (unicode, str):
if isinstance(kwargs[arg], str) or isinstance(kwargs[arg], unicode):
kwargs[arg] = __filter(kwargs[arg])
elif isinstance(kwargs[arg], list):
- kwargs[arg] = [__filter(item) for item in kwargs[arg]]
+ kwargs[arg] = [__filter(item) for item in kwargs[arg]]
elif isinstance(kwargs[arg], tuple):
- kwargs[arg] = (__filter(item) for item in kwargs[arg])
+ kwargs[arg] = (__filter(item) for item in kwargs[arg])
elif isinstance(kwargs[arg], dict):
- kwargs[arg] = {item: __filter(kwargs[arg][item]) for item in kwargs[arg].keys()}
+ kwargs[arg] = {item: __filter(kwargs[arg][item]) for item in kwargs[arg].keys()}
return func_or_str(*_args, **kwargs)
if isinstance(func_or_str, str) or isinstance(func_or_str, unicode):
_object_name = object_name
def wrap(func):
- def wrapped(*args):
- # global actions
+
+ def wrapped(*args, **kwargs):
self = args[0]
- user_id = args[1]
+ try:
+ user_id = args[1]
+ except IndexError:
+ user_id = kwargs['user_id']
+ intra_extension_id = None
intra_admin_extension_id = None
+ if user_id == ADMIN_ID:
+ # TODO: check if there is no security hole here
+ return func(*args, **kwargs)
+
try:
intra_extension_id = args[2]
- except:
- intra_admin_extension_id = ROOT_EXTENSION_ID
+ except IndexError:
+ if 'intra_extension_id' in kwargs:
+ intra_extension_id = kwargs['intra_extension_id']
+ else:
+ intra_admin_extension_id = ROOT_EXTENSION_ID
- intra_extensions_dict = self.admin_api.get_intra_extensions(ADMIN_ID)
+ intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
if intra_extension_id not in intra_extensions_dict:
raise IntraExtensionUnknown()
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.driver.get_tenants_dict(ADMIN_ID)
for _tenant_id in tenants_dict:
if tenants_dict[_tenant_id]['intra_authz_extension_id'] is intra_extension_id or \
tenants_dict[_tenant_id]['intra_admin_extension_id'] is intra_extension_id:
intra_admin_extension_id = tenants_dict[_tenant_id]['intra_admin_extension_id']
if not intra_admin_extension_id:
- args[0].moonlog_api.warning("No Intra_Admin_Extension found, authorization granted by default.")
- return func(*args)
+ self.moonlog_api.driver.warning("No Intra_Admin_Extension found, authorization granted by default.")
+ return func(*args, **kwargs)
else:
- objects_dict = self.admin_api.get_objects_dict(ADMIN_ID, intra_admin_extension_id)
+ objects_dict = self.admin_api.driver.get_objects_dict(ADMIN_ID, intra_admin_extension_id)
object_name = intra_extensions_dict[intra_extension_id]['genre'] + '.' + _object_name
object_id = None
for _object_id in objects_dict:
action_name_list = (_action_name_list, )
else:
action_name_list = _action_name_list
- actions_dict = self.admin_api.get_actions_dict(ADMIN_ID, intra_admin_extension_id)
+ actions_dict = self.admin_api.driver.get_actions_dict(ADMIN_ID, intra_admin_extension_id)
action_id_list = list()
for _action_name in action_name_list:
for _action_id in actions_dict:
authz_result = False
break
if authz_result:
- return func(*args)
+ return func(*args, **kwargs)
return wrapped
return wrap
return sub_meta_rule_algorithm_id
return None
-
@dependency.provider('tenant_api')
-@dependency.requires('moonlog_api')
+@dependency.requires('moonlog_api', 'admin_api')
class TenantManager(manager.Manager):
def __init__(self):
super(TenantManager, self).__init__(CONF.moon.tenant_driver)
+ @filter_input
@enforce("read", "tenants")
def get_tenants_dict(self, user_id):
"""
"""
return self.driver.get_tenants_dict()
+ @filter_input
@enforce(("read", "write"), "tenants")
def add_tenant_dict(self, user_id, tenant_dict):
tenants_dict = self.driver.get_tenants_dict()
for tenant_id in tenants_dict:
- if tenants_dict[tenant_id]['name'] is tenant_dict['name']:
+ print(tenants_dict[tenant_id])
+ print(tenant_dict)
+ if tenants_dict[tenant_id]['name'] == tenant_dict['name']:
raise TenantAddedNameExisting()
# Sync users between intra_authz_extension and intra_admin_extension
if tenant_dict['intra_admin_extension']:
if not tenant_dict['intra_authz_extension']:
- raise TenantNoIntraAuthzExtension
+ raise TenantNoIntraAuthzExtension()
else:
authz_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_authz_extension'])
admin_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_admin_extension'])
for _subject_id in admin_subjects_dict:
if _subject_id not in authz_subjects_dict:
self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_authz_extension'], admin_subjects_dict[_subject_id])
- return self.driver.add_tenant_dict(uuid4().hex, tenant_dict)
+ return self.driver.add_tenant_dict(tenant_dict['id'], tenant_dict)
+
+ @filter_input
@enforce("read", "tenants")
def get_tenant_dict(self, user_id, tenant_id):
tenants_dict = self.driver.get_tenants_dict()
raise TenantUnknown()
return tenants_dict[tenant_id]
+ @filter_input
@enforce(("read", "write"), "tenants")
def del_tenant(self, user_id, tenant_id):
if tenant_id not in self.driver.get_tenants_dict():
raise TenantUnknown()
self.driver.del_tenant(tenant_id)
+ @filter_input
@enforce(("read", "write"), "tenants")
def set_tenant_dict(self, user_id, tenant_id, tenant_dict):
tenants_dict = self.driver.get_tenants_dict()
for _subject_id in admin_subjects_dict:
if _subject_id not in authz_subjects_dict:
self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_authz_extension'], admin_subjects_dict[_subject_id])
+
return self.driver.set_tenant_dict(tenant_id, tenant_dict)
subject_dict = dict()
# We suppose that all subjects can be mapped to a true user in Keystone
for _subject in json_perimeter['subjects']:
- user = self.identity_api.get_user_by_name(_subject, "default")
- subject_dict[user["id"]] = user
- self.driver.set_subject_dict(intra_extension_dict["id"], user["id"], user)
+ keystone_user = self.identity_api.get_user_by_name(_subject, "default")
+ subject_id = uuid4().hex
+ subject_dict[subject_id] = keystone_user
+ subject_dict[subject_id]['keystone_id'] = keystone_user["id"]
+ subject_dict[subject_id]['keystone_name'] = keystone_user["name"]
+ self.driver.set_subject_dict(intra_extension_dict["id"], subject_id, subject_dict[subject_id])
intra_extension_dict["subjects"] = subject_dict
# Copy all values for objects and actions
else:
# if value doesn't exist add a default value
subrule.append(True)
- rules[sub_rule_id].append(subrule)
- self.driver.set_rule_dict(intra_extension_dict["id"], sub_rule_id, uuid4().hex, rules)
+ # rules[sub_rule_id].append(subrule)
+ self.driver.set_rule_dict(intra_extension_dict["id"], sub_rule_id, uuid4().hex, subrule)
@enforce(("read", "write"), "intra_extensions")
def load_intra_extension_dict(self, user_id, intra_extension_dict):
if subjects_dict[subject_id]["name"] is subject_dict['name']:
raise SubjectNameExisting()
# Next line will raise an error if user is not present in Keystone database
- subject_item_dict = self.identity_api.get_user_by_name(subject_dict['name'], "default")
- return self.driver.set_subject_dict(intra_extension_id, subject_item_dict["id"], subject_dict)
+ subject_keystone_dict = self.identity_api.get_user_by_name(subject_dict['name'], "default")
+ subject_dict["keystone_id"] = subject_keystone_dict["id"]
+ subject_dict["keystone_name"] = subject_keystone_dict["name"]
+ return self.driver.set_subject_dict(intra_extension_id, uuid4().hex, subject_dict)
@filter_input
@enforce("read", "subjects")
if subjects_dict[subject_id]["name"] is subject_dict['name']:
raise SubjectNameExisting()
# Next line will raise an error if user is not present in Keystone database
- subject_item_dict = self.identity_api.get_user_by_name(subject_dict['name'], "default")
- return self.driver.set_subject_dict(intra_extension_id, subject_item_dict["id"], subject_dict)
+ subject_keystone_dict = self.identity_api.get_user_by_name(subject_dict['name'], "default")
+ subject_dict["keystone_id"] = subject_keystone_dict["id"]
+ subject_dict["keystone_name"] = subject_keystone_dict["name"]
+ return self.driver.set_subject_dict(intra_extension_id, subject_dict["id"], subject_dict)
@filter_input
@enforce("read", "objects")
subjects_dict = self.driver.get_subjects_dict(intra_extension_id)
subject_id = None
for _subject_id in subjects_dict:
- if subjects_dict[_subject_id]['name'] is subject_name:
+ if subjects_dict[_subject_id]['keystone_name'] is subject_name:
subject_id = _subject_id
if not subject_id:
raise SubjectUnknown()
@dependency.provider('moonlog_api')
+# Next line is mandatory in order to force keystone to process dependencies.
+@dependency.requires('admin_api')
class LogManager(manager.Manager):
def __init__(self):
data_values = self.get_subjects_dict(intra_extension_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise SubjectUnknown()
+ raise SubjectUnknown("{} / {}".format(name, data_values))
elif data_name == self.OBJECT:
data_values = self.get_objects_dict(intra_extension_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise ObjectUnknown()
+ raise ObjectUnknown("{} / {}".format(name, data_values))
elif data_name == self.ACTION:
data_values = self.get_actions_dict(intra_extension_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise ActionUnknown()
+ raise ActionUnknown("{} / {}".format(name, data_values))
elif data_name == self.SUBJECT_CATEGORY:
data_values = self.get_subject_categories_dict(intra_extension_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise SubjectCategoryUnknown()
+ raise SubjectCategoryUnknown("{} / {}".format(name, data_values))
elif data_name == self.OBJECT_CATEGORY:
data_values = self.get_object_categories_dict(intra_extension_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise ObjectCategoryUnknown()
+ raise ObjectCategoryUnknown("{} / {}".format(name, data_values))
elif data_name == self.ACTION_CATEGORY:
data_values = self.get_action_categories_dict(intra_extension_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise ActionCategoryUnknown()
+ raise ActionCategoryUnknown("{} / {}".format(name, data_values))
elif data_name == self.SUBJECT_SCOPE:
if not category_uuid:
category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.SUBJECT_CATEGORY)
category_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise SubjectScopeUnknown()
+ raise SubjectScopeUnknown("{} / {}".format(name, data_values))
elif data_name == self.OBJECT_SCOPE:
if not category_uuid:
category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.OBJECT_CATEGORY)
category_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise ObjectScopeUnknown()
+ raise ObjectScopeUnknown("{} / {}".format(name, data_values))
elif data_name == self.ACTION_SCOPE:
if not category_uuid:
category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.ACTION_CATEGORY)
category_uuid)
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise ActionScopeUnknown()
+ raise ActionScopeUnknown("{} / {}".format(name, data_values))
elif data_name == self.SUB_META_RULE:
data_values = self.get_sub_meta_rules_dict(intra_extension_uuid)
- print("name = {}".format(name))
- print("data_values = {}".format(data_values))
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
- raise SubMetaRuleUnknown()
+ raise SubMetaRuleUnknown("{} / {}".format(name, data_values))
# if data_name in (
# self.SUBJECT_SCOPE,
# self.OBJECT_SCOPE,
category_name=category_name,
category_uuid=category_uuid,
)
- print("get_uuid_from_name {}".format(data_values))
return filter(lambda v: v[1]["name"] == name, data_values.iteritems())[0][0]
def get_name_from_uuid(self, intra_extension_uuid, uuid, data_name, category_name=None, category_uuid=None):
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+from uuid import uuid4
import sqlalchemy as sql
from keystone.common import sql as k_sql
mysql_charset='utf8')
intra_extension_table.create(migrate_engine, checkfirst=True)
+ intra_extension_table.insert().values(id=uuid4().hex, intra_extension={
+ 'name': "Root Extension",
+ 'description': "The root intra extension",
+ 'model': 'admin'
+ })
+
tenant_table = sql.Table(
'tenants',
meta,
#self.admin = self.identity_api.create_user(USER)
IE["policymodel"] = policy_model
IE["name"] = uuid.uuid4().hex
- self.ref = self.manager.load_intra_extension_dict(DEFAULT_USER_ID, IE)
+ self.ref = self.manager.load_intra_extension_dict(DEFAULT_USER_ID, intra_extension_dict=IE)
self.assertIsInstance(self.ref, dict)
self.create_tenant(self.ref["id"])
IE["policymodel"] = policy_model
IE["name"] = uuid.uuid4().hex
- ref = self.admin_manager.load_intra_extension_dict(DEFAULT_USER_ID, IE)
+ ref = self.admin_manager.load_intra_extension_dict(DEFAULT_USER_ID, intra_extension_dict=IE)
self.assertIsInstance(ref, dict)
return ref
from keystone import resource
from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
-from keystone.contrib.moon.core import LogManager, TenantManager
+from keystone.contrib.moon.core import LogManager, TenantManager, ADMIN_ID
CONF = cfg.CONF
IE["model"] = policy_model
IE["name"] = uuid.uuid4().hex
- ref = self.admin_manager.load_intra_extension_dict(DEFAULT_USER_ID, IE)
+ ref = self.admin_manager.load_intra_extension_dict(ADMIN_ID, intra_extension_dict=IE)
self.assertIsInstance(ref, dict)
return ref
# Create the admin user because IntraExtension needs it
self.admin = self.identity_api.create_user(USER_ADMIN)
IE["policymodel"] = policy_model
- self.ref = self.manager.load_intra_extension_dict(DEFAULT_USER_ID, IE)
+ self.ref = self.manager.load_intra_extension_dict(DEFAULT_USER_ID, intra_extension_dict=IE)
self.assertIsInstance(self.ref, dict)
self.create_tenant(self.ref["id"])
from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
from keystone.contrib.moon.core import LogManager
+from keystone.contrib.moon.core import ADMIN_ID
+from keystone.common import dependency
-CONF = cfg.CONF
+CONF = cfg.CONF
+USER = {
+ 'name': 'admin',
+ 'domain_id': "default",
+ 'password': 'admin'
+}
+IE = {
+ "name": "test IE",
+ "policymodel": "policy_authz",
+ "description": "a simple description."
+}
+@dependency.requires('admin_api')
class TestTenantManager(tests.TestCase):
def setUp(self):
super(TestTenantManager, self).setUp()
self.load_backends()
self.load_fixtures(default_fixtures)
+ self.admin = self.create_user(username="admin")
+ self.demo = self.create_user(username="demo")
self.manager = TenantManager()
+ self.root_intra_extension = self.create_intra_extension(policy_model="policy_root")
def load_extra_backends(self):
return {
self.config_fixture.config(
group='moon',
tenant_driver='keystone.contrib.moon.backends.sql.TenantConnector')
+ self.policy_directory = 'examples/moon/policies'
+ self.config_fixture.config(
+ group='moon',
+ intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector')
+ self.config_fixture.config(
+ group='moon',
+ policy_directory=self.policy_directory)
+
+ def create_user(self, username="admin"):
+
+ _USER = dict(USER)
+ _USER["name"] = username
+ return self.identity_api.create_user(_USER)
+
+ def create_intra_extension(self, policy_model="policy_authz"):
+
+ IE["model"] = policy_model
+ IE["name"] = uuid.uuid4().hex
+ genre = "admin"
+ if "authz" in policy_model:
+ genre = "authz"
+ IE["genre"] = genre
+ ref = self.admin_api.load_intra_extension_dict(ADMIN_ID, intra_extension_dict=IE)
+ self.assertIsInstance(ref, dict)
+ return ref
def test_add_tenant(self):
- _uuid = uuid.uuid4().hex
+ authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
+ admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
new_mapping = {
- _uuid: {
- "name": uuid.uuid4().hex,
- "authz": uuid.uuid4().hex,
- "admin": uuid.uuid4().hex,
- }
+ "id": uuid.uuid4().hex,
+ "name": "demo",
+ "description": uuid.uuid4().hex,
+ "intra_authz_extension": authz_intra_extension['id'],
+ "intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.set_tenant_dict(
- tenant_id=_uuid,
- tenant_name=new_mapping[_uuid]["name"],
- intra_authz_ext_id=new_mapping[_uuid]["authz"],
- intra_admin_ext_id=new_mapping[_uuid]["admin"]
+ data = self.manager.add_tenant_dict(
+ user_id=ADMIN_ID,
+ tenant_dict=new_mapping
)
- self.assertEquals(_uuid, data["id"])
- self.assertEquals(data["name"], new_mapping[_uuid]["name"])
- self.assertEquals(data["authz"], new_mapping[_uuid]["authz"])
- self.assertEquals(data["admin"], new_mapping[_uuid]["admin"])
- data = self.manager.get_tenants_dict()
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
self.assertNotEqual(data, {})
- data = self.manager.get_tenant_uuid(new_mapping[_uuid]["authz"])
- self.assertEquals(_uuid, data)
- data = self.manager.get_tenant_uuid(new_mapping[_uuid]["admin"])
- self.assertEquals(_uuid, data)
- data = self.manager.get_admin_extension_uuid(new_mapping[_uuid]["authz"])
- self.assertEquals(new_mapping[_uuid]["admin"], data)
+ data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_mapping["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data["id"])
+ data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_mapping["intra_admin_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data["id"])
def test_del_tenant(self):
- _uuid = uuid.uuid4().hex
+ authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
+ admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
new_mapping = {
- _uuid: {
- "name": uuid.uuid4().hex,
- "authz": uuid.uuid4().hex,
- "admin": uuid.uuid4().hex,
- }
+ "id": uuid.uuid4().hex,
+ "name": "demo",
+ "description": uuid.uuid4().hex,
+ "intra_authz_extension": authz_intra_extension['id'],
+ "intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.set_tenant_dict(
- tenant_id=_uuid,
- tenant_name=new_mapping[_uuid]["name"],
- intra_authz_ext_id=new_mapping[_uuid]["authz"],
- intra_admin_ext_id=new_mapping[_uuid]["admin"]
+ data = self.manager.add_tenant_dict(
+ user_id=ADMIN_ID,
+ tenant_dict=new_mapping
)
- self.assertEquals(_uuid, data["id"])
- self.assertEquals(data["name"], new_mapping[_uuid]["name"])
- self.assertEquals(data["authz"], new_mapping[_uuid]["authz"])
- self.assertEquals(data["admin"], new_mapping[_uuid]["admin"])
- data = self.manager.get_tenants_dict()
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
self.assertNotEqual(data, {})
- self.manager.delete(new_mapping[_uuid]["authz"])
- data = self.manager.get_tenants_dict()
+ self.manager.del_tenant(ADMIN_ID, new_mapping["id"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
self.assertEqual(data, {})
def test_set_tenant_name(self):
- _uuid = uuid.uuid4().hex
+ authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
+ admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
new_mapping = {
- _uuid: {
- "name": uuid.uuid4().hex,
- "authz": uuid.uuid4().hex,
- "admin": uuid.uuid4().hex,
- }
+ "id": uuid.uuid4().hex,
+ "name": "demo",
+ "description": uuid.uuid4().hex,
+ "intra_authz_extension": authz_intra_extension['id'],
+ "intra_admin_extension": admin_intra_extension['id'],
}
+ data = self.manager.add_tenant_dict(
+ user_id=ADMIN_ID,
+ tenant_dict=new_mapping
+ )
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
+ self.assertNotEqual(data, {})
+
+ new_mapping["name"] = "demo2"
data = self.manager.set_tenant_dict(
- tenant_id=_uuid,
- tenant_name=new_mapping[_uuid]["name"],
- intra_authz_ext_id=new_mapping[_uuid]["authz"],
- intra_admin_ext_id=new_mapping[_uuid]["admin"]
+ user_id=ADMIN_ID,
+ tenant_id=new_mapping["id"],
+ tenant_dict=new_mapping
)
- self.assertEquals(_uuid, data["id"])
- self.assertEquals(data["name"], new_mapping[_uuid]["name"])
- data = self.manager.set_tenant_name(_uuid, "new name")
- self.assertEquals(_uuid, data["id"])
- self.assertEquals(data["name"], "new name")
- data = self.manager.get_tenant_name_from_id(_uuid)
- self.assertEquals(data, "new name")
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
def test_get_tenant_intra_extension_id(self):
- _uuid = uuid.uuid4().hex
+ authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
+ admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
new_mapping = {
- _uuid: {
- "name": uuid.uuid4().hex,
- "authz": uuid.uuid4().hex,
- "admin": uuid.uuid4().hex,
- }
+ "id": uuid.uuid4().hex,
+ "name": "demo",
+ "description": uuid.uuid4().hex,
+ "intra_authz_extension": authz_intra_extension['id'],
+ "intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.set_tenant_dict(
- tenant_id=_uuid,
- tenant_name=new_mapping[_uuid]["name"],
- intra_authz_ext_id=new_mapping[_uuid]["authz"],
- intra_admin_ext_id=new_mapping[_uuid]["admin"]
+ data = self.manager.add_tenant_dict(
+ user_id=ADMIN_ID,
+ tenant_dict=new_mapping
)
- self.assertEquals(_uuid, data["id"])
- data = self.manager.get_extension_id(_uuid)
- self.assertEqual(data, new_mapping[_uuid]["authz"])
- data = self.manager.get_extension_id(_uuid, "admin")
- self.assertEqual(data, new_mapping[_uuid]["admin"])
-
- def test_exception_tenantunknown(self):
- self.assertRaises(TenantNotFound, self.manager.get_tenant_name_from_id, uuid.uuid4().hex)
- self.assertRaises(TenantNotFound, self.manager.set_tenant_name, uuid.uuid4().hex, "new name")
- self.assertRaises(TenantNotFound, self.manager.get_extension_id, uuid.uuid4().hex)
- _uuid = uuid.uuid4().hex
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
+ self.assertNotEqual(data, {})
+
+ def test_exception_tenant_unknown(self):
+ self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.manager.del_tenant, ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, ADMIN_ID, uuid.uuid4().hex, {})
+
+ authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
+ admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
new_mapping = {
- _uuid: {
- "name": uuid.uuid4().hex,
- "authz": uuid.uuid4().hex,
- "admin": uuid.uuid4().hex,
- }
+ "id": uuid.uuid4().hex,
+ "name": "demo",
+ "description": uuid.uuid4().hex,
+ "intra_authz_extension": authz_intra_extension['id'],
+ "intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.set_tenant_dict(
- tenant_id=_uuid,
- tenant_name=new_mapping[_uuid]["name"],
- intra_authz_ext_id=new_mapping[_uuid]["authz"],
- intra_admin_ext_id=""
+ data = self.manager.add_tenant_dict(
+ user_id=ADMIN_ID,
+ tenant_dict=new_mapping
+ )
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
+ self.assertNotEqual(data, {})
+
+ self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex)
+
+ def test_exception_tenant_added_name_existing(self):
+ authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
+ admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ new_mapping = {
+ "id": uuid.uuid4().hex,
+ "name": "demo",
+ "description": uuid.uuid4().hex,
+ "intra_authz_extension": authz_intra_extension['id'],
+ "intra_admin_extension": admin_intra_extension['id'],
+ }
+ data = self.manager.add_tenant_dict(
+ user_id=ADMIN_ID,
+ tenant_dict=new_mapping
)
- self.assertEquals(_uuid, data["id"])
- self.assertRaises(IntraExtensionUnknown, self.manager.get_extension_id, _uuid, "admin")
- self.assertRaises(TenantNotFound, self.manager.get_tenant_uuid, uuid.uuid4().hex)
- # self.assertRaises(AdminIntraExtensionNotFound, self.manager.get_admin_extension_uuid, uuid.uuid4().hex)
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
+ self.assertNotEqual(data, {})
+
+ self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, ADMIN_ID, new_mapping)
- def test_exception_tenantaddednameexisting(self):
- pass
+ def test_exception_tenant_no_intra_extension(self):
+ authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
+ admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ new_mapping = {
+ "id": uuid.uuid4().hex,
+ "name": "demo",
+ "description": uuid.uuid4().hex,
+ "intra_authz_extension": authz_intra_extension['id'],
+ "intra_admin_extension": admin_intra_extension['id'],
+ }
+ new_mapping['intra_authz_extension'] = None
+ self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, ADMIN_ID, new_mapping)
+ new_mapping['intra_authz_extension'] = authz_intra_extension['id']
+ data = self.manager.add_tenant_dict(
+ user_id=ADMIN_ID,
+ tenant_dict=new_mapping
+ )
+ self.assertEquals(new_mapping["id"], data["id"])
+ self.assertEquals(new_mapping["name"], data['tenant']["name"])
+ self.assertEquals(new_mapping["intra_authz_extension"], data['tenant']["intra_authz_extension"])
+ self.assertEquals(new_mapping["intra_admin_extension"], data['tenant']["intra_admin_extension"])
+ data = self.manager.get_tenants_dict(ADMIN_ID)
+ self.assertNotEqual(data, {})
- def test_exception_tenantnointraextension(self):
- pass
\ No newline at end of file
+ new_mapping['intra_authz_extension'] = None
+ new_mapping['name'] = "demo2"
+ self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, ADMIN_ID, new_mapping["id"], new_mapping)