from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import keystone_utils
from snaps.openstack.utils import neutron_utils
+from snaps.openstack.utils.neutron_utils import NeutronException
__author__ = 'spisarski'
ext_networks = neutron_utils.get_external_networks(neutron)
found = False
for network in ext_networks:
- if network['network']['name'] == self.ext_net_name:
+ if network.name == self.ext_net_name:
found = True
break
self.assertTrue(found)
"""
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'],
- False)
def test_create_network(self):
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
def test_create_network_empty_name(self):
"""
Cleans the remote OpenStack objects
"""
if self.subnet:
- neutron_utils.delete_subnet(self.neutron, self.subnet)
- validate_subnet(self.neutron, self.subnet.get('name'),
- self.net_config.network_settings.subnet_settings[
- 0].cidr, False)
-
+ try:
+ neutron_utils.delete_subnet(self.neutron, self.subnet)
+ except:
+ pass
if self.network:
- neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'],
- False)
+ try:
+ neutron_utils.delete_network(self.neutron, self.network)
+ except:
+ pass
def test_create_subnet(self):
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting,
- self.os_creds, network=self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.neutron, subnet_setting, self.os_creds, network=self.network)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
+
+ subnet_query1 = neutron_utils.get_subnet(
+ self.neutron, subnet_name=subnet_setting.name)
+ self.assertEqual(self.subnet, subnet_query1)
+
+ subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
+ self.network)
+ self.assertIsNotNone(subnet_query2)
+ self.assertEqual(1, len(subnet_query2))
+ self.assertEqual(self.subnet, subnet_query2[0])
def test_create_subnet_null_name(self):
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
with self.assertRaises(Exception):
SubnetSettings(cidr=self.net_config.subnet_cidr)
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- neutron_utils.create_subnet(
- self.neutron, subnet_setting,
- self.os_creds, network=self.network)
- validate_subnet(self.neutron, '',
- subnet_setting.cidr, True)
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting, self.os_creds, network=self.network)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
+ self.assertFalse(validate_subnet(
+ self.neutron, '', subnet_setting.cidr, True))
+
+ subnet_query1 = neutron_utils.get_subnet(
+ self.neutron, subnet_name=subnet_setting.name)
+ self.assertEqual(self.subnet, subnet_query1)
+
+ subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
+ self.network)
+ self.assertIsNotNone(subnet_query2)
+ self.assertEqual(1, len(subnet_query2))
+ self.assertEqual(self.subnet, subnet_query2[0])
def test_create_subnet_null_cidr(self):
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
with self.assertRaises(Exception):
- sub_sets = SubnetSettings(cidr=None,
- name=self.net_config.subnet_name)
- neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
- network=self.network)
+ sub_sets = SubnetSettings(
+ cidr=None, name=self.net_config.subnet_name)
+ neutron_utils.create_subnet(
+ self.neutron, sub_sets, self.os_creds, network=self.network)
def test_create_subnet_empty_cidr(self):
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
with self.assertRaises(Exception):
- sub_sets = SubnetSettings(cidr='',
- name=self.net_config.subnet_name)
+ sub_sets = SubnetSettings(
+ cidr='', name=self.net_config.subnet_name)
neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
network=self.network)
self.subnet)
if self.router:
- neutron_utils.delete_router(self.neutron, self.router)
- validate_router(self.neutron, self.router.get('name'), False)
+ try:
+ neutron_utils.delete_router(self.neutron, self.router)
+ validate_router(self.neutron, self.router.name, False)
+ except:
+ pass
if self.port:
- neutron_utils.delete_port(self.neutron, self.port)
+ try:
+ neutron_utils.delete_port(self.neutron, self.port)
+ except:
+ pass
if self.subnet:
- neutron_utils.delete_subnet(self.neutron, self.subnet)
- validate_subnet(self.neutron, self.subnet.get('name'),
- self.net_config.network_settings.subnet_settings[
- 0].cidr, False)
+ try:
+ neutron_utils.delete_subnet(self.neutron, self.subnet)
+ except:
+ pass
if self.network:
- neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'],
- False)
+ try:
+ neutron_utils.delete_network(self.neutron, self.network)
+ except:
+ pass
def test_create_router_simple(self):
"""
self.neutron, self.os_creds, self.net_config.router_settings)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
- # TODO - Add validation that the router gatway has been set
+
+ ext_net = neutron_utils.get_network(
+ self.neutron, network_name=self.ext_net_name)
+ self.assertEqual(
+ self.router.external_gateway_info['network_id'], ext_net.id)
def test_create_router_empty_name(self):
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.router = neutron_utils.create_router(
self.neutron, self.os_creds, self.net_config.router_settings)
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
- with self.assertRaises(Exception):
+ with self.assertRaises(NeutronException):
self.interface_router = neutron_utils.add_interface_router(
self.neutron, self.router, self.subnet)
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
self.router = neutron_utils.create_router(
self.neutron, self.os_creds, self.net_config.router_settings)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
- with self.assertRaises(Exception):
+ with self.assertRaises(NeutronException):
self.interface_router = neutron_utils.add_interface_router(
self.neutron, self.router, self.subnet)
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, self.network)
- validate_subnet(self.neutron, subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, self.network)
- validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
- True)
+ self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
+ subnet_setting.cidr, True))
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
def test_create_port_null_name(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when
- the port name value is None
+ Tests the neutron_utils.create_port() when the port name value is None
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
- with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(
- self.neutron, self.os_creds,
- PortSettings(
- network_name=self.net_config.network_settings.name,
- ip_addrs=[{
- 'subnet_name': subnet_setting.name,
- 'ip': ip_1}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}]))
+
+ port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
+ self.assertEqual(self.port, port)
def test_create_port_null_network_object(self):
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
- self.neutron,
- subnet_setting,
- self.os_creds, self.network)
- validate_subnet(self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.neutron, subnet_setting, self.os_creds, self.network)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.assertEqual(self.net_config.network_settings.name,
- self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron,
- self.net_config.network_settings.name,
- True))
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
- self.neutron,
- subnet_setting,
- self.os_creds, self.network)
- validate_subnet(self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.neutron, subnet_setting, self.os_creds, self.network)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.keystone,
sec_grp_settings)
- self.assertTrue(sec_grp_settings.name,
- security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.name, security_group.name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
- security_group['security_group'], sec_grp_get['security_group']))
+ security_group, sec_grp_get))
neutron_utils.delete_security_group(self.neutron, security_group)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNone(sec_grp_get)
def test_create_sec_grp_no_name(self):
neutron_utils.create_security_group(self.neutron, self.keystone,
sec_grp_settings))
- self.assertTrue(sec_grp_settings.name,
- self.security_groups[0]['security_group']['name'])
- self.assertTrue(sec_grp_settings.description,
- self.security_groups[0]['security_group'][
- 'description'])
+ self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
+ self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
- self.assertTrue(validation_utils.objects_equivalent(
- self.security_groups[0]['security_group'],
- sec_grp_get['security_group']))
+ self.assertEqual(self.security_groups[0], sec_grp_get)
def test_create_sec_grp_one_rule(self):
"""
# Refresh object so it is populated with the newly added rule
security_group = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings.name)
+ self.neutron, sec_grp_settings=sec_grp_settings)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
security_group)
self.assertTrue(
- validation_utils.objects_equivalent(self.security_group_rules,
- rules))
+ validation_utils.objects_equivalent(
+ self.security_group_rules, rules))
- self.assertTrue(sec_grp_settings.name,
- security_group['security_group']['name'])
- self.assertTrue(sec_grp_settings.description,
- security_group['security_group']['description'])
+ self.assertTrue(sec_grp_settings.name, security_group.name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
- self.assertTrue(validation_utils.objects_equivalent(
- security_group['security_group'], sec_grp_get['security_group']))
+ self.assertEqual(security_group, sec_grp_get)
def test_get_sec_grp_by_id(self):
"""
description='hello group')))
sec_grp_1b = neutron_utils.get_security_group_by_id(
- self.neutron, self.security_groups[0]['security_group']['id'])
+ self.neutron, self.security_groups[0].id)
sec_grp_2b = neutron_utils.get_security_group_by_id(
- self.neutron, self.security_groups[1]['security_group']['id'])
+ self.neutron, self.security_groups[1].id)
- self.assertEqual(self.security_groups[0]['security_group']['id'],
- sec_grp_1b['security_group']['id'])
- self.assertEqual(self.security_groups[1]['security_group']['id'],
- sec_grp_2b['security_group']['id'])
+ self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
+ self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
Cleans the image and downloaded image file
"""
if self.floating_ip:
- neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
+ try:
+ neutron_utils.delete_floating_ip(
+ self.neutron, self.floating_ip)
+ except:
+ pass
def test_floating_ips(self):
"""
:param exists: Whether or not the network name should exist or not
:return: True/False
"""
- network = neutron_utils.get_network(neutron, name)
+ network = neutron_utils.get_network(neutron, network_name=name)
if exists and network:
return True
if not exists and not network:
:param exists: Whether or not the network name should exist or not
:return: True/False
"""
- subnet = neutron_utils.get_subnet_by_name(neutron, name)
- if exists and subnet:
- return subnet.get('cidr') == cidr
+ subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
+ if exists and subnet and subnet.name == name:
+ return subnet.cidr == cidr
if not exists and not subnet:
return True
return False
:param exists: Whether or not the network name should exist or not
:return: True/False
"""
- router = neutron_utils.get_router_by_name(neutron, name)
+ router = neutron_utils.get_router(neutron, router_name=name)
if exists and router:
return True
return False
"""
Returns true if the router ID & subnet ID have been properly included into
the interface router object
- :param interface_router: the object to validate
+ :param interface_router: the SNAPS-OO InterfaceRouter domain object
:param router: to validate against the interface_router
:param subnet: to validate against the interface_router
:return: True if both IDs match else False
"""
- subnet_id = interface_router.get('subnet_id')
- router_id = interface_router.get('port_id')
+ subnet_id = interface_router.subnet_id
+ router_id = interface_router.port_id
- return subnet.get('id') == subnet_id and router.get('id') == router_id
+ return subnet.id == subnet_id and router.id == router_id
def validate_port(neutron, port_obj, this_port_name):
:param this_port_name: The expected router name
:return: True/False
"""
- ports = neutron.list_ports()
- for port, port_insts in ports.items():
- for inst in port_insts:
- if inst['id'] == port_obj['port']['id']:
- return inst['name'] == this_port_name
+ os_ports = neutron.list_ports()
+ for os_port, os_port_insts in os_ports.items():
+ for os_inst in os_port_insts:
+ if os_inst['id'] == port_obj.id:
+ return os_inst['name'] == this_port_name
return False