-import os
+import logging
+import os
import re
import time
import json
from multiprocessing import Queue
from pexpect import pxssh
-import functest.utils.functest_logger as ft_logger
+from functest.utils.constants import CONST
OK = 200
CREATED = 201
NO_CONTENT = 204
-class SfcOnos:
+class SfcOnos(object):
"""Defines all the def function of SFC."""
def __init__(self):
"""Initialization of variables."""
- self.logger = ft_logger.Logger("sfc_fun").getLogger()
+ self.logger = logging.getLogger(__name__)
self.osver = "v2.0"
self.token_id = 0
self.net_id = 0
self.container_format = "bare"
self.disk_format = "qcow2"
self.imagename = "TestSfcVm"
- self.createImage = "/home/root1/devstack/files/images/\
- firewall_block_image.img"
+ self.createImage = ("/home/root1/devstack/files/images/"
+ "firewall_block_image.img")
self.vm_name = "vm"
self.imageRef = "test"
self.ip_pool = 0
self.vm_public_ip = []
self.vm_public_id = []
+ self.cirros_username = CONST.__getattribute__(
+ 'openstack_image_username')
+ self.cirros_password = CONST.__getattribute__(
+ 'openstack_image_password')
self.net_id1 = 0
self.vm = []
self.address = 0
def getToken(self):
"""Get the keystone token value from Openstack ."""
- url = 'http://' + self.keystone_hostname + \
- ':5000/' + self.osver + '/tokens'
- data = '{"auth": {"tenantName": "admin", "passwordCredentials":\
- { "username": "admin", "password": "console"}}}'
+ url = 'http://%s:5000/%s/tokens' % (self.keystone_hostname,
+ self.osver)
+ data = ('{"auth": {"tenantName": "admin", "passwordCredentials":'
+ '{ "username": "admin", "password": "console"}}}')
headers = {"Accept": "application/json"}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == OK):
json1_data = json.loads(response.content)
self.logger.debug(response.status_code)
if self.admin_state_up != '':
Dicdata['admin_state_up'] = self.admin_state_up
Dicdata = {'network': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/networks'
+ data = json.dumps(Dicdata, indent=4)
+ url = 'http://%s:9696/%s/networks' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
Dicdata = {'subnet': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/subnets'
+ url = 'http://%s:9696/%s/subnets' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
Dicdata = {'port': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/ports'
+ url = 'http://%s:9696/%s/ports' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
def createVm(self):
"""Creation of Instance, using firewall image."""
- url = 'http://' + self.glance_hostname + \
- ':9292/v2/images?name=TestSfcVm'
- headers = {"Accept": "application/json", "Content-Type": "application/\
- octet-stream", "X-Auth-Token": self.token_id}
+ url = ("http://%s:9292/v2/images?"
+ "name=TestSfcVm" % (self.glance_hostname))
+ headers = {"Accept": "application/json",
+ "Content-Type": "application/octet-stream",
+ "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
self.logger.debug(response.status_code)
else:
return(response.status_code)
- url = 'http://' + self.nova_hostname + \
- ':8774/v2.1/' + self.tenant_id + '/flavors?name=m1.tiny'
+ url = ("http://%s:8774/v2.1/%s/flavors?"
+ "name=m1.tiny" % (self.nova_hostname, self.tenant_id))
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
Dicdata['networks'] = org_nw_port
Dicdata = {'server': Dicdata}
data = json.dumps(Dicdata, indent=4)
-
- url = ('http://' + self.nova_hostname + ':8774/v2.1/' +
- self.tenant_id + '/servers')
+ url = 'http://%s:8774/v2.1/%s/servers' % (self.nova_hostname,
+ self.tenant_id)
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == ACCEPTED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
"""Checking the Status of the Instance."""
time.sleep(10)
for y in range(0, 3):
- url = 'http://' + \
- self.nova_hostname + \
- ':8774/v2.1/servers/detail?name=vm' + str(y)
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
+ url = ("http://%s:8774/v2.1/servers/"
+ "detail?name=vm" + str(y)) % (self.neutron_hostname)
+ headers = {"Accept": "application/json",
+ "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(json1_data)
self.vm_active = json1_data['servers'][0]['status']
if (self.vm_active == "ACTIVE"):
- info = "VM" + str(y) + \
- " is Active : " + self.vm_active
+ info = "VM" + str(y) + " is Active : " + self.vm_active
else:
- info = "VM" + str(y) + " is NOT Active : " + \
- self.vm_active
+ info = "VM" + str(y) + " is NOT Active : " + self.vm_active
self.logger.debug(info)
else:
return(response.status_code)
Dicdata = {'port_pair': Dicdata}
data = json.dumps(Dicdata, indent=4)
-
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs'
+ url = 'http://%s:9696/%s/sfc/port_pairs' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
- info = "Creation of Port Pair PP" + str(p) + \
- " is successful"
+ info = ("Creation of Port Pair PP" + str(p) +
+ " is successful")
self.logger.debug(info)
else:
return(response.status_code)
def getPortPair(self):
"""Query the Portpair id value."""
for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs?name=PP1'
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
+ url = ("http://%s:9696/%s/"
+ "sfc/port_pairs?name=PP1" % (self.neutron_hostname,
+ self.osver))
+ headers = {"Accept": "application/json",
+ "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
Dicdata = {'port_pair_group': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups'
+ url = ("http://%s:9696/%s/"
+ "sfc/port_pair_groups" % (self.neutron_hostname,
+ self.osver))
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
- info = "Creation of Port Group PG" + str(p) + \
- "is successful"
+ info = ("Creation of Port Group PG" + str(p) +
+ "is successful")
self.logger.debug(info)
else:
return(response.status_code)
def getPortGroup(self):
"""Query the PortGroup id."""
for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups?name=PG' + str(p)
+ url = ("http://%s:9696/%s/sfc/port_pair_groups"
+ "?name=PG" + str(p)) % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.get(url, headers=headers)
Dicdata = {'flow_classifier': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/flow_classifiers'
+ url = ("http://%s:9696/%s/"
+ "sfc/flow_classifiers" % (self.neutron_hostname,
+ self.osver))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
json1_data = json.loads(response.content)
self.flow_class_if = json1_data['flow_classifier']['id']
Dicdata = {'port_chain': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_chains'
+ url = 'http://%s:9696/%s/sfc/port_chains' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"Content-Type": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug("Creation of PORT CHAIN is successful")
json1_data = json.loads(response.content)
time.sleep(5)
response = requests.get('http://' + self.onos_hostname +
':8181/onos/v1/flows',
- auth=("karaf", "karaf"))
+ auth=("karaf", "karaf"))
if (response.status_code == OK):
self.logger.debug("Flow is successfully Queries")
json1_data = json.loads(response.content)
Dicdata = {'router': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + \
- self.osver + '/routers.json'
+ url = 'http://%s:9696/%s/routers.json' % (self.neutron_hostname,
+ self.osver)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == CREATED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
def attachInterface(self):
"""Attachment of instance ports to the Router."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/networks?name=admin_floating_net'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
+ url = ("http://%s:9696/%s/networks"
+ "?name=admin_floating_net" % (self.neutron_hostname,
+ self.osver))
+ headers = {"Accept": "application/json", "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
if (response.status_code == OK):
self.logger.debug(response.status_code)
Dicdata['subnet_id'] = self.subnetId
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id + '/add_router_interface'
+ url = ("http://%s:9696/%s/routers"
+ "/%s/add_router_interface" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
Dicdata1 = {'external_gateway_info': Dicdata1}
Dicdata1 = {'router': Dicdata1}
data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id
+ url = 'http://%s:9696/%s/routers/%s' % (self.neutron_hostname,
+ self.osver,
+ self.router_id)
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
Dicdata['pool'] = "admin_floating_net"
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.nova_hostname + ':8774/v2.1/os-floating-ips'
+ url = ("http://%s:8774/v2.1/"
+ "os-floating-ips" % (self.nova_hostname))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
Dicdata1 = {'addFloatingIp': Dicdata1}
data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.nova_hostname + ':8774/v2.1/servers/' + \
- self.vm[ip_num] + '/action'
+ url = ("http://%s:8774/v2.1/"
+ "servers/%s/action" % (self.nova_hostname,
+ self.vm[ip_num]))
+
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
+ response = requests.post(url, headers=headers, data=data)
if(response.status_code == ACCEPTED):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
s = pxssh.pxssh()
hostname = self.vm_public_ip[0]
- username = "cirros"
- password = "cubswin:)"
- s.login(hostname, username, password)
+ s.login(hostname, self.cirros_username, self.cirros_password)
s.sendline("ping -c 5 " + str(self.port_ip[2]))
s.prompt() # match the prompt
- ping_re = re.search("transmitted.*received", s.before).group()
- x = re.split('\s+', ping_re)
+ ping_re = re.search("transmitted.*received", s.before).group()
+ x = re.split('\s+', ping_re)
if (x[1] >= "1"):
self.logger.info("Ping is Successfull")
else:
def vm1(queue1):
s = pxssh.pxssh()
hostname = self.vm_public_ip[1]
- username = "cirros"
- password = "cubswin:)"
- s.login(hostname, username, password)
+ s.login(hostname, self.cirros_username, self.cirros_password)
s.sendline('sudo ./firewall')
s.prompt()
output_pack = s.before
if result0 == 0 and result1 == 0:
time.sleep(300)
queue1 = Queue()
- p1 = Process(target=vm1, args=(queue1, ))
+ p1 = Process(target=vm1, args=(queue1, ))
p1.start()
p2 = Process(target=vm0)
p2.start()
"""Check the PC SF Map Stats in the ONOS."""
response = requests.get('http://' + self.onos_hostname +
':8181/onos/vtn/portChainSfMap/' +
- self.PC_id, auth=("karaf", "karaf"))
+ self.PC_id, auth=("karaf", "karaf"))
if (response.status_code == OK):
self.logger.info("portChainSfMap is successfully Queries")
return(response.status_code)
def deletePortChain(self):
"""Deletion of PortChain."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_chains/' + self.PC_id
+ url = ('http://' + self.neutron_hostname + ':9696/' +
+ self.osver + '/sfc/port_chains/' + self.PC_id)
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
def deleteFlowClassifier(self):
"""Deletion of Flow Classifier."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/flow_classifiers/' + self.flow_class_if
+ url = ("http://%s:9696/%s/sfc/"
+ "flow_classifiers/%s" % (self.neutron_hostname,
+ self.osver,
+ self.flow_class_if))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
def deletePortGroup(self):
"""Deletion of PortGroup."""
for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups/' + self.port_grp_id[p]
+ url = ("http://%s:9696/%s/sfc/"
+ "port_pair_groups/%s" % (self.neutron_hostname,
+ self.osver,
+ self.port_grp_id[p]))
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
def deletePortPair(self):
"""Deletion of Portpair."""
- for p in range(1, 2):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs/' + self.port_pair_id[0]
+ for p in range(1, 2):
+ url = ("http://%s:9696/%s/sfc/"
+ "port_pairs/%s" % (self.neutron_hostname,
+ self.osver,
+ self.port_pair_id[0]))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
"""Cleanup."""
self.logger.info("Deleting VMs")
for y in range(0, 3):
- url = 'http://' + self.nova_hostname + \
- ':8774/v2.1/servers/' + self.vm[y]
+ url = ("http://%s:8774/v2.1/"
+ "/servers/%s" % (self.nova_hostname,
+ self.vm[y]))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
return(response.status_code)
self.logger.info("Deleting Ports")
for x in range(self.i, self.numTerms):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/ports/' + self.port_num[x]
+ url = ('http://' + self.neutron_hostname + ':9696/' +
+ self.osver + '/ports/' + self.port_num[x])
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
Dicdata['external_gateway_info'] = {}
Dicdata = {'router': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id
+ url = ("http://%s:9696/%s/"
+ "/routers/%s" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
self.logger.debug(response.status_code)
self.logger.debug(response.content)
if self.subnetId != '':
Dicdata1['subnet_id'] = self.subnetId
data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id + \
- '/remove_router_interface.json'
+ url = ("http://%s:9696/%s/routers/%s"
+ "/remove_router_interface.json" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
+ response = requests.put(url, headers=headers, data=data)
if (response.status_code == OK):
- url = ('http://' + self.neutron_hostname + ':9696/' +
- self.osver + '/routers/' + self.router_id)
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
+ url = ("http://%s:9696/%s/"
+ "routers/%s" % (self.neutron_hostname,
+ self.osver,
+ self.router_id))
+ headers = {"Accept": "application/json",
+ "X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
if (response.status_code == NO_CONTENT):
self.logger.debug(response.status_code)
return(response.status_code)
self.logger.info("Deleting Network")
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/networks/' + self.net_id
+ url = "http://%s:9696/%s/networks/%s" % (self.neutron_hostname,
+ self.osver,
+ self.net_id)
+
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
self.logger.info("Deleting Floating ip")
for ip_num in range(0, 2):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/floatingips/' + self.vm_public_id[ip_num]
+ url = ("http://%s:9696/%s/floatingips"
+ "/%s" % (self.neutron_hostname,
+ self.osver,
+ self.vm_public_id[ip_num]))
+
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)