"""
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network.name, False)
def test_create_network(self):
"""
Cleans the remote OpenStack objects
"""
if self.subnet:
- neutron_utils.delete_subnet(self.neutron, self.subnet)
- validate_subnet(self.neutron, self.subnet.name,
- self.net_config.network_settings.subnet_settings[
- 0].cidr, False)
-
+ try:
+ neutron_utils.delete_subnet(self.neutron, self.subnet)
+ except:
+ pass
if self.network:
- neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network.name, False)
+ try:
+ neutron_utils.delete_network(self.neutron, self.network)
+ except:
+ pass
def test_create_subnet(self):
"""
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, network=self.network)
- validate_subnet(
- self.neutron, subnet_setting.name, subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
+
+ subnet_query1 = neutron_utils.get_subnet(
+ self.neutron, subnet_name=subnet_setting.name)
+ self.assertEqual(self.subnet, subnet_query1)
+
+ subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
+ self.network)
+ self.assertIsNotNone(subnet_query2)
+ self.assertEqual(1, len(subnet_query2))
+ self.assertEqual(self.subnet, subnet_query2[0])
def test_create_subnet_null_name(self):
"""
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- neutron_utils.create_subnet(
+ self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, network=self.network)
- validate_subnet(self.neutron, '', subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
+ self.assertFalse(validate_subnet(
+ self.neutron, '', subnet_setting.cidr, True))
+
+ subnet_query1 = neutron_utils.get_subnet(
+ self.neutron, subnet_name=subnet_setting.name)
+ self.assertEqual(self.subnet, subnet_query1)
+
+ subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
+ self.network)
+ self.assertIsNotNone(subnet_query2)
+ self.assertEqual(1, len(subnet_query2))
+ self.assertEqual(self.subnet, subnet_query2[0])
def test_create_subnet_null_cidr(self):
"""
self.subnet)
if self.router:
- neutron_utils.delete_router(self.neutron, self.router)
- validate_router(self.neutron, self.router.name, False)
+ try:
+ neutron_utils.delete_router(self.neutron, self.router)
+ validate_router(self.neutron, self.router.name, False)
+ except:
+ pass
if self.port:
- neutron_utils.delete_port(self.neutron, self.port)
+ try:
+ neutron_utils.delete_port(self.neutron, self.port)
+ except:
+ pass
if self.subnet:
- neutron_utils.delete_subnet(self.neutron, self.subnet)
- validate_subnet(
- self.neutron, self.subnet.name,
- self.net_config.network_settings.subnet_settings[0].cidr,
- False)
+ try:
+ neutron_utils.delete_subnet(self.neutron, self.subnet)
+ except:
+ pass
if self.network:
- neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network.name, False)
+ try:
+ neutron_utils.delete_network(self.neutron, self.network)
+ except:
+ pass
def test_create_router_simple(self):
"""
validate_router(self.neutron, self.net_config.router_settings.name,
True)
- ext_net = neutron_utils.get_network(self.neutron, self.ext_net_name)
+ ext_net = neutron_utils.get_network(
+ self.neutron, network_name=self.ext_net_name)
self.assertEqual(
self.router.external_gateway_info['network_id'], ext_net.id)
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.router = neutron_utils.create_router(
self.neutron, self.os_creds, self.net_config.router_settings)
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron, subnet_setting.name, subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(NeutronException):
self.interface_router = neutron_utils.add_interface_router(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, self.network)
- validate_subnet(self.neutron, subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, self.network)
- validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
- True)
+ self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
+ subnet_setting.cidr, True))
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
def test_create_port_null_name(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when
- the port name value is None
+ Tests the neutron_utils.create_port() when the port name value is None
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
- with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(
- self.neutron, self.os_creds,
- PortSettings(
- network_name=self.net_config.network_settings.name,
- ip_addrs=[{
- 'subnet_name': subnet_setting.name,
- 'ip': ip_1}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}]))
+
+ port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
+ self.assertEqual(self.port, port)
def test_create_port_null_network_object(self):
"""
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting,
self.os_creds, self.network)
- validate_subnet(
- self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, self.network)
- validate_subnet(self.neutron,
- subnet_setting.name,
- subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.subnet = neutron_utils.create_subnet(
self.neutron, subnet_setting, self.os_creds, self.network)
- validate_subnet(
- self.neutron, subnet_setting.name, subnet_setting.cidr, True)
+ self.assertTrue(validate_subnet(
+ self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.assertTrue(sec_grp_settings.name, security_group.name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group, sec_grp_get))
neutron_utils.delete_security_group(self.neutron, security_group)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNone(sec_grp_get)
def test_create_sec_grp_no_name(self):
self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(self.security_groups[0], sec_grp_get)
# Refresh object so it is populated with the newly added rule
security_group = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings.name)
+ self.neutron, sec_grp_settings=sec_grp_settings)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
security_group)
self.assertTrue(sec_grp_settings.name, security_group.name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(security_group, sec_grp_get)
Cleans the image and downloaded image file
"""
if self.floating_ip:
- neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
+ try:
+ neutron_utils.delete_floating_ip(
+ self.neutron, self.floating_ip)
+ except:
+ pass
def test_floating_ips(self):
"""
:param exists: Whether or not the network name should exist or not
:return: True/False
"""
- network = neutron_utils.get_network(neutron, name)
+ network = neutron_utils.get_network(neutron, network_name=name)
if exists and network:
return True
if not exists and not network:
:param exists: Whether or not the network name should exist or not
:return: True/False
"""
- subnet = neutron_utils.get_subnet_by_name(neutron, name)
- if exists and subnet:
+ subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
+ if exists and subnet and subnet.name == name:
return subnet.cidr == cidr
if not exists and not subnet:
return True
:param exists: Whether or not the network name should exist or not
:return: True/False
"""
- router = neutron_utils.get_router_by_name(neutron, name)
+ router = neutron_utils.get_router(neutron, router_name=name)
if exists and router:
return True
return False