self.container_format = "bare"
self.disk_format = "qcow2"
self.imagename = "TestSfcVm"
- self.createImage = "/home/root1/devstack/files/images/\
- firewall_block_image.img"
+ self.createImage = ("/home/root1/devstack/files/images/"
+ "firewall_block_image.img")
self.vm_name = "vm"
self.imageRef = "test"
def getToken(self):
"""Get the keystone token value from Openstack ."""
- url = 'http://' + self.keystone_hostname + \
- ':5000/' + self.osver + '/tokens'
- data = '{"auth": {"tenantName": "admin", "passwordCredentials":\
- { "username": "admin", "password": "console"}}}'
+ url = 'http://%s:5000/%s/tokens' % (self.keystone_hostname,
+ self.osver)
+ data = ('{"auth": {"tenantName": "admin", "passwordCredentials":'
+ '{ "username": "admin", "password": "console"}}}')
headers = {"Accept": "application/json"}
response = requests.post(url, headers=headers, data=data)
if (response.status_code == OK):
Dicdata['admin_state_up'] = self.admin_state_up
Dicdata = {'network': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/networks'
+ url = 'http://%s:9696/%s/networks' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
Dicdata = {'subnet': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/subnets'
+ url = 'http://%s:9696/%s/subnets' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
Dicdata = {'port': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/ports'
+ url = 'http://%s:9696/%s/ports' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
def createVm(self):
"""Creation of Instance, using firewall image."""
- url = 'http://' + self.glance_hostname + \
- ':9292/v2/images?name=TestSfcVm'
+ url = ("http://%s:9292/v2/images?"
+ "name=TestSfcVm" % (self.glance_hostname))
headers = {"Accept": "application/json", "Content-Type": "application/\
octet-stream", "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
else:
return(response.status_code)
- url = 'http://' + self.nova_hostname + \
- ':8774/v2.1/' + self.tenant_id + '/flavors?name=m1.tiny'
+ url = ("http://%s:8774//v2.1/%s/ports/"
+ "%s/flavors?name=m1.tiny" % (self.nova_hostname,
+ self.tenant_id))
+
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
Dicdata['networks'] = org_nw_port
Dicdata = {'server': Dicdata}
data = json.dumps(Dicdata, indent=4)
-
- url = ('http://' + self.nova_hostname + ':8774/v2.1/' +
- self.tenant_id + '/servers')
+ url = 'http://%s:8774/v2.1/%s/servers' % (self.nova_hostname,
+ self.tenant_id)
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
"""Checking the Status of the Instance."""
time.sleep(10)
for y in range(0, 3):
- url = 'http://' + \
- self.nova_hostname + \
- ':8774/v2.1/servers/detail?name=vm' + str(y)
+ url = ("http://%s:8774/v2.1/servers/"
+ "detail?name=vm" + str(y)) % (self.neutron_hostname)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.get(url, headers=headers)
self.logger.debug(json1_data)
self.vm_active = json1_data['servers'][0]['status']
if (self.vm_active == "ACTIVE"):
- info = "VM" + str(y) + \
- " is Active : " + self.vm_active
+ info = "VM" + str(y) + " is Active : " + self.vm_active
else:
- info = "VM" + str(y) + " is NOT Active : " + \
- self.vm_active
+ info = "VM" + str(y) + " is NOT Active : " + self.vm_active
self.logger.debug(info)
else:
return(response.status_code)
Dicdata = {'port_pair': Dicdata}
data = json.dumps(Dicdata, indent=4)
-
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs'
+ url = 'http://%s:9696/%s/sfc/port_pairs' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
- info = "Creation of Port Pair PP" + str(p) + \
- " is successful"
+ info = ("Creation of Port Pair PP" + str(p) +
+ " is successful")
self.logger.debug(info)
else:
return(response.status_code)
def getPortPair(self):
"""Query the Portpair id value."""
for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs?name=PP1'
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
+ url = ("http://%s:9696/%s/ports/"
+ "sfc/port_pairs?name=PP1" % (self.neutron_hostname,
+ self.osver))
+ headers = {"Accept": "application/json",
+ "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
Dicdata = {'port_pair_group': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups'
+ url = ("http://%s:9696/%s/"
+ "sfc/port_pair_groups" % (self.neutron_hostname,
+ self.osver))
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
- info = "Creation of Port Group PG" + str(p) + \
- "is successful"
+ info = ("Creation of Port Group PG" + str(p) +
+ "is successful")
self.logger.debug(info)
else:
return(response.status_code)
def getPortGroup(self):
"""Query the PortGroup id."""
for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups?name=PG' + str(p)
+ url = ("http://%s:9696/%s/sfc/port_pair_groups"
+ "?name=PG" + str(p)) % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.get(url, headers=headers)
Dicdata = {'flow_classifier': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/flow_classifiers'
+ url = ("http://%s:9696/%s/"
+ "sfc/flow_classifiers" % (self.neutron_hostname,
+ self.osver))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
Dicdata = {'port_chain': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_chains'
+ url = 'http://%s:9696/%s/sfc/port_chains' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"Content-Type": "application/json",
"X-Auth-Token": self.token_id}
Dicdata = {'router': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + \
- self.osver + '/routers.json'
+ url = 'http://%s:9696/%s/routers.json' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
def attachInterface(self):
"""Attachment of instance ports to the Router."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/networks?name=admin_floating_net'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
+ url = ("http://%s:9696/%s/networks"
+ "?name=admin_floating_net" % (self.neutron_hostname,
+ self.osver))
+ headers = {"Accept": "application/json", "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
self.logger.debug(response.status_code)
Dicdata['subnet_id'] = self.subnetId
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id + '/add_router_interface'
+ url = ("http://%s:9696/%s/routers"
+ "/%s/add_router_interface" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.put(url, headers=headers, data=data)
Dicdata1 = {'external_gateway_info': Dicdata1}
Dicdata1 = {'router': Dicdata1}
data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id
+ url = 'http://%s:9696/%s/routers/%s' % (self.neutron_hostname,
+ self.osver,
+ self.router_id)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.put(url, headers=headers, data=data)
Dicdata['pool'] = "admin_floating_net"
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.nova_hostname + ':8774/v2.1/os-floating-ips'
+ url = ("http://%s:8774/v2.1/"
+ "os-floating-ips" % (self.nova_hostname))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
Dicdata1 = {'addFloatingIp': Dicdata1}
data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.nova_hostname + ':8774/v2.1/servers/' + \
- self.vm[ip_num] + '/action'
+ url = ("http://%s:8774/v2.1/"
+ "servers/%s/action" % (self.nova_hostname,
+ self.vm[ip_num]))
+
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
def deletePortChain(self):
"""Deletion of PortChain."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_chains/' + self.PC_id
+ url = ('http://' + self.neutron_hostname + ':9696/' +
+ self.osver + '/sfc/port_chains/' + self.PC_id)
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
def deleteFlowClassifier(self):
"""Deletion of Flow Classifier."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/flow_classifiers/' + self.flow_class_if
+ url = ("http://%s:9696/%s/sfc/"
+ "flow_classifiers/%s" % (self.neutron_hostname,
+ self.osver,
+ self.flow_class_if))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
def deletePortGroup(self):
"""Deletion of PortGroup."""
for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups/' + self.port_grp_id[p]
+ url = ("http://%s:9696/%s/sfc/"
+ "port_pair_groups/%s" % (self.neutron_hostname,
+ self.osver,
+ self.port_grp_id[p]))
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
def deletePortPair(self):
"""Deletion of Portpair."""
for p in range(1, 2):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs/' + self.port_pair_id[0]
+ url = ("http://%s:9696/%s/sfc/"
+ "port_pairs/%s" % (self.neutron_hostname,
+ self.osver,
+ self.port_pair_id[0]))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
"""Cleanup."""
self.logger.info("Deleting VMs")
for y in range(0, 3):
- url = 'http://' + self.nova_hostname + \
- ':8774/v2.1/servers/' + self.vm[y]
+ url = ("http://%s:8774/v2.1/"
+ "/servers/%s" % (self.nova_hostname,
+ self.vm[y]))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
return(response.status_code)
self.logger.info("Deleting Ports")
for x in range(self.i, self.numTerms):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/ports/' + self.port_num[x]
+ url = ('http://' + self.neutron_hostname + ':9696/' +
+ self.osver + '/ports/' + self.port_num[x])
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
Dicdata['external_gateway_info'] = {}
Dicdata = {'router': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id
+ url = ("http://%s:9696/%s/"
+ "/routers/%s" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.put(url, headers=headers, data=data)
if self.subnetId != '':
Dicdata1['subnet_id'] = self.subnetId
data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id + \
- '/remove_router_interface.json'
+ url = ("http://%s:9696/%s/routers/%s"
+ "/remove_router_interface.json" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
- url = ('http://' + self.neutron_hostname + ':9696/' +
- self.osver + '/routers/' + self.router_id)
+ url = ("http://%s:9696/%s/"
+ "routers/%s" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
return(response.status_code)
self.logger.info("Deleting Network")
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/networks/' + self.net_id
+ url = "http://%s:9696/%s/networks/%s" % (self.neutron_hostname,
+ self.osver,
+ self.net_id)
+
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
self.logger.info("Deleting Floating ip")
for ip_num in range(0, 2):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/floatingips/' + self.vm_public_id[ip_num]
+ url = ("http://%s:9696/%s/floatingips"
+ "/%s" % (self.neutron_hostname,
+ self.osver,
+ self.vm_public_id[ip_num]))
+
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)