policies = self.PolicyManager.get_policies("admin")
models = self.ModelManager.get_models("admin")
for pdp_key, pdp_value in self.PDPManager.get_pdp(user_id).items():
+ if 'security_pipeline' not in pdp_value:
+ raise exceptions.PdpContentError
for policy_id in pdp_value["security_pipeline"]:
- if not policies:
+ if not policies or policy_id not in policies:
raise exceptions.PolicyUnknown
model_id = policies[policy_id]["model_id"]
if not models:
raise exceptions.ModelUnknown
+ if model_id not in models:
+ raise exceptions.ModelUnknown
if meta_rule_id in models[model_id]["meta_rules"]:
return policy_id
value.get('name'))
perimeter_id = uuid4().hex
value.update(k_user['users'][0])
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.set_subject(policy_id=policy_id, perimeter_id=perimeter_id, value=value)
@enforce(("read", "write"), "perimeter")
@enforce(("read", "write"), "perimeter")
def add_object(self, user_id, policy_id, perimeter_id=None, value=None):
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
if not perimeter_id:
perimeter_id = uuid4().hex
return self.driver.set_object(policy_id=policy_id, perimeter_id=perimeter_id, value=value)
@enforce(("read", "write"), "perimeter")
def add_action(self, user_id, policy_id, perimeter_id=None, value=None):
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
if not perimeter_id:
perimeter_id = uuid4().hex
return self.driver.set_action(policy_id=policy_id, perimeter_id=perimeter_id, value=value)
@enforce(("read", "write"), "data")
def set_subject_data(self, user_id, policy_id, data_id=None, category_id=None, value=None):
+ if not category_id:
+ raise Exception('Invalid category id')
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
if not data_id:
data_id = uuid4().hex
return self.driver.set_subject_data(policy_id=policy_id, data_id=data_id, category_id=category_id, value=value)
@enforce(("read", "write"), "data")
def add_object_data(self, user_id, policy_id, data_id=None, category_id=None, value=None):
+ if not category_id:
+ raise Exception('Invalid category id')
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
if not data_id:
data_id = uuid4().hex
return self.driver.set_object_data(policy_id=policy_id, data_id=data_id, category_id=category_id, value=value)
if not category_id:
for cat in available_metadata["action"]:
results.append(self.driver.get_action_data(policy_id=policy_id, data_id=data_id,
- category_id=cat))
+ category_id=cat))
if category_id and category_id in available_metadata["action"]:
results.append(self.driver.get_action_data(policy_id=policy_id, data_id=data_id,
- category_id=category_id))
+ category_id=category_id))
return results
@enforce(("read", "write"), "data")
def add_action_data(self, user_id, policy_id, data_id=None, category_id=None, value=None):
+ if not category_id:
+ raise Exception('Invalid category id')
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
if not data_id:
data_id = uuid4().hex
return self.driver.set_action_data(policy_id=policy_id, data_id=data_id, category_id=category_id, value=value)
@enforce(("read", "write"), "assignments")
def add_subject_assignment(self, user_id, policy_id, subject_id, category_id, data_id):
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.add_subject_assignment(policy_id=policy_id, subject_id=subject_id,
category_id=category_id, data_id=data_id)
@enforce(("read", "write"), "assignments")
def add_object_assignment(self, user_id, policy_id, object_id, category_id, data_id):
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.add_object_assignment(policy_id=policy_id, object_id=object_id,
category_id=category_id, data_id=data_id)
@enforce(("read", "write"), "assignments")
def add_action_assignment(self, user_id, policy_id, action_id, category_id, data_id):
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.add_action_assignment(policy_id=policy_id, action_id=action_id,
category_id=category_id, data_id=data_id)
@enforce(("read", "write"), "rules")
def add_rule(self, user_id, policy_id, meta_rule_id, value):
+ if not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
@enforce(("read", "write"), "rules")
+import policies.mock_data as mock_data
+
+
def get_action_assignments(policy_id, action_id=None, category_id=None):
from python_moondb.core import PolicyManager
return PolicyManager.get_action_assignments("", policy_id, action_id, category_id)
def test_get_action_assignments(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
action_id = "action_id_1"
category_id = "category_id_1"
data_id = "data_id_1"
def test_get_action_assignments_by_policy_id(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
action_id = "action_id_1"
category_id = "category_id_1"
data_id = "data_id_1"
def test_add_action_assignments(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
action_id = "action_id_1"
category_id = "category_id_1"
data_id = "data_id_1"
def test_delete_action_assignment(db):
- policy_id = "admin_1"
+ policy_id = mock_data.get_policy_id()
add_action_assignment(policy_id, "", "", "")
- policy_id = "admin_2"
+ policy_id = mock_data.get_policy_id()
action_id = "action_id_2"
category_id = "category_id_2"
data_id = "data_id_2"
def test_get_object_assignments(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
object_id = "object_id_1"
category_id = "category_id_1"
data_id = "data_id_1"
def test_get_object_assignments_by_policy_id(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
object_id_1 = "object_id_1"
category_id_1 = "category_id_1"
data_id = "data_id_1"
def test_add_object_assignments(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
object_id = "object_id_1"
category_id = "category_id_1"
data_id = "data_id_1"
def test_delete_object_assignment(db):
- policy_id = "admin_1"
+ policy_id = mock_data.get_policy_id()
add_object_assignment(policy_id, "", "", "")
object_id = "action_id_2"
category_id = "category_id_2"
def test_get_subject_assignments(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
subject_id = "object_id_1"
category_id = "category_id_1"
data_id = "data_id_1"
def test_get_subject_assignments_by_policy_id(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
subject_id_1 = "subject_id_1"
category_id_1 = "category_id_1"
data_id = "data_id_1"
def test_add_subject_assignments(db):
- policy_id = "admin"
+ policy_id = mock_data.get_policy_id()
subject_id = "subject_id_1"
category_id = "category_id_1"
data_id = "data_id_1"
def test_delete_subject_assignment(db):
- policy_id = "admin_1"
+ policy_id = mock_data.get_policy_id()
add_subject_assignment(policy_id, "", "", "")
subject_id = "subject_id_2"
category_id = "category_id_2"
def test_get_action_data_with_invalid_category_id(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
-
- policy_id = policy_id
data_id = "data_id_1"
category_id = "action_category_id1"
value = {
def test_add_action_data(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
data_id = "data_id_1"
category_id = "category_id_1"
value = {
assert action_data[action_data_id].get('policy_id') == policy_id
+def test_add_action_data_with_invalid_category_id(db):
+ policy_id = mock_data.get_policy_id()
+ data_id = "data_id_1"
+ value = {
+ "name": "action-type",
+ "description": {"vm-action": "", "storage-action": "", },
+ }
+ with pytest.raises(Exception) as exception_info:
+ add_action_data(policy_id=policy_id, data_id=data_id, value=value).get('data')
+ assert str(exception_info.value) == 'Invalid category id'
+
+
def test_delete_action_data(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
def test_get_object_data(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
-
- policy_id = policy_id
data_id = "data_id_1"
category_id = "object_category_id1"
value = {
def test_get_object_data_with_invalid_category_id(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
-
- policy_id = policy_id
data_id = "data_id_1"
category_id = "object_category_id1"
value = {
def test_add_object_data(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
data_id = "data_id_1"
category_id = "object_category_id1"
value = {
assert object_data[object_data_id].get('policy_id') == policy_id
+def test_add_object_data_with_invalid_category_id(db):
+ policy_id = mock_data.get_policy_id()
+ data_id = "data_id_1"
+ value = {
+ "name": "object-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ with pytest.raises(Exception) as exception_info:
+ add_object_data(policy_id=policy_id, data_id=data_id, value=value).get('data')
+ assert str(exception_info.value) == 'Invalid category id'
+
+
def test_delete_object_data(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
def test_get_subject_data(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
-
- policy_id = policy_id
data_id = "data_id_1"
category_id = "subject_category_id1"
value = {
def test_get_subject_data_with_invalid_category_id(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
-
- policy_id = policy_id
data_id = "data_id_1"
category_id = "subject_category_id1"
value = {
def test_add_subject_data(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
data_id = "data_id_1"
category_id = "subject_category_id1"
value = {
"name": "subject-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- subject_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+ subject_data = add_subject_data(policy_id, data_id, category_id, value).get('data')
assert subject_data
subject_data_id = list(subject_data.keys())[0]
assert subject_data[subject_data_id].get('policy_id') == policy_id
+def test_add_subject_data_with_no_category_id(db):
+ policy_id = mock_data.get_policy_id()
+ data_id = "data_id_1"
+ value = {
+ "name": "subject-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ with pytest.raises(Exception) as exception_info:
+ add_subject_data(policy_id=policy_id, data_id=data_id, value=value).get('data')
+ assert str(exception_info.value) == 'Invalid category id'
+
+
def test_delete_subject_data(db):
policy_id = mock_data.get_policy_id()
get_available_metadata(policy_id)
def test_get_actions(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_action",
"description": "test",
def test_add_action(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_action",
"description": "test",
def test_add_action_multiple_times(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_action",
"description": "test",
"description": "test",
"policy_list": ['policy_id_3', 'policy_id_4']
}
- action = add_action('policy_id_7', perimeter_id, value)
+ action = add_action(mock_data.get_policy_id(), perimeter_id, value)
assert action
action_id = list(action.keys())[0]
assert len(action[action_id].get('policy_list')) == 2
def test_delete_action(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_action",
"description": "test",
def test_get_objects(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_object",
"description": "test",
def test_add_object(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_object",
"description": "test",
def test_add_objects_multiple_times(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_object",
"description": "test",
"description": "test",
"policy_list": ['policy_id_3', 'policy_id_4']
}
- added_object = add_object('policy_id_7', perimeter_id, value)
+ added_object = add_object(mock_data.get_policy_id(), perimeter_id, value)
assert added_object
object_id = list(added_object.keys())[0]
assert len(added_object[object_id].get('policy_list')) == 2
def test_delete_object(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "test_object",
"description": "test",
def test_get_subjects(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "testuser",
"description": "test",
def test_add_subject(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "testuser",
"description": "test",
def test_add_subjects_multiple_times(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "testuser",
"description": "test",
"description": "test",
"policy_list": ['policy_id_3', 'policy_id_4']
}
- subject = add_subject('policy_id_7', perimeter_id, value)
+ subject = add_subject(mock_data.get_policy_id(), perimeter_id, value)
assert subject
subject_id = list(subject.keys())[0]
assert len(subject[subject_id].get('policy_list')) == 2
def test_delete_subject(db):
- policy_id = "policy_id_1"
+ policy_id = mock_data.get_policy_id()
value = {
"name": "testuser",
"description": "test",
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = "1"
+ policy_id = mock_data.get_policy_id()
meta_rule_id = "1"
add_rule(policy_id, meta_rule_id, value)
value = {
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = "1"
meta_rule_id = "1"
add_rule(policy_id, meta_rule_id, value)
rules = get_rules(policy_id, meta_rule_id)
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = "1"
+ policy_id = mock_data.get_policy_id()
meta_rule_id = "1"
rules = add_rule(policy_id, meta_rule_id, value)
assert rules
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = "2"
+ policy_id = mock_data.get_policy_id()
meta_rule_id = "2"
rules = add_rule(policy_id, meta_rule_id, value)
rule_id = list(rules.keys())[0]