intra_admin_extension_id = tenants_dict[tenant_id]['intra_admin_extension_id']
tenant_name = tenants_dict[tenant_id]['name']
- # func.func_globals["_admin_extension_uuid"] = _admin_extension_uuid
if not intra_admin_extension_id:
- raise TenantNoIntraAdminExtension()
+ args[0].moonlog_api.warning("No admin IntraExtension found, authorization granted by default.")
return func(*args)
else:
authz_result = False
f = open(metadata_path)
json_perimeter = json.load(f)
- subject_categories_dict = dict()
+ # subject_categories_dict = dict()
for _cat in json_perimeter['subject_categories']:
- subject_categories_dict[uuid4().hex] = {"name": _cat}
- self.driver.set_subject_category_dict(intra_extension_dict["id"], subject_categories_dict)
+ self.driver.set_subject_category_dict(intra_extension_dict["id"], uuid4().hex,
+ {"name": _cat, "description": _cat})
# Initialize scope categories
- for _cat in subject_categories_dict.keys():
- self.driver.set_subject_scope_dict(intra_extension_dict["id"], _cat, {})
- intra_extension_dict['subject_categories'] = subject_categories_dict
+ # for _cat in subject_categories_dict.keys():
+ # self.driver.set_subject_scope_dict(intra_extension_dict["id"], _cat, {})
+ # intra_extension_dict['subject_categories'] = subject_categories_dict
- object_categories_dict = dict()
+ # object_categories_dict = dict()
for _cat in json_perimeter['object_categories']:
- object_categories_dict[uuid4().hex] = {"name": _cat}
- self.driver.set_object_category_dict(intra_extension_dict["id"], object_categories_dict)
+ self.driver.set_object_category_dict(intra_extension_dict["id"], uuid4().hex,
+ {"name": _cat, "description": _cat})
# Initialize scope categories
- for _cat in object_categories_dict.keys():
- self.driver.set_object_scope_dict(intra_extension_dict["id"], _cat, {})
- intra_extension_dict['object_categories'] = object_categories_dict
+ # for _cat in object_categories_dict.keys():
+ # self.driver.set_object_scope_dict(intra_extension_dict["id"], _cat, {})
+ # intra_extension_dict['object_categories'] = object_categories_dict
- action_categories_dict = dict()
+ # action_categories_dict = dict()
for _cat in json_perimeter['action_categories']:
- action_categories_dict[uuid4().hex] = {"name": _cat}
- self.driver.set_action_category_dict(intra_extension_dict["id"], action_categories_dict)
+ self.driver.set_action_category_dict(intra_extension_dict["id"], uuid4().hex,
+ {"name": _cat, "description": _cat})
# Initialize scope categories
- for _cat in action_categories_dict.keys():
- self.driver.set_action_scope_dict(intra_extension_dict["id"], _cat, {})
- intra_extension_dict['action_categories'] = action_categories_dict
+ # for _cat in action_categories_dict.keys():
+ # self.driver.set_action_scope_dict(intra_extension_dict["id"], _cat, {})
+ # intra_extension_dict['action_categories'] = action_categories_dict
def __load_perimeter_file(self, intra_extension_dict, policy_dir):
for _subject in json_perimeter['subjects']:
user = self.identity_api.get_user_by_name(_subject, "default")
subject_dict[user["id"]] = user
- self.driver.set_subject_dict(intra_extension_dict["id"], subject_dict)
+ subject_dict[user["id"]].pop("id")
+ self.driver.set_subject_dict(intra_extension_dict["id"], user["id"])
intra_extension_dict["subjects"] = subject_dict
- # Copy all values for objects and subjects
+ # Copy all values for objects and actions
object_dict = dict()
for _object in json_perimeter['objects']:
- object_dict[uuid4().hex] = {"name": _object}
- self.driver.set_object_dict(intra_extension_dict["id"], object_dict)
+ _id = uuid4().hex
+ object_dict[_id] = {"name": _object, "description": _object}
+ self.driver.set_object_dict(intra_extension_dict["id"], _id, object_dict[_id])
intra_extension_dict["objects"] = object_dict
action_dict = dict()
for _action in json_perimeter['actions']:
- action_dict[uuid4().hex] = {"name": _action}
- self.driver.set_action_dict(intra_extension_dict["id"], action_dict)
+ _id = uuid4().hex
+ action_dict[_id] = {"name": _action, "description": _action}
+ self.driver.set_action_dict(intra_extension_dict["id"], _id, action_dict[_id])
intra_extension_dict["ations"] = action_dict
def __load_scope_file(self, intra_extension_dict, policy_dir):
intra_extension_dict['subject_category_scope'] = dict()
for category, scope in json_perimeter["subject_category_scope"].iteritems():
- category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.SUBJECT_CATEGORY)
+ category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.SUBJECT_CATEGORY)
_scope_dict = dict()
for _scope in scope:
- _scope_dict[uuid4().hex] = {"name": _scope}
- self.driver.set_subject_scope_dict(intra_extension_dict["id"], category, _scope_dict)
+ _id = uuid4().hex
+ _scope_dict[_id] = {"name": _scope, "description": _scope}
+ self.driver.set_subject_scope_dict(intra_extension_dict["id"], category_id, _id, _scope_dict[_id])
intra_extension_dict['subject_category_scope'][category] = _scope_dict
intra_extension_dict['object_category_scope'] = dict()
for category, scope in json_perimeter["object_category_scope"].iteritems():
- category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.OBJECT_CATEGORY)
+ category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.OBJECT_CATEGORY)
_scope_dict = dict()
for _scope in scope:
- _scope_dict[uuid4().hex] = {"name": _scope}
- self.driver.set_object_scope_dict(intra_extension_dict["id"], category, _scope_dict)
+ _id = uuid4().hex
+ _scope_dict[_id] = {"name": _scope, "description": _scope}
+ self.driver.set_object_scope_dict(intra_extension_dict["id"], category_id, _id, _scope_dict[_id])
intra_extension_dict['object_category_scope'][category] = _scope_dict
intra_extension_dict['action_category_scope'] = dict()
for category, scope in json_perimeter["action_category_scope"].iteritems():
- category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.ACTION_CATEGORY)
+ category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.ACTION_CATEGORY)
_scope_dict = dict()
for _scope in scope:
- _scope_dict[uuid4().hex] = {"name": _scope}
- self.driver.set_action_scope_dict(intra_extension_dict["id"], category, _scope_dict)
+ _id = uuid4().hex
+ _scope_dict[_id] = {"name": _scope, "description": _scope}
+ self.driver.set_action_scope_dict(intra_extension_dict["id"], category_id, _scope_dict[_id])
intra_extension_dict['action_category_scope'][category] = _scope_dict
def __load_assignment_file(self, intra_extension_dict, policy_dir):
subject_assignments = dict()
for category_name, value in json_assignments['subject_assignments'].iteritems():
- category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.SUBJECT_CATEGORY)
+ category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.SUBJECT_CATEGORY)
for user_name in value:
- user_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], user_name, self.driver.SUBJECT)
- if user_id not in subject_assignments:
- subject_assignments[user_id] = dict()
- if category not in subject_assignments[user_id]:
- subject_assignments[user_id][category] = \
+ subject_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], user_name, self.driver.SUBJECT)
+ if subject_id not in subject_assignments:
+ subject_assignments[subject_id] = dict()
+ if category_id not in subject_assignments[subject_id]:
+ subject_assignments[subject_id][category_id] = \
map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.SUBJECT_SCOPE, category_name),
value[user_name])
else:
- subject_assignments[user_id][category].extend(
+ subject_assignments[subject_id][category_id].extend(
map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.SUBJECT_SCOPE, category_name),
value[user_name])
)
- # Note (dthom): subject_category_assignment must be initialized because when there is no data in json
- # we will not go through the for loop
- self.driver.set_subject_assignment_list(intra_extension_dict["id"])
- for subject in subject_assignments:
- self.driver.set_subject_assignment_list(intra_extension_dict["id"], subject, subject_assignments[subject])
+ self.driver.set_subject_assignment_list(intra_extension_dict["id"], subject_id, category_id,
+ subject_assignments[subject_id][category_id])
object_assignments = dict()
for category_name, value in json_assignments["object_assignments"].iteritems():
- category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.OBJECT_CATEGORY)
+ category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.OBJECT_CATEGORY)
for object_name in value:
+ object_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], object_name, self.driver.OBJECT)
if object_name not in object_assignments:
- object_assignments[object_name] = dict()
- if category not in object_assignments[object_name]:
- object_assignments[object_name][category] = \
+ object_assignments[object_id] = dict()
+ if category_id not in object_assignments[object_name]:
+ object_assignments[object_id][category_id] = \
map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.OBJECT_SCOPE, category_name),
value[object_name])
else:
- object_assignments[object_name][category].extend(
+ object_assignments[object_id][category_id].extend(
map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.OBJECT_SCOPE, category_name),
value[object_name])
)
- # Note (dthom): object_category_assignment must be initialized because when there is no data in json
- # we will not go through the for loop
- self.driver.set_object_assignment_list(intra_extension_dict["id"])
- for object in object_assignments:
- self.driver.set_object_assignment_list(intra_extension_dict["id"], object, object_assignments[object])
+ self.driver.set_object_assignment_list(intra_extension_dict["id"], object_id, category_id,
+ object_assignments[object_id][category_id])
action_assignments = dict()
for category_name, value in json_assignments["action_assignments"].iteritems():
- category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.ACTION_CATEGORY)
+ category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.ACTION_CATEGORY)
for action_name in value:
+ action_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], action_name, self.driver.ACTION)
if action_name not in action_assignments:
- action_assignments[action_name] = dict()
- if category not in action_assignments[action_name]:
- action_assignments[action_name][category] = \
+ action_assignments[action_id] = dict()
+ if category_id not in action_assignments[action_name]:
+ action_assignments[action_id][category_id] = \
map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.ACTION_SCOPE, category_name),
value[action_name])
else:
- action_assignments[action_name][category].extend(
+ action_assignments[action_id][category_id].extend(
map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.ACTION_SCOPE, category_name),
value[action_name])
)
- # Note (dthom): action_category_assignment must be initialized because when there is no data in json
- # we will not go through the for loop
- self.driver.set_action_assignment_list(intra_extension_dict["id"])
- for action in action_assignments:
- self.driver.set_action_assignment_list(intra_extension_dict["id"], action, action_assignments[action])
+ self.driver.set_action_assignment_list(intra_extension_dict["id"], action_id, category_id,
+ action_assignments[action_id][category_id])
def __load_metarule_file(self, intra_extension_dict, policy_dir):
"action_categories": self.driver.ACTION_CATEGORY
}
# Translate value from JSON file to UUID for Database
- for relation in json_metarule["sub_meta_rules"]:
- metarule[relation] = dict()
+ for metarule_name in json_metarule["sub_meta_rules"]:
+ _id = uuid4().hex
+ metarule[_id] = dict()
+ metarule[_id]["name"] = metarule_name
for item in ("subject_categories", "object_categories", "action_categories"):
- metarule[relation][item] = list()
- for element in json_metarule["sub_meta_rules"][relation][item]:
- metarule[relation][item].append(self.driver.get_uuid_from_name(intra_extension_dict["id"], element, categories[item]))
- metarule[relation]["algorithm"] = json_metarule["sub_meta_rules"][relation]["algorithm"]
+ metarule[_id][item] = list()
+ for element in json_metarule["sub_meta_rules"][metarule_name][item]:
+ metarule[[_id]][item].append(self.driver.get_uuid_from_name(intra_extension_dict["id"], element, categories[item]))
+ metarule[[_id]]["algorithm"] = json_metarule["sub_meta_rules"][metarule_name]["algorithm"]
+ self.driver.set_sub_meta_rule_dict(intra_extension_dict["id"], _id, metarule[[_id]])
submetarules = {
"aggregation": json_metarule["aggregation"],
"sub_meta_rules": metarule
}
- self.driver.set_sub_meta_rule_dict(intra_extension_dict["id"], submetarules)
+ self.driver.set_aggregation_algorithm(intra_extension_dict["id"], uuid4().hex,
+ {
+ "name": json_metarule["aggregation"],
+ "description": json_metarule["aggregation"],
+ })
def __load_rule_file(self, intra_extension_dict, policy_dir):
# Translate value from JSON file to UUID for Database
rules = dict()
sub_meta_rules = self.driver.get_sub_meta_rules_dict(intra_extension_dict["id"])
- for relation in json_rules:
- # print(relation)
+ for sub_rule_name in json_rules:
+ sub_rule_id = self.driver.get_uuid_from_name(intra_extension_dict["id"],
+ sub_rule_name,
+ self.driver.SUB_META_RULE)
+ # print(sub_rule_name)
# print(self.get_sub_meta_rule_relations("admin", ie["id"]))
- # if relation not in self.get_sub_meta_rule_relations("admin", ie["id"])["sub_meta_rule_relations"]:
- # raise IntraExtensionException("Bad relation name {} in rules".format(relation))
- rules[relation] = list()
- for rule in json_rules[relation]:
+ # if sub_rule_name not in self.get_sub_meta_rule_relations("admin", ie["id"])["sub_meta_rule_relations"]:
+ # raise IntraExtensionException("Bad sub_rule_name name {} in rules".format(sub_rule_name))
+ rules[sub_rule_id] = list()
+ for rule in json_rules[sub_rule_name]:
subrule = list()
_rule = list(rule)
- for category_uuid in sub_meta_rules["rule"][relation]["subject_categories"]:
+ for category_uuid in sub_meta_rules["rule"][sub_rule_name]["subject_categories"]:
scope_name = _rule.pop(0)
scope_uuid = self.driver.get_uuid_from_name(intra_extension_dict["id"],
scope_name,
self.driver.SUBJECT_SCOPE,
category_uuid=category_uuid)
subrule.append(scope_uuid)
- for category_uuid in sub_meta_rules["rule"][relation]["action_categories"]:
+ for category_uuid in sub_meta_rules["rule"][sub_rule_name]["action_categories"]:
scope_name = _rule.pop(0)
scope_uuid = self.driver.get_uuid_from_name(intra_extension_dict["id"],
scope_name,
self.driver.ACTION_SCOPE,
category_uuid=category_uuid)
subrule.append(scope_uuid)
- for category_uuid in sub_meta_rules["rule"][relation]["object_categories"]:
+ for category_uuid in sub_meta_rules["rule"][sub_rule_name]["object_categories"]:
scope_name = _rule.pop(0)
scope_uuid = self.driver.get_uuid_from_name(intra_extension_dict["id"],
scope_name,
self.driver.OBJECT_SCOPE,
category_uuid=category_uuid)
subrule.append(scope_uuid)
- # for cat, cat_func, cat_func_cat in (
- # ("subject_categories", self.driver.get_uuid_from_name, self.driver.SUBJECT_SCOPE),
- # ("action_categories", self.driver.ACTION_SCOPE),
- # ("object_categories", self.driver.OBJECT_SCOPE),
- # ):
- # for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]:
- # scope = cat_func(
- # ie["id"],
- # cat_value,
- # cat_func_cat
- # )[cat_func.__name__.replace("get_", "").replace("_dict", "")]
- #
- # _ = rule.pop(0)
- # a_scope = self.driver.get_uuid_from_name(ie["id"], _, scope[cat_value])
- # subrule.append(a_scope)
# if a positive/negative value exists, all item of rule have not be consumed
if len(rule) >= 1 and type(rule[0]) is bool:
subrule.append(rule[0])
else:
# if value doesn't exist add a default value
subrule.append(True)
- rules[relation].append(subrule)
- self.driver.set_rule_dict(intra_extension_dict["id"], rules)
+ rules[sub_rule_id].append(subrule)
+ self.driver.set_rule_dict(intra_extension_dict["id"], sub_rule_id, uuid4().hex, rules)
def load_intra_extension_dict(self, user_id, intra_extension_dict):
# TODO: check will be done through super_extension later
def del_subject_category(self, user_id, intra_extension_id, subject_category_id):
if subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id):
raise SubjectCategoryUnknown()
- # TODO (dthom): destroy category in scope
- # self.driver.destroy_subject_category_in_scope(intra_extension_id, subject_category_id)
- # TODO (dthom): destroy category-related assignment in assignments
- # self.driver.destroy_subject_category_in_assignement(intra_extension_id, subject_category_id)
- return self.driver.del_subject_category(intra_extension_id, subject_category_id)
+ # Destroy scopes related to this category
+ for scope in self.driver.get_subject_scope_dict(intra_extension_id, subject_category_id):
+ self.del_subject_scope(intra_extension_id, subject_category_id, scope)
+ # Destroy assignments related to this category
+ for subject_id in self.driver.get_subjects_dict(intra_extension_id):
+ for assignment_id in self.driver.get_subject_assignment_list(intra_extension_id, subject_id, subject_category_id):
+ self.driver.del_subject_assignment(intra_extension_id, subject_id, subject_category_id, assignment_id)
+ self.driver.del_subject_category(intra_extension_id, subject_category_id)
@filter_args
@enforce(("read", "write"), "subject_categories")
def set_subject_category(self, user_id, intra_extension_id, subject_category_id, subject_category_dict):
- for subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id):
+ if subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id):
raise SubjectCategoryUnknown()
return self.driver.set_subject_category(intra_extension_id, subject_category_id, subject_category_dict)
for object_category_id in object_category_dict:
if object_category_dict[object_category_id]["name"] is object_category_name:
raise ObjectCategoryNameExisting()
- object_category_id = uuid4().hex
- # TODO (dthom): create category in scope
- # self.driver.create_object_category_in_scope(intra_extension_id, object_category_id)
- return self.driver.add_object_category(intra_extension_id, object_category_id, object_category_name)
+ return self.driver.add_object_category(intra_extension_id, uuid4().hex, object_category_name)
@filter_args
@enforce("read", "object_categories")
@enforce(("read", "write"), "object_scopes")
@enforce(("read", "write"), "object_assignments")
def del_object_category(self, user_id, intra_extension_id, object_category_id):
- object_category_dict = self.driver.get_object_categories_dict(intra_extension_id)
- if object_category_id not in object_category_dict:
+ if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
raise ObjectCategoryUnknown()
- # TODO (dthom): destroy category in scope
- # self.driver.destroy_object_category_in_scope(intra_extension_id, object_category_id)
- # TODO (dthom): destroy category-related assignment in assignments
- # self.driver.destroy_object_category_in_assignement(intra_extension_id, object_category_id)
- return self.driver.del_object_category(intra_extension_id, object_category_id)
+ # Destroy scopes related to this category
+ for scope in self.driver.get_object_scopes_dict(intra_extension_id, object_category_id):
+ self.del_object_scope(intra_extension_id, object_category_id, scope)
+ # Destroy assignments related to this category
+ for object_id in self.driver.get_objects_dict(intra_extension_id):
+ for assignment_id in self.driver.get_object_assignment_list(intra_extension_id, object_id, object_category_id):
+ self.driver.del_object_assignment(intra_extension_id, object_id, object_category_id, assignment_id)
+ self.driver.del_object_category(intra_extension_id, object_category_id)
@filter_args
@enforce("read", "action_categories")
for action_category_id in action_category_dict:
if action_category_dict[action_category_id]['name'] is action_category_name:
raise ActionCategoryNameExisting()
- action_category_id = uuid4().hex
- # TODO (dthom): create category in scope
- # self.driver.create_action_category_in_scope(intra_extension_id, action_category_id)
- return self.driver.add_action_category(intra_extension_id, action_category_id, action_category_name)
+ return self.driver.add_action_category(intra_extension_id, uuid4().hex, action_category_name)
@filter_args
@enforce("read", "action_categories")
@enforce(("read", "write"), "action_categories")
@enforce(("read", "write"), "action_category_scopes")
def del_action_category(self, user_id, intra_extension_id, action_category_id):
- action_category_dict = self.driver.get_action_categories_dict(intra_extension_id)
- if action_category_id not in action_category_dict:
+ if action_category_id not in self.driver.get_action_categories_dict(intra_extension_id):
raise ActionCategoryUnknown()
- # TODO (dthom): destroy category in scope
- # self.driver.destroy_action_category_in_scope(intra_extension_id, action_category_id)
- # TODO (dthom): destroy category-related assignment in assignement
- # self.driver.destroy_action_category_in_assignement(intra_extension_id, action_category_id)
- return self.driver.del_action_category(intra_extension_id, action_category_id)
+ # Destroy scopes related to this category
+ for scope in self.driver.get_action_scopes_dict(intra_extension_id, action_category_id):
+ self.del_action_scope(intra_extension_id, action_category_id, scope)
+ # Destroy assignments related to this category
+ for action_id in self.driver.get_actions_dict(intra_extension_id):
+ for assignment_id in self.driver.get_action_assignment_list(intra_extension_id, action_id, action_category_id):
+ self.driver.del_action_assignment(intra_extension_id, action_id, action_category_id, assignment_id)
+ self.driver.del_action_category(intra_extension_id, action_category_id)
# Perimeter functions
def del_subject(self, user_id, intra_extension_id, subject_id):
if subject_id in self.driver.get_subjects_dict(intra_extension_id):
raise SubjectUnknown()
- # TODO (dthom): destroy item-related assignment
+ # Destroy assignments related to this category
+ for subject_category_id in self.driver.get_subject_categories_dict(intra_extension_id):
+ for _subject_id in self.driver.get_subjects_dict(intra_extension_id):
+ for assignment_id in self.driver.get_subject_assignment_list(intra_extension_id, _subject_id, subject_category_id):
+ self.driver.del_subject_assignment(intra_extension_id, _subject_id, subject_category_id, assignment_id)
self.driver.del_subject(intra_extension_id, subject_id)
@filter_args
@filter_args
@enforce("read", "objects")
- def get_object_dict(self, user_id, intra_extension_id):
+ def get_objects_dict(self, user_id, intra_extension_id):
return self.driver.get_objects_dict(intra_extension_id)
@filter_args
@enforce(("read", "write"), "objects")
- def add_object(self, user_id, intra_extension_id, object_name):
+ def add_object_dict(self, user_id, intra_extension_id, object_name):
object_dict = self.driver.get_objects_dict(intra_extension_id)
for object_id in object_dict:
if object_dict[object_id]["name"] is object_name:
raise ObjectNameExisting()
- object_id = uuid4().hex
- return self.driver.add_object(intra_extension_id, object_id, object_name)
+ return self.driver.set_object_dict(intra_extension_id, uuid4().hex, object_name)
+
+ @filter_args
+ @enforce(("read", "write"), "objects")
+ def set_object_dict(self, user_id, intra_extension_id, object_id, object_dict):
+ objects_dict = self.driver.get_objects_dict(intra_extension_id)
+ for object_id in objects_dict:
+ if objects_dict[object_id]["name"] is object_dict['name']:
+ raise ObjectNameExisting()
+ return self.driver.set_object_dict(intra_extension_id, object_id, object_dict)
@filter_args
@enforce("read", "objects")
- def get_object(self, user_id, intra_extension_id, object_id):
+ def get_object_dict(self, user_id, intra_extension_id, object_id):
object_dict = self.driver.get_objects_dict(intra_extension_id)
if object_id in object_dict:
raise ObjectUnknown()
def del_object(self, user_id, intra_extension_id, object_id):
if object_id in self.driver.get_objects_dict(intra_extension_id):
raise ObjectUnknown()
- # TODO (dthom): destroy item-related assignment
- return self.driver.del_object(intra_extension_id, object_id)
+ # Destroy assignments related to this category
+ for object_category_id in self.driver.get_object_categories_dict(intra_extension_id):
+ for _object_id in self.driver.get_objects_dict(intra_extension_id):
+ for assignment_id in self.driver.get_object_assignment_list(intra_extension_id, _object_id, object_category_id):
+ self.driver.del_object_assignment(intra_extension_id, _object_id, object_category_id, assignment_id)
+ self.driver.del_object(intra_extension_id, object_id)
@filter_args
@enforce("read", "actions")
- def get_action_dict(self, user_id, intra_extension_id):
+ def get_actions_dict(self, user_id, intra_extension_id):
return self.driver.get_actions_dict(intra_extension_id)
@filter_args
@enforce(("read", "write"), "actions")
- def add_action(self, user_id, intra_extension_id, action_name):
+ def add_action_dict(self, user_id, intra_extension_id, action_name):
action_dict = self.driver.get_actions_dict(intra_extension_id)
for action_id in action_dict:
if action_dict[action_id]["name"] is action_name:
raise ActionNameExisting()
- action_id = uuid4().hex
- return self.driver.add_action(intra_extension_id, action_id, action_name)
+ return self.driver.add_action_dict(intra_extension_id, uuid4().hex, action_name)
+
+ @filter_args
+ @enforce(("read", "write"), "actions")
+ def set_action_dict(self, user_id, intra_extension_id, action_id, action_dict):
+ actions_dict = self.driver.get_actions_dict(intra_extension_id)
+ for action_id in actions_dict:
+ if actions_dict[action_id]["name"] is action_dict['name']:
+ raise ActionNameExisting()
+ return self.driver.set_action_dict(intra_extension_id, action_id, action_dict)
@filter_args
@enforce("read", "actions")
- def get_action(self, user_id, intra_extension_id, action_id):
+ def get_action_dict(self, user_id, intra_extension_id, action_id):
action_dict = self.driver.get_actions_dict(intra_extension_id)
if action_id in action_dict:
raise ActionUnknown()
def del_action(self, user_id, intra_extension_id, action_id):
if action_id in self.driver.get_actions_dict(intra_extension_id):
raise ActionUnknown()
- # TODO (dthom): destroy item-related assignment
+ # Destroy assignments related to this category
+ for action_category_id in self.driver.get_action_categories_dict(intra_extension_id):
+ for _action_id in self.driver.get_actions_dict(intra_extension_id):
+ for assignment_id in self.driver.get_action_assignment_list(intra_extension_id, _action_id, action_category_id):
+ self.driver.del_action_assignment(intra_extension_id, _action_id, action_category_id, assignment_id)
return self.driver.del_action(intra_extension_id, action_id)
# Scope functions
raise SubjectCategoryUnknown()
if subject_scope_id not in self.driver.get_subject_scopes_dict(intra_extension_id, subject_category_id):
raise SubjectScopeUnknown()
- # TODO (dthom): destroy scope-related assignment
- # TODO (dthom): destroy scope-related rule
+ # Destroy scope-related assignment
+ for subject_id in self.driver.get_subjects_dict(intra_extension_id):
+ for assignment_id in self.driver.get_subject_assignment_list(intra_extension_id, subject_id, subject_category_id):
+ self.driver.del_subject_assignment(intra_extension_id, subject_id, subject_category_id, assignment_id)
+ # Destroy scope-related rule
+ for sub_meta_rule_id in self.driver.get_sub_meta_rules_dict(intra_extension_id):
+ rules_dict = self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id)
+ for rule_id in rules_dict:
+ if subject_scope_id in rules_dict[rule_id]:
+ self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id)
return self.driver.del_subject_scope(intra_extension_id, subject_category_id, subject_scope_id)
@filter_args
@filter_args
@enforce("read", "object_category_scopes")
@enforce("read", "object_categories")
- def get_object_scope_dict(self, user_id, intra_extension_id, object_category_id):
+ def get_object_scopes_dict(self, user_id, intra_extension_id, object_category_id):
if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
raise ObjectCategoryUnknown()
return self.driver.get_object_scopes_dict(intra_extension_id, object_category_id)
@filter_args
@enforce(("read", "write"), "object_scopes")
@enforce("read", "object_categories")
- def add_object_scope(self, user_id, intra_extension_id, object_category_id, object_scope_name):
+ def add_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_name):
if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
raise ObjectCategoryUnknown()
object_scope_dict = self.driver.get_object_scopes_dict(intra_extension_id, object_category_id)
@filter_args
@enforce("read", "object_scopes")
@enforce("read", "object_categories")
- def get_object_scope(self, user_id, intra_extension_id, object_category_id, object_scope_id):
+ def get_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_id):
if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
raise ObjectCategoryUnknown()
object_scopte_dict = self.driver.get_object_scopes_dict(intra_extension_id, object_category_id)
raise ObjectCategoryUnknown()
if object_scope_id not in self.driver.get_object_scopes_dict(intra_extension_id, object_category_id):
raise ObjectScopeUnknown()
- # TODO (dthom): destroy scope-related assignment
- # TODO (dthom): destroy scope-related rule
+ # Destroy scope-related assignment
+ for object_id in self.driver.get_objects_dict(intra_extension_id):
+ for assignment_id in self.driver.get_object_assignment_list(intra_extension_id, object_id, object_category_id):
+ self.driver.del_object_assignment(intra_extension_id, object_id, object_category_id, assignment_id)
+ # Destroy scope-related rule
+ for sub_meta_rule_id in self.driver.get_sub_meta_rules_dict(intra_extension_id):
+ rules_dict = self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id)
+ for rule_id in rules_dict:
+ if object_scope_id in rules_dict[rule_id]:
+ self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id)
return self.driver.del_object_scope(intra_extension_id, object_category_id, object_scope_id)
+ @filter_args
+ @enforce(("read", "write"), "object_scopes")
+ @enforce("read", "object_categories")
+ def set_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_id, object_scope_name):
+ if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
+ raise ObjectCategoryUnknown()
+ object_scope_dict = self.driver.get_object_scopes_dict(intra_extension_id, object_category_id)
+ for _object_scope_id in object_scope_dict:
+ if object_scope_name is object_scope_dict[_object_scope_id]['name']:
+ raise ObjectScopeNameExisting()
+ return self.driver.set_object_scope_dict(intra_extension_id, object_category_id, uuid4().hex, object_scope_dict)
+
@filter_args
@enforce("read", "action_category_scopes")
@enforce("read", "action_categories")
- def get_action_scope_dict(self, user_id, intra_extension_id, action_category_id):
+ def get_action_scopes_dict(self, user_id, intra_extension_id, action_category_id):
if action_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
raise ActionCategoryUnknown()
return self.driver.get_action_scopes_dict(intra_extension_id, action_category_id)
@filter_args
@enforce(("read", "write"), "action_scopes")
@enforce("read", "action_categories")
- def add_action_scope(self, user_id, intra_extension_id, action_category_id, action_scope_name):
+ def add_action_scope_dict(self, user_id, intra_extension_id, action_category_id, action_scope_name):
if action_category_id not in self.driver.get_action_categories_dict(intra_extension_id):
raise ActionCategoryUnknown()
action_scope_dict = self.driver.get_action_scopes_dict(intra_extension_id, action_category_id)
if action_scope_name is action_scope_dict[_action_scope_id]['name']:
raise ActionScopeNameExisting()
action_scope_id = uuid4().hex
- return self.driver.add_action_scope(
+ return self.driver.add_action_scope_dict(
intra_extension_id,
action_category_id,
action_scope_id,
@filter_args
@enforce("read", "action_scopes")
@enforce("read", "action_categories")
- def get_action_scope(self, user_id, intra_extension_id, action_category_id, action_scope_id):
+ def get_action_scope_dict(self, user_id, intra_extension_id, action_category_id, action_scope_id):
if action_category_id not in self.driver.get_action_categories_dict(intra_extension_id):
raise ActionCategoryUnknown()
action_scopte_dict = self.driver.get_action_scopes_dict(intra_extension_id, action_category_id)
raise ActionCategoryUnknown()
if action_scope_id not in self.driver.get_action_scopes_dict(intra_extension_id, action_category_id):
raise ActionScopeUnknown()
- # TODO (dthom): destroy scope-related assignment
- # TODO (dthom): destroy scope-related rule
+ # Destroy scope-related assignment
+ for action_id in self.driver.get_actions_dict(intra_extension_id):
+ for assignment_id in self.driver.get_action_assignment_list(intra_extension_id, action_id, action_category_id):
+ self.driver.del_action_assignment(intra_extension_id, action_id, action_category_id, assignment_id)
+ # Destroy scope-related rule
+ for sub_meta_rule_id in self.driver.get_sub_meta_rules_dict(intra_extension_id):
+ rules_dict = self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id)
+ for rule_id in rules_dict:
+ if action_scope_id in rules_dict[rule_id]:
+ self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id)
return self.driver.del_action_scope(intra_extension_id, action_category_id, action_scope_id)
# Assignment functions
+ @filter_args
+ @enforce("read", "subject_assignments")
+ @enforce("read", "subjects")
+ @enforce("read", "subject_categories")
+ def get_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id):
+ if subject_id not in self.driver.get_subjects_dict(user_id, intra_extension_id):
+ raise SubjectUnknown()
+ elif subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id):
+ raise SubjectCategoryUnknown()
+ return self.driver.get_subject_assignment_list(intra_extension_id, subject_id, subject_category_id)
+
@filter_args
@enforce(("read", "write"), "subject_assignments")
@enforce("read", "subjects")
@enforce("read", "subject_categories")
@enforce("read", "subject_scopes")
- def add_subject_assignment(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id):
+ def add_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id):
if subject_id not in self.driver.get_subjects_dict(intra_extension_id):
raise SubjectUnknown()
if subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id):
raise SubjectAssignmentExisting()
return self.driver.add_subject_assignment_list(intra_extension_id, subject_id, subject_category_id, subject_scope_id)
- @filter_args
- @enforce("read", "subject_assignments")
- @enforce("read", "subjects")
- @enforce("read", "subject_categories")
- def get_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id):
- if subject_id not in self.driver.get_subjects_dict(user_id, intra_extension_id):
- raise SubjectUnknown()
- elif subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id):
- raise SubjectCategoryUnknown()
- return self.driver.get_subject_assignment_list(intra_extension_id, subject_id)[subject_category_id]
-
@filter_args
@enforce(("read", "write"), "subject_assignments")
@enforce("read", "subjects")
@filter_args
@enforce("read", "object_assignments")
@enforce("read", "objects")
- def get_object_assignment_dict(self, user_id, intra_extension_id, object_id):
- if object_id not in self.get_object_dict(user_id, intra_extension_id):
+ @enforce("read", "object_categories")
+ def get_object_assignment_list(self, user_id, intra_extension_id, object_id, object_category_id):
+ if object_id not in self.driver.get_objects_dict(user_id, intra_extension_id):
raise ObjectUnknown()
- return self.driver.get_object_assignment_list(intra_extension_id, object_id)
+ elif object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
+ raise ObjectCategoryUnknown()
+ return self.driver.get_object_assignment_list(intra_extension_id, object_id, object_category_id)
@filter_args
@enforce(("read", "write"), "object_assignments")
@enforce("read", "objects")
@enforce("read", "object_categories")
- def add_object_assignment(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id):
+ def add_object_assignment_list(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id):
if object_id not in self.driver.get_objects_dict(intra_extension_id):
raise ObjectUnknown()
elif object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
raise ObjectAssignmentExisting()
return self.driver.add_object_assignment_list(intra_extension_id, object_id, object_category_id, object_scope_id)
- @filter_args
- @enforce("read", "object_assignments")
- @enforce("read", "objects")
- @enforce("read", "object_categories")
- def get_object_assignment(self, user_id, intra_extension_id, object_id, object_category_id):
- if object_id not in self.driver.get_objects_dict(user_id, intra_extension_id):
- raise ObjectUnknown()
- elif object_category_id not in self.driver.get_object_categories_dict(intra_extension_id):
- raise ObjectCategoryUnknown()
- return self.driver.get_object_assignment_list(intra_extension_id, object_id)[object_category_id]
-
@filter_args
@enforce(("read", "write"), "object_assignments")
@enforce("read", "objects")
@filter_args
@enforce("read", "action_assignments")
@enforce("read", "actions")
- def get_action_assignment_dict(self, user_id, intra_extension_id, action_id):
- if action_id not in self.get_action_dict(user_id, intra_extension_id):
+ @enforce("read", "action_categories")
+ def get_action_assignment_list(self, user_id, intra_extension_id, action_id, action_category_id):
+ if action_id not in self.driver.get_actions_dict(user_id, intra_extension_id):
raise ActionUnknown()
- return self.driver.get_action_assignment_list(intra_extension_id, action_id)
+ elif action_category_id not in self.driver.get_action_categories_dict(intra_extension_id):
+ raise ActionCategoryUnknown()
+ return self.driver.get_action_assignment_list(intra_extension_id, action_id, action_category_id)
@filter_args
@enforce(("read", "write"), "action_assignments")
@enforce("read", "actions")
@enforce("read", "action_categories")
- def add_action_assignment(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id):
+ def add_action_assignment_list(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id):
if action_id not in self.driver.get_actions_dict(intra_extension_id):
raise ActionUnknown()
elif action_category_id not in self.driver.get_action_categories_dict(intra_extension_id):
raise ObjectAssignmentExisting()
return self.driver.add_action_assignment_list(intra_extension_id, action_id, action_category_id, action_scope_id)
- @filter_args
- @enforce("read", "action_assignments")
- @enforce("read", "actions")
- @enforce("read", "action_categories")
- def get_action_assignment(self, user_id, intra_extension_id, action_id, action_category_id):
- if action_id not in self.driver.get_actions_dict(user_id, intra_extension_id):
- raise ActionUnknown()
- elif action_category_id not in self.driver.get_action_categories_dict(intra_extension_id):
- raise ActionCategoryUnknown()
- return self.driver.get_action_assignment_list(intra_extension_id, action_id)[action_category_id]
-
@filter_args
@enforce(("read", "write"), "action_assignments")
@enforce("read", "actions")
SUBJECT_SCOPE = 'subject_scope'
OBJECT_SCOPE = 'object_scope'
ACTION_SCOPE = 'action_scope'
+ SUB_META_RULE = 'sub_meta_rule'
def __get_data_from_type(self,
intra_extension_uuid,
elif data_name == self.SUBJECT_SCOPE:
if not category_uuid:
category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.SUBJECT_CATEGORY)
- data_values = self.get_subject_category_scope_dict(intra_extension_uuid,
- category_uuid)["subject_category_scope"]
+ data_values = self.get_subject_scopes_dict(intra_extension_uuid,
+ category_uuid)["subject_category_scope"]
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
raise SubjectScopeUnknown()
if not category_uuid:
category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.OBJECT_CATEGORY)
data_values = self.get_object_scopes_dict(intra_extension_uuid,
- category_uuid)["object_category_scope"]
+ category_uuid)["object_category_scope"]
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
raise ObjectScopeUnknown()
if not category_uuid:
category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.ACTION_CATEGORY)
data_values = self.get_action_scopes_dict(intra_extension_uuid,
- category_uuid)["action_category_scope"]
+ category_uuid)["action_category_scope"]
if (name and name not in extract_name(data_values)) or \
(uuid and uuid not in data_values.keys()):
raise ActionScopeUnknown()
+ elif data_name == self.SUB_META_RULE:
+ data_values = self.get_sub_meta_rules_dict(intra_extension_uuid)["sub_meta_rule"]
+ if (name and name not in extract_name(data_values)) or \
+ (uuid and uuid not in data_values.keys()):
+ raise SubMetaRuleUnknown()
if category_uuid:
return data_values[category_uuid]
return data_values
def del_rule(self, intra_extension_id, sub_meta_rule_id, rule_id):
raise exception.NotImplemented() # pragma: no cover
+
class LogDriver(object):
def authz(self, message):
self.create_user("admin")
self.create_intra_extension()
- objects = self.manager.get_object_dict("admin", self.ref["id"])
+ objects = self.manager.get_objects_dict("admin", self.ref["id"])
self.assertIsInstance(objects, dict)
self.assertIn("objects", objects)
self.assertIn("id", objects)
# Delete the new object
self.manager.del_object("admin", self.ref["id"], new_object["id"])
- objects = self.manager.get_object_dict("admin", self.ref["id"])
+ objects = self.manager.get_objects_dict("admin", self.ref["id"])
self.assertIsInstance(objects, dict)
self.assertIn("objects", objects)
self.assertIn("id", objects)
self.assertNotIn(new_object["id"], objects["objects"])
# Add a particular object
- objects = self.manager.add_object("admin", self.ref["id"], new_object["name"])
+ objects = self.manager.add_object_dict("admin", self.ref["id"], new_object["name"])
self.assertIsInstance(objects, dict)
self.assertIn("object", objects)
self.assertIn("uuid", objects["object"])
self.assertEqual(new_object["name"], objects["object"]["name"])
new_object["id"] = objects["object"]["uuid"]
- objects = self.manager.get_object_dict("admin", self.ref["id"])
+ objects = self.manager.get_objects_dict("admin", self.ref["id"])
self.assertIsInstance(objects, dict)
self.assertIn("objects", objects)
self.assertIn("id", objects)
self.create_user("admin")
self.create_intra_extension()
- actions = self.manager.get_action_dict("admin", self.ref["id"])
+ actions = self.manager.get_actions_dict("admin", self.ref["id"])
self.assertIsInstance(actions, dict)
self.assertIn("actions", actions)
self.assertIn("id", actions)
# Delete the new action
self.manager.del_action("admin", self.ref["id"], new_action["id"])
- actions = self.manager.get_action_dict("admin", self.ref["id"])
+ actions = self.manager.get_actions_dict("admin", self.ref["id"])
self.assertIsInstance(actions, dict)
self.assertIn("actions", actions)
self.assertIn("id", actions)
self.assertNotIn(new_action["id"], actions["actions"])
# Add a particular action
- actions = self.manager.add_action("admin", self.ref["id"], new_action["name"])
+ actions = self.manager.add_action_dict("admin", self.ref["id"], new_action["name"])
self.assertIsInstance(actions, dict)
self.assertIn("action", actions)
self.assertIn("uuid", actions["action"])
self.assertEqual(new_action["name"], actions["action"]["name"])
new_action["id"] = actions["action"]["uuid"]
- actions = self.manager.get_action_dict("admin", self.ref["id"])
+ actions = self.manager.get_actions_dict("admin", self.ref["id"])
self.assertIsInstance(actions, dict)
self.assertIn("actions", actions)
self.assertIn("id", actions)
)
for object_category in object_categories["object_categories"]:
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
"admin",
self.ref["id"],
object_category)
self.ref["id"],
object_category,
new_object_category_scope_uuid)
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
"admin",
self.ref["id"],
object_category)
self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"])
# Add a particular object_category_scope
- object_category_scope = self.manager.add_object_scope(
+ object_category_scope = self.manager.add_object_scope_dict(
"admin",
self.ref["id"],
object_category,
self.assertIn("uuid", object_category_scope["object_category_scope"])
self.assertEqual(new_object_category_scope[new_object_category_scope_uuid],
object_category_scope["object_category_scope"]["name"])
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
"admin",
self.ref["id"],
object_category)
)
for action_category in action_categories["action_categories"]:
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
"admin",
self.ref["id"],
action_category)
self.ref["id"],
action_category,
new_action_category_scope_uuid)
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
"admin",
self.ref["id"],
action_category)
self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"])
# Add a particular action_category_scope
- action_category_scope = self.manager.add_action_scope(
+ action_category_scope = self.manager.add_action_scope_dict(
"admin",
self.ref["id"],
action_category,
self.assertIn("uuid", action_category_scope["action_category_scope"])
self.assertEqual(new_action_category_scope[new_action_category_scope_uuid],
action_category_scope["action_category_scope"]["name"])
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
"admin",
self.ref["id"],
action_category)
{new_subject_category_uuid: [new_subject_category_scope2_uuid, ]},
subject_category_assignments["subject_category_assignments"][new_subject["id"]])
- data = self.manager.add_subject_assignment(
+ data = self.manager.add_subject_assignment_list(
"admin",
self.ref["id"],
new_subject["id"],
)
for object_category in object_categories["object_categories"]:
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
"admin",
self.ref["id"],
object_category)
self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid],
object_category_scope["object_category_scope"][object_category].values())
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
"admin",
self.ref["id"],
new_object["id"]
self.assertEqual(
{new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]},
object_category_assignments["object_category_assignments"][new_object["id"]])
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
"admin",
self.ref["id"],
new_object["id"]
new_object_category_uuid,
new_object_category_scope_uuid
)
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
"admin",
self.ref["id"],
new_object["id"]
{new_object_category_uuid: [new_object_category_scope2_uuid, ]},
object_category_assignments["object_category_assignments"][new_object["id"]])
- self.manager.add_object_assignment(
+ self.manager.add_object_assignment_list(
"admin",
self.ref["id"],
new_object["id"],
new_object_category_scope_uuid
)
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
"admin",
self.ref["id"],
new_object["id"]
)
for action_category in action_categories["action_categories"]:
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
"admin",
self.ref["id"],
action_category)
self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid],
action_category_scope["action_category_scope"][action_category].values())
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
"admin",
self.ref["id"],
new_action["id"]
self.assertEqual(
{new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]},
action_category_assignments["action_category_assignments"][new_action["id"]])
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
"admin",
self.ref["id"],
new_action["id"]
new_action_category_uuid,
new_action_category_scope_uuid
)
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
"admin",
self.ref["id"],
new_action["id"]
{new_action_category_uuid: [new_action_category_scope2_uuid, ]},
action_category_assignments["action_category_assignments"][new_action["id"]])
- self.manager.add_action_assignment(
+ self.manager.add_action_assignment_list(
"admin",
self.ref["id"],
new_action["id"],
new_action_category_scope_uuid
)
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
"admin",
self.ref["id"],
new_action["id"]
for rule in sub_rules["rules"][relation]:
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"),
- ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"),
- ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"),
+ ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"),
+ ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"),
):
for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]:
scope = cat_func(
sub_rule = []
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"),
- ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"),
- ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"),
+ ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"),
+ ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"),
):
for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]:
scope = cat_func(
for rule in sub_rules["rules"][relation]:
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"),
- ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"),
- ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"),
+ ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"),
+ ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"),
):
for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]:
scope = cat_func(
self.assertRaises(
ObjectReadNotAuthorized,
- self.manager.get_object_dict,
+ self.manager.get_objects_dict,
demo_user["id"], ref["id"])
- objects = self.manager.get_object_dict(admin_user["id"], ref["id"])
+ objects = self.manager.get_objects_dict(admin_user["id"], ref["id"])
self.assertIsInstance(objects, dict)
self.assertIn("objects", objects)
self.assertIn("id", objects)
demo_user["id"], ref["id"], new_object["id"])
self.manager.del_object(admin_user["id"], ref["id"], new_object["id"])
- objects = self.manager.get_object_dict(admin_user["id"], ref["id"])
+ objects = self.manager.get_objects_dict(admin_user["id"], ref["id"])
self.assertIsInstance(objects, dict)
self.assertIn("objects", objects)
self.assertIn("id", objects)
# Add a particular object
self.assertRaises(
ObjectAddNotAuthorized,
- self.manager.add_object,
+ self.manager.add_object_dict,
demo_user["id"], ref["id"], new_object["name"])
- objects = self.manager.add_object(admin_user["id"], ref["id"], new_object["name"])
+ objects = self.manager.add_object_dict(admin_user["id"], ref["id"], new_object["name"])
self.assertIsInstance(objects, dict)
self.assertIn("object", objects)
self.assertIn("uuid", objects["object"])
self.assertEqual(new_object["name"], objects["object"]["name"])
new_object["id"] = objects["object"]["uuid"]
- objects = self.manager.get_object_dict(admin_user["id"], ref["id"])
+ objects = self.manager.get_objects_dict(admin_user["id"], ref["id"])
self.assertIsInstance(objects, dict)
self.assertIn("objects", objects)
self.assertIn("id", objects)
self.assertRaises(
ActionReadNotAuthorized,
- self.manager.get_action_dict,
+ self.manager.get_actions_dict,
demo_user["id"], ref["id"])
- actions = self.manager.get_action_dict(admin_user["id"], ref["id"])
+ actions = self.manager.get_actions_dict(admin_user["id"], ref["id"])
self.assertIsInstance(actions, dict)
self.assertIn("actions", actions)
self.assertIn("id", actions)
demo_user["id"], ref["id"], new_action["id"])
self.manager.del_action(admin_user["id"], ref["id"], new_action["id"])
- actions = self.manager.get_action_dict(admin_user["id"], ref["id"])
+ actions = self.manager.get_actions_dict(admin_user["id"], ref["id"])
self.assertIsInstance(actions, dict)
self.assertIn("actions", actions)
self.assertIn("id", actions)
# Add a particular action
self.assertRaises(
ActionAddNotAuthorized,
- self.manager.add_action,
+ self.manager.add_action_dict,
demo_user["id"], ref["id"], new_action["name"])
- actions = self.manager.add_action(admin_user["id"], ref["id"], new_action["name"])
+ actions = self.manager.add_action_dict(admin_user["id"], ref["id"], new_action["name"])
self.assertIsInstance(actions, dict)
self.assertIn("action", actions)
self.assertIn("uuid", actions["action"])
self.assertEqual(new_action["name"], actions["action"]["name"])
new_action["id"] = actions["action"]["uuid"]
- actions = self.manager.get_action_dict(admin_user["id"], ref["id"])
+ actions = self.manager.get_actions_dict(admin_user["id"], ref["id"])
self.assertIsInstance(actions, dict)
self.assertIn("actions", actions)
self.assertIn("id", actions)
for object_category in object_categories["object_categories"]:
self.assertRaises(
ObjectCategoryScopeReadNotAuthorized,
- self.manager.get_object_scope_dict,
+ self.manager.get_object_scopes_dict,
demo_user["id"], ref["id"], object_category)
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
admin_user["id"],
ref["id"],
object_category)
ref["id"],
object_category,
new_object_category_scope_uuid)
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
admin_user["id"],
ref["id"],
object_category)
# Add a particular object_category_scope
self.assertRaises(
ObjectCategoryScopeAddNotAuthorized,
- self.manager.add_object_scope,
+ self.manager.add_object_scope_dict,
demo_user["id"], ref["id"], object_category,
new_object_category_scope[new_object_category_scope_uuid]
)
- object_category_scope = self.manager.add_object_scope(
+ object_category_scope = self.manager.add_object_scope_dict(
admin_user["id"],
ref["id"],
object_category,
self.assertIn("uuid", object_category_scope["object_category_scope"])
self.assertEqual(new_object_category_scope[new_object_category_scope_uuid],
object_category_scope["object_category_scope"]["name"])
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
admin_user["id"],
ref["id"],
object_category)
for action_category in action_categories["action_categories"]:
self.assertRaises(
ActionCategoryScopeReadNotAuthorized,
- self.manager.get_object_scope_dict,
+ self.manager.get_object_scopes_dict,
demo_user["id"], ref["id"], action_category)
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
admin_user["id"],
ref["id"],
action_category)
ref["id"],
action_category,
new_action_category_scope_uuid)
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
admin_user["id"],
ref["id"],
action_category)
# Add a particular action_category_scope
self.assertRaises(
ActionCategoryScopeAddNotAuthorized,
- self.manager.add_action_scope,
+ self.manager.add_action_scope_dict,
demo_user["id"], ref["id"], action_category,
new_action_category_scope[new_action_category_scope_uuid]
)
- action_category_scope = self.manager.add_action_scope(
+ action_category_scope = self.manager.add_action_scope_dict(
admin_user["id"],
ref["id"],
action_category,
self.assertIn("uuid", action_category_scope["action_category_scope"])
self.assertEqual(new_action_category_scope[new_action_category_scope_uuid],
action_category_scope["action_category_scope"]["name"])
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
admin_user["id"],
ref["id"],
action_category)
self.assertRaises(
SubjectCategoryAssignmentAddNotAuthorized,
- self.manager.add_subject_assignment,
+ self.manager.add_subject_assignment_list,
demo_user["id"], ref["id"], new_subject["id"],
new_subject_category_uuid,
new_subject_category_scope_uuid
)
- data = self.manager.add_subject_assignment(
+ data = self.manager.add_subject_assignment_list(
admin_user["id"],
ref["id"],
new_subject["id"],
)
for object_category in object_categories["object_categories"]:
- object_category_scope = self.manager.get_object_scope_dict(
+ object_category_scope = self.manager.get_object_scopes_dict(
admin_user["id"],
ref["id"],
object_category)
self.assertRaises(
ObjectCategoryAssignmentReadNotAuthorized,
- self.manager.get_object_assignment_dict,
+ self.manager.get_object_assignment_list,
demo_user["id"], ref["id"], new_object["id"]
)
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
admin_user["id"],
ref["id"],
new_object["id"]
self.assertEqual(
{new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]},
object_category_assignments["object_category_assignments"][new_object["id"]])
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
admin_user["id"],
ref["id"],
new_object["id"]
new_object_category_uuid,
new_object_category_scope_uuid
)
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
admin_user["id"],
ref["id"],
new_object["id"]
self.assertRaises(
ObjectCategoryAssignmentAddNotAuthorized,
- self.manager.add_object_assignment,
+ self.manager.add_object_assignment_list,
demo_user["id"], ref["id"], new_object["id"],
new_object_category_uuid,
new_object_category_scope_uuid
)
- self.manager.add_object_assignment(
+ self.manager.add_object_assignment_list(
admin_user["id"],
ref["id"],
new_object["id"],
new_object_category_scope_uuid
)
- object_category_assignments = self.manager.get_object_assignment_dict(
+ object_category_assignments = self.manager.get_object_assignment_list(
admin_user["id"],
ref["id"],
new_object["id"]
)
for action_category in action_categories["action_categories"]:
- action_category_scope = self.manager.get_action_scope_dict(
+ action_category_scope = self.manager.get_action_scopes_dict(
admin_user["id"],
ref["id"],
action_category)
self.assertRaises(
ActionCategoryAssignmentReadNotAuthorized,
- self.manager.get_action_assignment_dict,
+ self.manager.get_action_assignment_list,
demo_user["id"], ref["id"], new_action["id"]
)
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
admin_user["id"],
ref["id"],
new_action["id"]
self.assertEqual(
{new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]},
action_category_assignments["action_category_assignments"][new_action["id"]])
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
admin_user["id"],
ref["id"],
new_action["id"]
new_action_category_uuid,
new_action_category_scope_uuid
)
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
admin_user["id"],
ref["id"],
new_action["id"]
self.assertRaises(
ActionCategoryAssignmentAddNotAuthorized,
- self.manager.add_action_assignment,
+ self.manager.add_action_assignment_list,
demo_user["id"], ref["id"], new_action["id"],
new_action_category_uuid,
new_action_category_scope_uuid
)
- self.manager.add_action_assignment(
+ self.manager.add_action_assignment_list(
admin_user["id"],
ref["id"],
new_action["id"],
new_action_category_scope_uuid
)
- action_category_assignments = self.manager.get_action_assignment_dict(
+ action_category_assignments = self.manager.get_action_assignment_list(
admin_user["id"],
ref["id"],
new_action["id"]
for rule in sub_rules["rules"][relation]:
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"),
- ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"),
- ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"),
+ ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"),
+ ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"),
):
for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]:
scope = cat_func(
sub_rule = []
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"),
- ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"),
- ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"),
+ ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"),
+ ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"),
):
for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]:
scope = cat_func(
for rule in sub_rules["rules"][relation]:
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"),
- ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"),
- ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"),
+ ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"),
+ ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"),
):
for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]:
scope = cat_func(