self.ip_pool = 0
self.vm_public_ip = []
self.vm_public_id = []
- self.cirros_username = CONST.openstack_image_username
- self.cirros_password = CONST.openstack_image_password
+ self.cirros_username = CONST.__getattribute__(
+ 'openstack_image_username')
+ self.cirros_password = CONST.__getattribute__(
+ 'openstack_image_password')
self.net_id1 = 0
self.vm = []
self.address = 0
data = ('{"auth": {"tenantName": "admin", "passwordCredentials":'
'{ "username": "admin", "password": "console"}}}')
headers = {"Accept": "application/json"}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == OK):
json1_data = json.loads(response.content)
self.logger.debug(response.status_code)
if self.admin_state_up != '':
Dicdata['admin_state_up'] = self.admin_state_up
Dicdata = {'network': Dicdata}
- data = json.dumps(Dicdata, indent=4)
+ data = json.dumps(Dicdata, indent=4)
url = 'http://%s:9696/%s/networks' % (self.neutron_hostname,
self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
"""Creation of Instance, using firewall image."""
url = ("http://%s:9292/v2/images?"
"name=TestSfcVm" % (self.glance_hostname))
- headers = {"Accept": "application/json", "Content-Type": "application/\
- octet-stream", "X-Auth-Token": self.token_id}
+ headers = {"Accept": "application/json",
+ "Content-Type": "application/octet-stream",
+ "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.tenant_id)
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == ACCEPTED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
for y in range(0, 3):
url = ("http://%s:8774/v2.1/servers/"
"detail?name=vm" + str(y)) % (self.neutron_hostname)
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
+ headers = {"Accept": "application/json",
+ "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.osver)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
info = ("Creation of Port Pair PP" + str(p) +
" is successful")
self.osver))
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
info = ("Creation of Port Group PG" + str(p) +
"is successful")
self.osver))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
json1_data = json.loads(response.content)
self.flow_class_if = json1_data['flow_classifier']['id']
headers = {"Accept": "application/json",
"Content-Type": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug("Creation of PORT CHAIN is successful")
json1_data = json.loads(response.content)
time.sleep(5)
response = requests.get('http://' + self.onos_hostname +
':8181/onos/v1/flows',
- auth=("karaf", "karaf"))
+ auth=("karaf", "karaf"))
if (response.status_code == OK):
self.logger.debug("Flow is successfully Queries")
json1_data = json.loads(response.content)
self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
self.router_id)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
"os-floating-ips" % (self.nova_hostname))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if(response.status_code == ACCEPTED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
s = pxssh.pxssh()
hostname = self.vm_public_ip[0]
- s.login(hostname, self.cirros_username, self.cirros_password)
+ s.login(hostname, self.cirros_username, self.cirros_password)
s.sendline("ping -c 5 " + str(self.port_ip[2]))
s.prompt() # match the prompt
- ping_re = re.search("transmitted.*received", s.before).group()
- x = re.split('\s+', ping_re)
+ ping_re = re.search("transmitted.*received", s.before).group()
+ x = re.split('\s+', ping_re)
if (x[1] >= "1"):
self.logger.info("Ping is Successfull")
else:
def vm1(queue1):
s = pxssh.pxssh()
hostname = self.vm_public_ip[1]
- s.login(hostname, self.cirros_username, self.cirros_password)
+ s.login(hostname, self.cirros_username, self.cirros_password)
s.sendline('sudo ./firewall')
s.prompt()
output_pack = s.before
if result0 == 0 and result1 == 0:
time.sleep(300)
queue1 = Queue()
- p1 = Process(target=vm1, args=(queue1, ))
+ p1 = Process(target=vm1, args=(queue1, ))
p1.start()
p2 = Process(target=vm0)
p2.start()
"""Check the PC SF Map Stats in the ONOS."""
response = requests.get('http://' + self.onos_hostname +
':8181/onos/vtn/portChainSfMap/' +
- self.PC_id, auth=("karaf", "karaf"))
+ self.PC_id, auth=("karaf", "karaf"))
if (response.status_code == OK):
self.logger.info("portChainSfMap is successfully Queries")
return(response.status_code)
def deletePortPair(self):
"""Deletion of Portpair."""
- for p in range(1, 2):
+ for p in range(1, 2):
url = ("http://%s:9696/%s/sfc/"
"port_pairs/%s" % (self.neutron_hostname,
self.osver,
self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
url = ("http://%s:9696/%s/"
"routers/%s" % (self.neutron_hostname,
self.osver,
self.router_id))
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
+ headers = {"Accept": "application/json",
+ "X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
if (response.status_code == NO_CONTENT):
self.logger.debug(response.status_code)