add unit test for policy module 55/49655/1
authorahmed.helmy <ahmad.helmy@orange.com>
Tue, 26 Dec 2017 11:26:17 +0000 (13:26 +0200)
committerahmed.helmy <ahmad.helmy@orange.com>
Tue, 26 Dec 2017 11:26:17 +0000 (13:26 +0200)
Change-Id: I981d9161cbdc167c4c9b5483d23842bca7e82006
Signed-off-by: ahmed.helmy <ahmad.helmy@orange.com>
python_moondb/tests/unit_python/policies/__init__.py [new file with mode: 0644]
python_moondb/tests/unit_python/policies/mock_data.py [new file with mode: 0644]
python_moondb/tests/unit_python/policies/test_assignments.py [new file with mode: 0755]
python_moondb/tests/unit_python/policies/test_data.py [new file with mode: 0755]
python_moondb/tests/unit_python/policies/test_policies.py [new file with mode: 0755]
python_moondb/tests/unit_python/test_policies.py [deleted file]

diff --git a/python_moondb/tests/unit_python/policies/__init__.py b/python_moondb/tests/unit_python/policies/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/python_moondb/tests/unit_python/policies/mock_data.py b/python_moondb/tests/unit_python/policies/mock_data.py
new file mode 100644 (file)
index 0000000..b264297
--- /dev/null
@@ -0,0 +1,45 @@
+def create_meta_rule():
+    meta_rule_value = {
+        "name": "meta_rule1",
+        "algorithm": "name of the meta rule algorithm",
+        "subject_categories": ["subject_category_id1",
+                               "subject_category_id2"],
+        "object_categories": ["object_category_id1"],
+        "action_categories": ["action_category_id1"]
+    }
+    return meta_rule_value
+
+
+def create_model(meta_rule_id):
+    value = {
+        "name": "test_model",
+        "description": "test",
+        "meta_rules": [meta_rule_id]
+
+    }
+    return value
+
+
+def create_policy(model_id):
+    value = {
+        "name": "policy_1",
+        "model_id": model_id,
+        "genre": "authz",
+        "description": "test",
+    }
+    return value
+
+
+def get_policy_id():
+    import policies.test_policies as test_policies
+    import models.test_models as test_models
+    import models.test_meta_rules as test_meta_rules
+    meta_rule = test_meta_rules.add_meta_rule(value=create_meta_rule())
+    meta_rule_id = list(meta_rule.keys())[0]
+    model = test_models.add_model(value=create_model(meta_rule_id))
+    model_id = list(model.keys())[0]
+    value = create_policy(model_id)
+    policy = test_policies.add_policies(value)
+    assert policy
+    policy_id = list(policy.keys())[0]
+    return policy_id
diff --git a/python_moondb/tests/unit_python/policies/test_assignments.py b/python_moondb/tests/unit_python/policies/test_assignments.py
new file mode 100755 (executable)
index 0000000..ccac205
--- /dev/null
@@ -0,0 +1,245 @@
+def get_action_assignments(policy_id, action_id=None, category_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_action_assignments("", policy_id, action_id, category_id)
+
+
+def add_action_assignment(policy_id, action_id, category_id, data_id):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def delete_action_assignment(policy_id, action_id, category_id, data_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def get_object_assignments(policy_id, object_id=None, category_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_object_assignments("", policy_id, object_id, category_id)
+
+
+def add_object_assignment(policy_id, object_id, category_id, data_id):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def delete_object_assignment(policy_id, object_id, category_id, data_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def get_subject_assignments(policy_id, subject_id=None, category_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id)
+
+
+def add_subject_assignment(policy_id, subject_id, category_id, data_id):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
+
+def delete_subject_assignment(policy_id, subject_id, category_id, data_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
+
+def test_get_action_assignments(db):
+    policy_id = "admin"
+    action_id = "action_id_1"
+    category_id = "category_id_1"
+    data_id = "data_id_1"
+    add_action_assignment(policy_id, action_id, category_id, data_id)
+    act_assignments = get_action_assignments(policy_id, action_id, category_id)
+    action_id_1 = list(act_assignments.keys())[0]
+    assert act_assignments[action_id_1]["policy_id"] == policy_id
+    assert act_assignments[action_id_1]["action_id"] == action_id
+    assert act_assignments[action_id_1]["category_id"] == category_id
+    assert len(act_assignments[action_id_1].get("assignments")) == 1
+    assert data_id in act_assignments[action_id_1].get("assignments")
+
+
+def test_get_action_assignments_by_policy_id(db):
+    policy_id = "admin"
+    action_id = "action_id_1"
+    category_id = "category_id_1"
+    data_id = "data_id_1"
+    add_action_assignment(policy_id, action_id, category_id, data_id)
+    data_id = "data_id_2"
+    add_action_assignment(policy_id, action_id, category_id, data_id)
+    data_id = "data_id_3"
+    add_action_assignment(policy_id, action_id, category_id, data_id)
+    act_assignments = get_action_assignments(policy_id)
+    action_id_1 = list(act_assignments.keys())[0]
+    assert act_assignments[action_id_1]["policy_id"] == policy_id
+    assert act_assignments[action_id_1]["action_id"] == action_id
+    assert act_assignments[action_id_1]["category_id"] == category_id
+    assert len(act_assignments[action_id_1].get("assignments")) == 3
+
+
+def test_add_action_assignments(db):
+    policy_id = "admin"
+    action_id = "action_id_1"
+    category_id = "category_id_1"
+    data_id = "data_id_1"
+    action_assignments = add_action_assignment(policy_id, action_id, category_id, data_id)
+    assert action_assignments
+    action_id_1 = list(action_assignments.keys())[0]
+    assert action_assignments[action_id_1]["policy_id"] == policy_id
+    assert action_assignments[action_id_1]["action_id"] == action_id
+    assert action_assignments[action_id_1]["category_id"] == category_id
+    assert len(action_assignments[action_id_1].get("assignments")) == 1
+    assert data_id in action_assignments[action_id_1].get("assignments")
+
+
+def test_delete_action_assignment(db):
+    policy_id = "admin_1"
+    add_action_assignment(policy_id, "", "", "")
+    policy_id = "admin_2"
+    action_id = "action_id_2"
+    category_id = "category_id_2"
+    data_id = "data_id_2"
+    add_action_assignment(policy_id, action_id, category_id, data_id)
+    delete_action_assignment(policy_id, "", "", "")
+    assignments = get_action_assignments(policy_id, )
+    assert len(assignments) == 1
+
+
+def test_delete_action_assignment_with_invalid_policy_id(db):
+    policy_id = "invalid_id"
+    delete_action_assignment(policy_id, "", "", "")
+    assignments = get_action_assignments(policy_id, )
+    assert len(assignments) == 0
+
+
+def test_get_object_assignments(db):
+    policy_id = "admin"
+    object_id = "object_id_1"
+    category_id = "category_id_1"
+    data_id = "data_id_1"
+    add_object_assignment(policy_id, object_id, category_id, data_id)
+    obj_assignments = get_object_assignments(policy_id, object_id, category_id)
+    object_id_1 = list(obj_assignments.keys())[0]
+    assert obj_assignments[object_id_1]["policy_id"] == policy_id
+    assert obj_assignments[object_id_1]["object_id"] == object_id
+    assert obj_assignments[object_id_1]["category_id"] == category_id
+    assert len(obj_assignments[object_id_1].get("assignments")) == 1
+    assert data_id in obj_assignments[object_id_1].get("assignments")
+
+
+def test_get_object_assignments_by_policy_id(db):
+    policy_id = "admin"
+    object_id_1 = "object_id_1"
+    category_id_1 = "category_id_1"
+    data_id = "data_id_1"
+    add_action_assignment(policy_id, object_id_1, category_id_1, data_id)
+    object_id_2 = "object_id_2"
+    category_id_2 = "category_id_2"
+    data_id = "data_id_2"
+    add_action_assignment(policy_id, object_id_2, category_id_2, data_id)
+    object_id_3 = "object_id_3"
+    category_id_3 = "category_id_3"
+    data_id = "data_id_3"
+    add_action_assignment(policy_id, object_id_3, category_id_3, data_id)
+    act_assignments = get_action_assignments(policy_id)
+    assert len(act_assignments) == 3
+
+
+def test_add_object_assignments(db):
+    policy_id = "admin"
+    object_id = "object_id_1"
+    category_id = "category_id_1"
+    data_id = "data_id_1"
+    object_assignments = add_object_assignment(policy_id, object_id, category_id, data_id)
+    assert object_assignments
+    object_id_1 = list(object_assignments.keys())[0]
+    assert object_assignments[object_id_1]["policy_id"] == policy_id
+    assert object_assignments[object_id_1]["object_id"] == object_id
+    assert object_assignments[object_id_1]["category_id"] == category_id
+    assert len(object_assignments[object_id_1].get("assignments")) == 1
+    assert data_id in object_assignments[object_id_1].get("assignments")
+
+
+def test_delete_object_assignment(db):
+    policy_id = "admin_1"
+    add_object_assignment(policy_id, "", "", "")
+    object_id = "action_id_2"
+    category_id = "category_id_2"
+    data_id = "data_id_2"
+    add_object_assignment(policy_id, object_id, category_id, data_id)
+    delete_object_assignment(policy_id, "", "", "")
+    assignments = get_object_assignments(policy_id, )
+    assert len(assignments) == 1
+
+
+def test_delete_object_assignment_with_invalid_policy_id(db):
+    policy_id = "invalid_id"
+    delete_object_assignment(policy_id, "", "", "")
+    assignments = get_object_assignments(policy_id, )
+    assert len(assignments) == 0
+
+
+def test_get_subject_assignments(db):
+    policy_id = "admin"
+    subject_id = "object_id_1"
+    category_id = "category_id_1"
+    data_id = "data_id_1"
+    add_subject_assignment(policy_id, subject_id, category_id, data_id)
+    subj_assignments = get_subject_assignments(policy_id, subject_id, category_id)
+    subject_id_1 = list(subj_assignments.keys())[0]
+    assert subj_assignments[subject_id_1]["policy_id"] == policy_id
+    assert subj_assignments[subject_id_1]["subject_id"] == subject_id
+    assert subj_assignments[subject_id_1]["category_id"] == category_id
+    assert len(subj_assignments[subject_id_1].get("assignments")) == 1
+    assert data_id in subj_assignments[subject_id_1].get("assignments")
+
+
+def test_get_subject_assignments_by_policy_id(db):
+    policy_id = "admin"
+    subject_id_1 = "subject_id_1"
+    category_id_1 = "category_id_1"
+    data_id = "data_id_1"
+    add_subject_assignment(policy_id, subject_id_1, category_id_1, data_id)
+    subject_id_2 = "subject_id_2"
+    category_id_2 = "category_id_2"
+    data_id = "data_id_2"
+    add_subject_assignment(policy_id, subject_id_2, category_id_2, data_id)
+    subject_id_3 = "subject_id_3"
+    category_id_3 = "category_id_3"
+    data_id = "data_id_3"
+    add_subject_assignment(policy_id, subject_id_3, category_id_3, data_id)
+    subj_assignments = get_subject_assignments(policy_id)
+    assert len(subj_assignments) == 3
+
+
+def test_add_subject_assignments(db):
+    policy_id = "admin"
+    subject_id = "subject_id_1"
+    category_id = "category_id_1"
+    data_id = "data_id_1"
+    subject_assignments = add_subject_assignment(policy_id, subject_id, category_id, data_id)
+    assert subject_assignments
+    subject_id_1 = list(subject_assignments.keys())[0]
+    assert subject_assignments[subject_id_1]["policy_id"] == policy_id
+    assert subject_assignments[subject_id_1]["subject_id"] == subject_id
+    assert subject_assignments[subject_id_1]["category_id"] == category_id
+    assert len(subject_assignments[subject_id_1].get("assignments")) == 1
+    assert data_id in subject_assignments[subject_id_1].get("assignments")
+
+
+def test_delete_subject_assignment(db):
+    policy_id = "admin_1"
+    add_subject_assignment(policy_id, "", "", "")
+    subject_id = "subject_id_2"
+    category_id = "category_id_2"
+    data_id = "data_id_2"
+    add_subject_assignment(policy_id, subject_id, category_id, data_id)
+    delete_subject_assignment(policy_id, "", "", "")
+    assignments = get_subject_assignments(policy_id, )
+    assert len(assignments) == 1
+
+
+def test_delete_subject_assignment_with_invalid_policy_id(db):
+    policy_id = "invalid_id"
+    delete_subject_assignment(policy_id, "", "", "")
+    assignments = get_subject_assignments(policy_id, )
+    assert len(assignments) == 0
diff --git a/python_moondb/tests/unit_python/policies/test_data.py b/python_moondb/tests/unit_python/policies/test_data.py
new file mode 100755 (executable)
index 0000000..68b1d2a
--- /dev/null
@@ -0,0 +1,513 @@
+import policies.mock_data as mock_data
+import pytest
+
+
+def get_action_data(policy_id, data_id=None, category_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_action_data("", policy_id, data_id, category_id)
+
+
+def add_action_data(policy_id, data_id=None, category_id=None, value=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_action_data("", policy_id, data_id, category_id, value)
+
+
+def delete_action_data(policy_id, data_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_action_data("", policy_id, data_id)
+
+
+def get_object_data(policy_id, data_id=None, category_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_object_data("", policy_id, data_id, category_id)
+
+
+def add_object_data(policy_id, data_id=None, category_id=None, value=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_object_data("", policy_id, data_id, category_id, value)
+
+
+def delete_object_data(policy_id, data_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_object_data("", policy_id, data_id)
+
+
+def get_subject_data(policy_id, data_id=None, category_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_subject_data("", policy_id, data_id, category_id)
+
+
+def add_subject_data(policy_id, data_id=None, category_id=None, value=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value)
+
+
+def delete_subject_data(policy_id, data_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_subject_data("", policy_id, data_id)
+
+
+def get_actions(policy_id, perimeter_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_actions("", policy_id, perimeter_id)
+
+
+def add_action(policy_id, perimeter_id=None, value=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_action("", policy_id, perimeter_id, value)
+
+
+def delete_action(policy_id, perimeter_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_action("", policy_id, perimeter_id)
+
+
+def get_objects(policy_id, perimeter_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_objects("", policy_id, perimeter_id)
+
+
+def add_object(policy_id, perimeter_id=None, value=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_object("", policy_id, perimeter_id, value)
+
+
+def delete_object(policy_id, perimeter_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_object("", policy_id, perimeter_id)
+
+
+def get_subjects(policy_id, perimeter_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_subjects("", policy_id, perimeter_id)
+
+
+def add_subject(policy_id, perimeter_id=None, value=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.add_subject("", policy_id, perimeter_id, value)
+
+
+def delete_subject(policy_id, perimeter_id):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_subject("", policy_id, perimeter_id)
+
+
+def get_available_metadata(policy_id):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_available_metadata("", policy_id)
+
+
+def test_get_action_data(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+
+    policy_id = policy_id
+    data_id = "data_id_1"
+    category_id = "action_category_id1"
+    value = {
+        "name": "action-type",
+        "description": {"vm-action": "", "storage-action": "", },
+    }
+    add_action_data(policy_id, data_id, category_id, value)
+    action_data = get_action_data(policy_id, data_id, category_id)
+    assert action_data
+    assert len(action_data[0]['data']) == 1
+
+
+def test_get_action_data_with_invalid_category_id(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+
+    policy_id = policy_id
+    data_id = "data_id_1"
+    category_id = "action_category_id1"
+    value = {
+        "name": "action-type",
+        "description": {"vm-action": "", "storage-action": "", },
+    }
+    add_action_data(policy_id, data_id, category_id, value)
+    action_data = get_action_data(policy_id)
+    assert action_data
+    assert len(action_data[0]['data']) == 1
+
+
+def test_add_action_data(db):
+    policy_id = "policy_id_1"
+    data_id = "data_id_1"
+    category_id = "category_id_1"
+    value = {
+        "name": "action-type",
+        "description": {"vm-action": "", "storage-action": "", },
+    }
+    action_data = add_action_data(policy_id, data_id, category_id, value).get('data')
+    assert action_data
+    action_data_id = list(action_data.keys())[0]
+    assert action_data[action_data_id].get('policy_id') == policy_id
+
+
+def test_delete_action_data(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+    data_id = "data_id_1"
+    category_id = "category_id_1"
+    value = {
+        "name": "action-type",
+        "description": {"vm-action": "", "storage-action": "", },
+    }
+    action_data = add_action_data(policy_id, data_id, category_id, value).get('data')
+    action_data_id = list(action_data.keys())[0]
+    delete_action_data(action_data[action_data_id].get('policy_id'), None)
+    new_action_data = get_action_data(policy_id)
+    assert len(new_action_data[0]['data']) == 0
+
+
+def test_get_object_data(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+
+    policy_id = policy_id
+    data_id = "data_id_1"
+    category_id = "object_category_id1"
+    value = {
+        "name": "object-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    add_object_data(policy_id, data_id, category_id, value)
+    object_data = get_object_data(policy_id, data_id, category_id)
+    assert object_data
+    assert len(object_data[0]['data']) == 1
+
+
+def test_get_object_data_with_invalid_category_id(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+
+    policy_id = policy_id
+    data_id = "data_id_1"
+    category_id = "object_category_id1"
+    value = {
+        "name": "object-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    add_object_data(policy_id, data_id, category_id, value)
+    object_data = get_object_data(policy_id)
+    assert object_data
+    assert len(object_data[0]['data']) == 1
+
+
+def test_add_object_data(db):
+    policy_id = "policy_id_1"
+    data_id = "data_id_1"
+    category_id = "object_category_id1"
+    value = {
+        "name": "object-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    object_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+    assert object_data
+    object_data_id = list(object_data.keys())[0]
+    assert object_data[object_data_id].get('policy_id') == policy_id
+
+
+def test_delete_object_data(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+    data_id = "data_id_1"
+    category_id = "object_category_id1"
+    value = {
+        "name": "object-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    object_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+    object_data_id = list(object_data.keys())[0]
+    delete_object_data(object_data[object_data_id].get('policy_id'), data_id)
+    new_object_data = get_object_data(policy_id)
+    assert len(new_object_data[0]['data']) == 0
+
+
+def test_get_subject_data(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+
+    policy_id = policy_id
+    data_id = "data_id_1"
+    category_id = "subject_category_id1"
+    value = {
+        "name": "subject-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    add_subject_data(policy_id, data_id, category_id, value)
+    subject_data = get_subject_data(policy_id, data_id, category_id)
+    assert subject_data
+    assert len(subject_data[0]['data']) == 1
+
+
+def test_get_subject_data_with_invalid_category_id(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+
+    policy_id = policy_id
+    data_id = "data_id_1"
+    category_id = "subject_category_id1"
+    value = {
+        "name": "subject-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    add_subject_data(policy_id, data_id, category_id, value)
+    subject_data = get_subject_data(policy_id)
+    assert subject_data
+    assert len(subject_data[0]['data']) == 1
+
+
+def test_add_subject_data(db):
+    policy_id = "policy_id_1"
+    data_id = "data_id_1"
+    category_id = "subject_category_id1"
+    value = {
+        "name": "subject-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    subject_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+    assert subject_data
+    subject_data_id = list(subject_data.keys())[0]
+    assert subject_data[subject_data_id].get('policy_id') == policy_id
+
+
+def test_delete_subject_data(db):
+    policy_id = mock_data.get_policy_id()
+    get_available_metadata(policy_id)
+    data_id = "data_id_1"
+    category_id = "subject_category_id1"
+    value = {
+        "name": "subject-security-level",
+        "description": {"low": "", "medium": "", "high": ""},
+    }
+    subject_data = add_subject_data(policy_id, data_id, category_id, value).get('data')
+    subject_data_id = list(subject_data.keys())[0]
+    delete_subject_data(subject_data[subject_data_id].get('policy_id'), data_id)
+    new_subject_data = get_subject_data(policy_id)
+    assert len(new_subject_data[0]['data']) == 0
+
+
+def test_get_actions(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_action",
+        "description": "test",
+    }
+    add_action(policy_id=policy_id, value=value)
+    actions = get_actions(policy_id, )
+    assert actions
+    assert len(actions) == 1
+    action_id = list(actions.keys())[0]
+    assert actions[action_id].get('policy_list')[0] == policy_id
+
+
+def test_add_action(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_action",
+        "description": "test",
+    }
+    action = add_action(policy_id=policy_id, value=value)
+    assert action
+    action_id = list(action.keys())[0]
+    assert len(action[action_id].get('policy_list')) == 1
+
+
+def test_add_action_multiple_times(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_action",
+        "description": "test",
+    }
+    action = add_action(policy_id=policy_id, value=value)
+    action_id = list(action.keys())[0]
+    perimeter_id = action[action_id].get('id')
+    assert action
+    value = {
+        "name": "test_action",
+        "description": "test",
+        "policy_list": ['policy_id_3', 'policy_id_4']
+    }
+    action = add_action('policy_id_7', perimeter_id, value)
+    assert action
+    action_id = list(action.keys())[0]
+    assert len(action[action_id].get('policy_list')) == 2
+
+
+def test_delete_action(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_action",
+        "description": "test",
+    }
+    action = add_action(policy_id=policy_id, value=value)
+    action_id = list(action.keys())[0]
+    delete_action(policy_id, action_id)
+    actions = get_actions(policy_id, )
+    assert not actions
+
+
+def test_delete_action_with_invalid_perimeter_id(db):
+    policy_id = "invalid"
+    perimeter_id = "invalid"
+    with pytest.raises(Exception) as exception_info:
+        delete_action(policy_id, perimeter_id)
+    assert str(exception_info.value) == '400: Action Unknown'
+
+
+def test_get_objects(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_object",
+        "description": "test",
+    }
+    add_object(policy_id=policy_id, value=value)
+    objects = get_objects(policy_id, )
+    assert objects
+    assert len(objects) == 1
+    object_id = list(objects.keys())[0]
+    assert objects[object_id].get('policy_list')[0] == policy_id
+
+
+def test_add_object(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_object",
+        "description": "test",
+    }
+    added_object = add_object(policy_id=policy_id, value=value)
+    assert added_object
+    object_id = list(added_object.keys())[0]
+    assert len(added_object[object_id].get('policy_list')) == 1
+
+
+def test_add_objects_multiple_times(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_object",
+        "description": "test",
+    }
+    added_object = add_object(policy_id=policy_id, value=value)
+    object_id = list(added_object.keys())[0]
+    perimeter_id = added_object[object_id].get('id')
+    assert added_object
+    value = {
+        "name": "test_object",
+        "description": "test",
+        "policy_list": ['policy_id_3', 'policy_id_4']
+    }
+    added_object = add_object('policy_id_7', perimeter_id, value)
+    assert added_object
+    object_id = list(added_object.keys())[0]
+    assert len(added_object[object_id].get('policy_list')) == 2
+
+
+def test_delete_object(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "test_object",
+        "description": "test",
+    }
+    added_object = add_object(policy_id=policy_id, value=value)
+    object_id = list(added_object.keys())[0]
+    delete_object(policy_id, object_id)
+    objects = get_objects(policy_id, )
+    assert not objects
+
+
+def test_delete_object_with_invalid_perimeter_id(db):
+    policy_id = "invalid"
+    perimeter_id = "invalid"
+    with pytest.raises(Exception) as exception_info:
+        delete_object(policy_id, perimeter_id)
+    assert str(exception_info.value) == '400: Object Unknown'
+
+
+def test_get_subjects(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "testuser",
+        "description": "test",
+    }
+    add_subject(policy_id=policy_id, value=value)
+    subjects = get_subjects(policy_id, )
+    assert subjects
+    assert len(subjects) == 1
+    subject_id = list(subjects.keys())[0]
+    assert subjects[subject_id].get('policy_list')[0] == policy_id
+
+
+def test_add_subject(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "testuser",
+        "description": "test",
+    }
+    subject = add_subject(policy_id=policy_id, value=value)
+    assert subject
+    subject_id = list(subject.keys())[0]
+    assert len(subject[subject_id].get('policy_list')) == 1
+
+
+def test_add_subjects_multiple_times(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "testuser",
+        "description": "test",
+    }
+    subject = add_subject(policy_id=policy_id, value=value)
+    subject_id = list(subject.keys())[0]
+    perimeter_id = subject[subject_id].get('id')
+    assert subject
+    value = {
+        "name": "testuser",
+        "description": "test",
+        "policy_list": ['policy_id_3', 'policy_id_4']
+    }
+    subject = add_subject('policy_id_7', perimeter_id, value)
+    assert subject
+    subject_id = list(subject.keys())[0]
+    assert len(subject[subject_id].get('policy_list')) == 2
+
+
+def test_delete_subject(db):
+    policy_id = "policy_id_1"
+    value = {
+        "name": "testuser",
+        "description": "test",
+    }
+    subject = add_subject(policy_id=policy_id, value=value)
+    subject_id = list(subject.keys())[0]
+    delete_subject(policy_id, subject_id)
+    subjects = get_subjects(policy_id, )
+    assert not subjects
+
+
+def test_delete_subject_with_invalid_perimeter_id(db):
+    policy_id = "invalid"
+    perimeter_id = "invalid"
+    with pytest.raises(Exception) as exception_info:
+        delete_subject(policy_id, perimeter_id)
+    assert str(exception_info.value) == '400: Subject Unknown'
+
+
+def test_get_available_metadata(db):
+    policy_id = mock_data.get_policy_id()
+    metadata = get_available_metadata(policy_id)
+    assert metadata
+    assert metadata['object'][0] == "object_category_id1"
+    assert metadata['subject'][0] == "subject_category_id1"
+    assert metadata['subject'][1] == "subject_category_id2"
+
+
+def test_get_available_metadata_empty_model(db):
+    import policies.test_policies as test_policies
+    policy_id = mock_data.get_policy_id()
+    value = mock_data.create_policy("invalid")
+    policy = test_policies.add_policies(value)
+    assert policy
+    policy_id = list(policy.keys())[0]
+    metadata = get_available_metadata(policy_id)
+    assert metadata
\ No newline at end of file
diff --git a/python_moondb/tests/unit_python/policies/test_policies.py b/python_moondb/tests/unit_python/policies/test_policies.py
new file mode 100755 (executable)
index 0000000..acd5d7a
--- /dev/null
@@ -0,0 +1,161 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+
+def get_policies():
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_policies("admin")
+
+
+def add_policies(value=None):
+    from python_moondb.core import PolicyManager
+    if not value:
+        value = {
+            "name": "test_policiy",
+            "model_id": "",
+            "genre": "authz",
+            "description": "test",
+        }
+    return PolicyManager.add_policy("admin", value=value)
+
+
+def delete_policies(uuid=None, name=None):
+    from python_moondb.core import PolicyManager
+    if not uuid:
+        for policy_id, policy_value in get_policies():
+            if name == policy_value['name']:
+                uuid = policy_id
+                break
+    PolicyManager.delete_policy("admin", uuid)
+
+
+def get_rules(policy_id=None, meta_rule_id=None, rule_id=None):
+    from python_moondb.core import PolicyManager
+    return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id)
+
+
+def add_rule(policy_id=None, meta_rule_id=None, value=None):
+    from python_moondb.core import PolicyManager
+    if not value:
+        value = {
+            "rule": ("high", "medium", "vm-action"),
+            "instructions": ({"decision": "grant"}),
+            "enabled": "",
+        }
+    return PolicyManager.add_rule("", policy_id, meta_rule_id, value)
+
+
+def delete_rule(policy_id=None, rule_id=None):
+    from python_moondb.core import PolicyManager
+    PolicyManager.delete_rule("", policy_id, rule_id)
+
+
+def test_get_policies(db):
+    policies = get_policies()
+    assert isinstance(policies, dict)
+    assert not policies
+
+
+def test_add_policies(db):
+    value = {
+        "name": "test_policy",
+        "model_id": "",
+        "genre": "authz",
+        "description": "test",
+    }
+    policies = add_policies(value)
+    assert isinstance(policies, dict)
+    assert policies
+    assert len(policies.keys()) == 1
+    policy_id = list(policies.keys())[0]
+    for key in ("genre", "name", "model_id", "description"):
+        assert key in policies[policy_id]
+        assert policies[policy_id][key] == value[key]
+
+
+def test_delete_policies(db):
+    value = {
+        "name": "test_policy1",
+        "model_id": "",
+        "genre": "authz",
+        "description": "test",
+    }
+    policies = add_policies(value)
+    policy_id1 = list(policies.keys())[0]
+    value = {
+        "name": "test_policy2",
+        "model_id": "",
+        "genre": "authz",
+        "description": "test",
+    }
+    policies = add_policies(value)
+    policy_id2 = list(policies.keys())[0]
+    assert policy_id1 != policy_id2
+    delete_policies(policy_id1)
+    policies = get_policies()
+    assert policy_id1 not in policies
+
+
+def test_get_rules(db):
+    value = {
+        "rule": ("low", "medium", "vm-action"),
+        "instructions": ({"decision": "grant"}),
+        "enabled": "",
+    }
+    policy_id = "1"
+    meta_rule_id = "1"
+    add_rule(policy_id, meta_rule_id, value)
+    value = {
+        "rule": ("low", "low", "vm-action"),
+        "instructions": ({"decision": "grant"}),
+        "enabled": "",
+    }
+    policy_id = "1"
+    meta_rule_id = "1"
+    add_rule(policy_id, meta_rule_id, value)
+    rules = get_rules(policy_id, meta_rule_id)
+    assert isinstance(rules, dict)
+    assert rules
+    obj = rules.get('rules')
+    assert len(obj) == 2
+
+
+def test_get_rules_with_invalid_policy_id_failure(db):
+    rules = get_rules("invalid_policy_id", "meta_rule_id")
+    assert not rules.get('meta_rule-id')
+    assert len(rules.get('rules')) == 0
+
+
+def test_add_rule(db):
+    value = {
+        "rule": ("high", "medium", "vm-action"),
+        "instructions": ({"decision": "grant"}),
+        "enabled": "",
+    }
+    policy_id = "1"
+    meta_rule_id = "1"
+    rules = add_rule(policy_id, meta_rule_id, value)
+    assert rules
+    assert len(rules) == 1
+    assert isinstance(rules, dict)
+    rule_id = list(rules.keys())[0]
+    for key in ("rule", "instructions", "enabled"):
+        assert key in rules[rule_id]
+        assert rules[rule_id][key] == value[key]
+
+
+def test_delete_rule(db):
+    value = {
+        "rule": ("low", "low", "vm-action"),
+        "instructions": ({"decision": "grant"}),
+        "enabled": "",
+    }
+    policy_id = "2"
+    meta_rule_id = "2"
+    rules = add_rule(policy_id, meta_rule_id, value)
+    rule_id = list(rules.keys())[0]
+    delete_rule(policy_id, rule_id)
+    rules = get_rules(policy_id, meta_rule_id)
+    assert not rules.get('rules')
diff --git a/python_moondb/tests/unit_python/test_policies.py b/python_moondb/tests/unit_python/test_policies.py
deleted file mode 100644 (file)
index 2d65466..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
-# This software is distributed under the terms and conditions of the 'Apache-2.0'
-# license which can be found in the file 'LICENSE' in this package distribution
-# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-
-
-def get_policies():
-    from python_moondb.core import PolicyManager
-    return PolicyManager.get_policies("admin")
-
-
-def add_policies(value=None):
-    from python_moondb.core import PolicyManager
-    if not value:
-        value = {
-            "name": "test_policiy",
-            "model_id": "",
-            "genre": "authz",
-            "description": "test",
-        }
-    return PolicyManager.add_policy("admin", value=value)
-
-
-def delete_policies(uuid=None, name=None):
-    from python_moondb.core import PolicyManager
-    if not uuid:
-        for policy_id, policy_value in get_policies():
-            if name == policy_value['name']:
-                uuid = policy_id
-                break
-    PolicyManager.delete_policy("admin", uuid)
-
-
-def test_get_policies(db):
-    policies = get_policies()
-    assert isinstance(policies, dict)
-    assert not policies
-
-
-def test_add_policies(db):
-    value = {
-        "name": "test_policy",
-        "model_id": "",
-        "genre": "authz",
-        "description": "test",
-    }
-    policies = add_policies(value)
-    assert isinstance(policies, dict)
-    assert policies
-    assert len(policies.keys()) == 1
-    policy_id = list(policies.keys())[0]
-    for key in ("genre", "name", "model_id", "description"):
-        assert key in policies[policy_id]
-        assert policies[policy_id][key] == value[key]
-
-
-def test_delete_policies(db):
-    value = {
-        "name": "test_policy1",
-        "model_id": "",
-        "genre": "authz",
-        "description": "test",
-    }
-    policies = add_policies(value)
-    policy_id1 = list(policies.keys())[0]
-    value = {
-        "name": "test_policy2",
-        "model_id": "",
-        "genre": "authz",
-        "description": "test",
-    }
-    policies = add_policies(value)
-    policy_id2 = list(policies.keys())[0]
-    assert policy_id1 != policy_id2
-    delete_policies(policy_id1)
-    policies = get_policies()
-    assert policy_id1 not in policies