import requests
+import configparser
-URL = "http://172.18.0.11:38001{}"
+config = configparser.ConfigParser()
+config.read("/etc/moon/moon.conf")
+
+URL = "http://{}:{}".format(config['interface']['host'], config['interface']['port'])
HEADERS = {"content-type": "application/json"}
-KEYSTONE_USER = "admin"
-KEYSTONE_PASSWORD = "p4ssw0rd"
-KEYSTONE_PROJECT = "admin"
+KEYSTONE_USER = config['keystone']['user']
+KEYSTONE_PASSWORD = config['keystone']['password']
+KEYSTONE_PROJECT = config['keystone']['project']
+KEYSTONE_SERVER = config['keystone']['url']
pdp_template = {
"name": "test_pdp",
def get_keystone_projects():
- server = "keystone"
HEADERS = {
"Content-Type": "application/json"
}
}
- req = requests.post("http://{}:5000/v3/auth/tokens".format(server), json=data_auth, headers=HEADERS)
+ req = requests.post("{}/auth/tokens".format(KEYSTONE_SERVER), json=data_auth, headers=HEADERS)
assert req.status_code in (200, 201)
TOKEN = req.headers['X-Subject-Token']
HEADERS['X-Auth-Token'] = TOKEN
- req = requests.get("http://{}:5000/v3/projects".format(server), headers=HEADERS)
+ req = requests.get("{}/projects".format(KEYSTONE_SERVER), headers=HEADERS)
assert req.status_code in (200, 201)
return req.json()
def check_pdp(pdp_id=None, keystone_project_id=None):
- req = requests.get(URL.format("/pdp"))
+ req = requests.get(URL + "/pdp")
assert req.status_code == 200
result = req.json()
assert type(result) is dict
pdp_template['name'] = name
if policy_id:
pdp_template['security_pipeline'].append(policy_id)
- req = requests.post(URL.format("/pdp"), json=pdp_template, headers=HEADERS)
+ req = requests.post(URL + "/pdp", json=pdp_template, headers=HEADERS)
assert req.status_code == 200
result = req.json()
assert type(result) is dict
def update_pdp(pdp_id, policy_id=None):
- req = requests.get(URL.format("/pdp/{}".format(pdp_id)))
+ req = requests.get(URL + "/pdp/{}".format(pdp_id))
assert req.status_code == 200
result = req.json()
assert type(result) is dict
pipeline = result['pdps'][pdp_id]["security_pipeline"]
if policy_id not in pipeline:
pipeline.append(policy_id)
- req = requests.patch(URL.format("/pdp/{}".format(pdp_id)),
+ req = requests.patch(URL + "/pdp/{}".format(pdp_id),
json={"security_pipeline": pipeline})
assert req.status_code == 200
result = req.json()
assert "pdps" in result
assert pdp_id in result['pdps']
- req = requests.get(URL.format("/pdp/{}".format(pdp_id)))
+ req = requests.get(URL + "/pdp/{}".format(pdp_id))
assert req.status_code == 200
result = req.json()
assert type(result) is dict
def map_to_keystone(pdp_id, keystone_project_id):
- req = requests.patch(URL.format("/pdp/{}".format(pdp_id)), json={"keystone_project_id": keystone_project_id},
+ req = requests.patch(URL + "/pdp/{}".format(pdp_id), json={"keystone_project_id": keystone_project_id},
headers=HEADERS)
assert req.status_code == 200
result = req.json()
def delete_pdp(pdp_id):
- req = requests.delete(URL.format("/pdp/{}".format(pdp_id)))
+ req = requests.delete(URL + "/pdp/{}".format(pdp_id))
assert req.status_code == 200
result = req.json()
assert type(result) is dict