1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from neutronclient.common.exceptions import NotFound, BadRequest
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21 SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that the neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and expects the configured
        external network name among the networks listed by the client.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listed = client.list_networks().get('networks')

        self.assertTrue(
            any(net.get('name') == self.ext_net_name for net in listed))

    def test_neutron_connect_fail(self):
        """
        Expects an Exception when bogus credentials are used to create a
        client and list networks.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bogus_creds = OSCreds(
                username='user', password='pass', auth_url='url',
                project_name='project')
            neutron_utils.neutron_client(bogus_creds).list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Expects the configured self.ext_net_name to be the name of one of the
        objects returned by neutron_utils.get_external_networks().
        """
        client = neutron_utils.neutron_client(self.os_creds)
        external_nets = neutron_utils.get_external_networks(client)

        self.assertTrue(
            any(net.name == self.ext_net_name for net in external_nets))
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Tests network creation through neutron_utils.py
    """

    def setUp(self):
        unique_prefix = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = unique_prefix + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        # Read by tearDown() to decide whether there is anything to delete
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=unique_prefix + '-pub-net')

    def tearDown(self):
        """
        Deletes the network created by the test, if any
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

    def test_create_network(self):
        """
        Creates a network from the configured settings and checks its name,
        its presence in neutron and its number of subnets.
        """
        net_settings = self.net_config.network_settings

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))
        self.assertEqual(
            len(net_settings.subnet_settings), len(self.network.subnets))

    def test_create_network_empty_name(self):
        """
        Expects an Exception when the network is configured with an empty
        name.
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig(name=''))

    def test_create_network_null_name(self):
        """
        Expects an Exception when the network is configured without a name.
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig())
137 class NeutronUtilsSubnetTests(OSComponentTestCase):
139 Test for creating networks with subnets via neutron_utils.py
143 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
144 self.port_name = str(guid) + '-port'
145 self.neutron = neutron_utils.neutron_client(self.os_creds)
147 self.net_config = openstack_tests.get_pub_net_config(
148 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
149 external_net=self.ext_net_name)
153 Cleans the remote OpenStack objects
157 neutron_utils.delete_network(self.neutron, self.network)
161 def test_create_subnet(self):
163 Tests the neutron_utils.create_network() function
165 self.network = neutron_utils.create_network(
166 self.neutron, self.os_creds, self.net_config.network_settings)
167 self.assertEqual(self.net_config.network_settings.name,
169 self.assertTrue(validate_network(
170 self.neutron, self.net_config.network_settings.name, True))
172 subnet_setting = self.net_config.network_settings.subnet_settings[0]
173 self.assertTrue(validate_subnet(
174 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
176 subnet_query1 = neutron_utils.get_subnet(
177 self.neutron, subnet_name=subnet_setting.name)
178 self.assertEqual(self.network.subnets[0], subnet_query1)
180 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
182 self.assertIsNotNone(subnet_query2)
183 self.assertEqual(1, len(subnet_query2))
184 self.assertEqual(self.network.subnets[0], subnet_query2[0])
186 def test_create_subnet_null_name(self):
188 Tests the neutron_utils.create_neutron_subnet() function for an
189 Exception when the subnet name is None
191 self.network = neutron_utils.create_network(
192 self.neutron, self.os_creds, self.net_config.network_settings)
193 self.assertEqual(self.net_config.network_settings.name,
195 self.assertTrue(validate_network(
196 self.neutron, self.net_config.network_settings.name, True))
198 with self.assertRaises(Exception):
199 SubnetConfig(cidr=self.net_config.subnet_cidr)
201 def test_create_subnet_empty_name(self):
203 Tests the neutron_utils.create_network() function with an empty
206 self.network = neutron_utils.create_network(
207 self.neutron, self.os_creds, self.net_config.network_settings)
208 self.assertEqual(self.net_config.network_settings.name,
210 self.assertTrue(validate_network(
211 self.neutron, self.net_config.network_settings.name, True))
213 subnet_setting = self.net_config.network_settings.subnet_settings[0]
214 self.assertTrue(validate_subnet(
215 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
216 self.assertFalse(validate_subnet(
217 self.neutron, '', subnet_setting.cidr, True))
219 subnet_query1 = neutron_utils.get_subnet(
220 self.neutron, subnet_name=subnet_setting.name)
221 self.assertEqual(self.network.subnets[0], subnet_query1)
223 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
225 self.assertIsNotNone(subnet_query2)
226 self.assertEqual(1, len(subnet_query2))
227 self.assertEqual(self.network.subnets[0], subnet_query2[0])
229 def test_create_subnet_null_cidr(self):
231 Tests the neutron_utils.create_neutron_subnet() function for an
232 Exception when the subnet CIDR value is None
234 self.net_config.network_settings.subnet_settings[0].cidr = None
235 with self.assertRaises(Exception):
236 self.network = neutron_utils.create_network(
237 self.neutron, self.os_creds, self.net_config.network_settings)
239 def test_create_subnet_empty_cidr(self):
241 Tests the neutron_utils.create_neutron_subnet() function for an
242 Exception when the subnet CIDR value is empty
244 self.net_config.network_settings.subnet_settings[0].cidr = ''
245 with self.assertRaises(Exception):
246 self.network = neutron_utils.create_network(
247 self.neutron, self.os_creds, self.net_config.network_settings)
250 class NeutronUtilsIPv6Tests(OSComponentTestCase):
252 Test for creating IPv6 networks with subnets via neutron_utils.py
256 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
257 self.neutron = neutron_utils.neutron_client(self.os_creds)
262 Cleans the remote OpenStack objects
266 neutron_utils.delete_network(self.neutron, self.network)
270 def test_create_network_slaac(self):
272 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
273 is True and IPv6 modes are slaac
275 sub_setting = SubnetConfig(
276 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
277 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
278 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
279 enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
280 self.network_settings = NetworkConfig(
281 name=self.guid + '-net', subnet_settings=[sub_setting])
283 self.network = neutron_utils.create_network(
284 self.neutron, self.os_creds, self.network_settings)
285 self.assertEqual(self.network_settings.name, self.network.name)
287 subnet_settings = self.network_settings.subnet_settings[0]
288 self.assertEqual(1, len(self.network.subnets))
289 subnet = self.network.subnets[0]
291 self.assertEqual(self.network.id, subnet.network_id)
292 self.assertEqual(subnet_settings.name, subnet.name)
293 self.assertEqual(subnet_settings.start, subnet.start)
294 self.assertEqual(subnet_settings.end, subnet.end)
295 self.assertEqual('1:1::/64', subnet.cidr)
296 self.assertEqual(6, subnet.ip_version)
297 self.assertEqual(1, len(subnet.dns_nameservers))
299 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
300 self.assertTrue(subnet.enable_dhcp)
302 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
304 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
306 def test_create_network_stateful(self):
308 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
309 is True and IPv6 modes are stateful
311 sub_setting = SubnetConfig(
312 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
313 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
314 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
315 enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
316 ipv6_address_mode='dhcpv6-stateful')
317 self.network_settings = NetworkConfig(
318 name=self.guid + '-net', subnet_settings=[sub_setting])
320 self.network = neutron_utils.create_network(
321 self.neutron, self.os_creds, self.network_settings)
323 self.assertEqual(self.network_settings.name, self.network.name)
325 subnet_settings = self.network_settings.subnet_settings[0]
326 self.assertEqual(1, len(self.network.subnets))
327 subnet = self.network.subnets[0]
329 self.assertEqual(self.network.id, subnet.network_id)
330 self.assertEqual(subnet_settings.name, subnet.name)
331 self.assertEqual(subnet_settings.start, subnet.start)
332 self.assertEqual(subnet_settings.end, subnet.end)
333 self.assertEqual('1:1::/64', subnet.cidr)
334 self.assertEqual(6, subnet.ip_version)
335 self.assertEqual(1, len(subnet.dns_nameservers))
337 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
338 self.assertTrue(subnet.enable_dhcp)
340 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
342 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
344 def test_create_network_stateless(self):
346 Tests the neutron_utils.create_network() when DHCP is enabled and
347 the RA and address modes are both 'slaac'
349 sub_setting = SubnetConfig(
350 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
351 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
352 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
353 enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
354 ipv6_address_mode='dhcpv6-stateless')
355 self.network_settings = NetworkConfig(
356 name=self.guid + '-net', subnet_settings=[sub_setting])
358 self.network = neutron_utils.create_network(
359 self.neutron, self.os_creds, self.network_settings)
361 self.assertEqual(self.network_settings.name, self.network.name)
363 subnet_settings = self.network_settings.subnet_settings[0]
364 self.assertEqual(1, len(self.network.subnets))
365 subnet = self.network.subnets[0]
367 self.assertEqual(self.network.id, subnet.network_id)
368 self.assertEqual(subnet_settings.name, subnet.name)
369 self.assertEqual(subnet_settings.start, subnet.start)
370 self.assertEqual(subnet_settings.end, subnet.end)
371 self.assertEqual('1:1::/64', subnet.cidr)
372 self.assertEqual(6, subnet.ip_version)
373 self.assertEqual(1, len(subnet.dns_nameservers))
375 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
376 self.assertTrue(subnet.enable_dhcp)
378 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
380 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
382 def test_create_network_no_dhcp_slaac(self):
384 Tests the neutron_utils.create_network() for a BadRequest when
385 DHCP is not enabled and the RA and address modes are both 'slaac'
387 sub_setting = SubnetConfig(
388 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
389 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
390 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
391 enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
392 self.network_settings = NetworkConfig(
393 name=self.guid + '-net', subnet_settings=[sub_setting])
395 with self.assertRaises(BadRequest):
396 self.network = neutron_utils.create_network(
397 self.neutron, self.os_creds, self.network_settings)
399 def test_create_network_invalid_start_ip(self):
401 Tests the neutron_utils.create_network() that contains one IPv6 subnet
402 with an invalid start IP to ensure Neutron assigns it the smallest IP
405 sub_setting = SubnetConfig(
406 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
408 self.network_settings = NetworkConfig(
409 name=self.guid + '-net', subnet_settings=[sub_setting])
411 self.network = neutron_utils.create_network(
412 self.neutron, self.os_creds, self.network_settings)
414 self.assertEqual('1:1::2', self.network.subnets[0].start)
416 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
418 def test_create_network_invalid_end_ip(self):
420 Tests the neutron_utils.create_network() that contains one IPv6 subnet
421 with an invalid end IP to ensure Neutron assigns it the largest IP
424 sub_setting = SubnetConfig(
425 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
427 self.network_settings = NetworkConfig(
428 name=self.guid + '-net', subnet_settings=[sub_setting])
430 self.network = neutron_utils.create_network(
431 self.neutron, self.os_creds, self.network_settings)
433 self.assertEqual('1:1::2', self.network.subnets[0].start)
435 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
437 def test_create_network_with_bad_cidr(self):
439 Tests the neutron_utils.create_network() for a BadRequest when
440 the subnet CIDR is invalid
442 sub_setting = SubnetConfig(
443 name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
444 self.network_settings = NetworkConfig(
445 name=self.guid + '-net', subnet_settings=[sub_setting])
447 with self.assertRaises(BadRequest):
448 self.network = neutron_utils.create_network(
449 self.neutron, self.os_creds, self.network_settings)
451 def test_create_network_invalid_gateway_ip(self):
453 Tests the neutron_utils.create_network() for a BadRequest when
454 the subnet gateway IP is invalid
456 sub_setting = SubnetConfig(
457 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
459 self.network_settings = NetworkConfig(
460 name=self.guid + '-net', subnet_settings=[sub_setting])
462 with self.assertRaises(BadRequest):
463 self.network = neutron_utils.create_network(
464 self.neutron, self.os_creds, self.network_settings)
466 def test_create_network_with_bad_dns(self):
468 Tests the neutron_utils.create_network() for a BadRequest when
469 the DNS IP is invalid
471 sub_setting = SubnetConfig(
472 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
473 dns_nameservers=['foo'])
474 self.network_settings = NetworkConfig(
475 name=self.guid + '-net', subnet_settings=[sub_setting])
477 with self.assertRaises(BadRequest):
478 self.network = neutron_utils.create_network(
479 self.neutron, self.os_creds, self.network_settings)
482 class NeutronUtilsRouterTests(OSComponentTestCase):
484 Test for creating routers via neutron_utils.py
488 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
489 self.port_name = str(guid) + '-port'
490 self.neutron = neutron_utils.neutron_client(self.os_creds)
494 self.interface_router = None
495 self.net_config = openstack_tests.get_pub_net_config(
496 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
497 router_name=guid + '-pub-router', external_net=self.ext_net_name)
501 Cleans the remote OpenStack objects
503 if self.interface_router:
504 neutron_utils.remove_interface_router(
505 self.neutron, self.router, self.network.subnets[0])
509 neutron_utils.delete_router(self.neutron, self.router)
510 validate_router(self.neutron, self.router.name, False)
516 neutron_utils.delete_port(self.neutron, self.port)
521 neutron_utils.delete_network(self.neutron, self.network)
523 def test_create_router_simple(self):
525 Tests the neutron_utils.create_router()
527 self.router = neutron_utils.create_router(
528 self.neutron, self.os_creds, self.net_config.router_settings)
529 validate_router(self.neutron, self.net_config.router_settings.name,
532 def test_create_router_with_public_interface(self):
534 Tests the neutron_utils.create_router() function with a pubic interface
536 subnet_setting = self.net_config.network_settings.subnet_settings[0]
537 self.net_config = openstack_tests.OSNetworkConfig(
538 self.net_config.network_settings.name,
541 self.net_config.router_settings.name,
543 self.router = neutron_utils.create_router(
544 self.neutron, self.os_creds, self.net_config.router_settings)
545 validate_router(self.neutron, self.net_config.router_settings.name,
548 ext_net = neutron_utils.get_network(
549 self.neutron, network_name=self.ext_net_name)
550 self.assertEqual(self.router.external_network_id, ext_net.id)
552 def test_add_interface_router(self):
554 Tests the neutron_utils.add_interface_router() function
556 self.network = neutron_utils.create_network(
557 self.neutron, self.os_creds, self.net_config.network_settings)
558 self.assertEqual(self.net_config.network_settings.name,
560 self.assertTrue(validate_network(
561 self.neutron, self.net_config.network_settings.name, True))
563 subnet_setting = self.net_config.network_settings.subnet_settings[0]
564 self.assertTrue(validate_subnet(
565 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
567 self.router = neutron_utils.create_router(
568 self.neutron, self.os_creds, self.net_config.router_settings)
569 validate_router(self.neutron, self.net_config.router_settings.name,
572 self.interface_router = neutron_utils.add_interface_router(
573 self.neutron, self.router, self.network.subnets[0])
574 validate_interface_router(self.interface_router, self.router,
575 self.network.subnets[0])
577 def test_add_interface_router_null_router(self):
579 Tests the neutron_utils.add_interface_router() function for an
580 Exception when the router value is None
582 self.network = neutron_utils.create_network(
583 self.neutron, self.os_creds, self.net_config.network_settings)
584 self.assertEqual(self.net_config.network_settings.name,
586 self.assertTrue(validate_network(
587 self.neutron, self.net_config.network_settings.name, True))
589 subnet_setting = self.net_config.network_settings.subnet_settings[0]
590 self.assertTrue(validate_subnet(
591 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
593 with self.assertRaises(NeutronException):
594 self.interface_router = neutron_utils.add_interface_router(
595 self.neutron, self.router, self.network.subnets[0])
597 def test_add_interface_router_null_subnet(self):
599 Tests the neutron_utils.add_interface_router() function for an
600 Exception when the subnet value is None
602 self.network = neutron_utils.create_network(
603 self.neutron, self.os_creds, self.net_config.network_settings)
604 self.assertEqual(self.net_config.network_settings.name,
606 self.assertTrue(validate_network(
607 self.neutron, self.net_config.network_settings.name, True))
609 self.router = neutron_utils.create_router(
610 self.neutron, self.os_creds, self.net_config.router_settings)
611 validate_router(self.neutron, self.net_config.router_settings.name,
614 with self.assertRaises(NeutronException):
615 self.interface_router = neutron_utils.add_interface_router(
616 self.neutron, self.router, None)
618 def test_add_interface_router_missing_subnet(self):
620 Tests the neutron_utils.add_interface_router() function for an
621 Exception when the subnet object has been deleted
623 self.network = neutron_utils.create_network(
624 self.neutron, self.os_creds, self.net_config.network_settings)
625 self.assertEqual(self.net_config.network_settings.name,
627 self.assertTrue(validate_network(
628 self.neutron, self.net_config.network_settings.name, True))
630 self.router = neutron_utils.create_router(
631 self.neutron, self.os_creds, self.net_config.router_settings)
632 validate_router(self.neutron, self.net_config.router_settings.name,
635 for subnet in self.network.subnets:
636 neutron_utils.delete_subnet(self.neutron, subnet)
638 with self.assertRaises(NotFound):
639 self.interface_router = neutron_utils.add_interface_router(
640 self.neutron, self.router, self.network.subnets[0])
642 def test_create_port(self):
644 Tests the neutron_utils.create_port() function
646 self.network = neutron_utils.create_network(
647 self.neutron, self.os_creds, self.net_config.network_settings)
648 self.assertEqual(self.net_config.network_settings.name,
650 self.assertTrue(validate_network(
651 self.neutron, self.net_config.network_settings.name, True))
653 subnet_setting = self.net_config.network_settings.subnet_settings[0]
654 self.assertTrue(validate_subnet(
655 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
657 self.port = neutron_utils.create_port(
658 self.neutron, self.os_creds, PortConfig(
661 'subnet_name': subnet_setting.name,
663 network_name=self.net_config.network_settings.name))
664 validate_port(self.neutron, self.port, self.port_name)
666 def test_create_port_empty_name(self):
668 Tests the neutron_utils.create_port() function
670 self.network = neutron_utils.create_network(
671 self.neutron, self.os_creds, self.net_config.network_settings)
672 self.assertEqual(self.net_config.network_settings.name,
674 self.assertTrue(validate_network(
675 self.neutron, self.net_config.network_settings.name, True))
677 subnet_setting = self.net_config.network_settings.subnet_settings[0]
678 self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
679 subnet_setting.cidr, True))
681 self.port = neutron_utils.create_port(
682 self.neutron, self.os_creds, PortConfig(
684 network_name=self.net_config.network_settings.name,
686 'subnet_name': subnet_setting.name,
688 validate_port(self.neutron, self.port, self.port_name)
690 def test_create_port_null_name(self):
692 Tests the neutron_utils.create_port() when the port name value is None
694 self.network = neutron_utils.create_network(
695 self.neutron, self.os_creds, self.net_config.network_settings)
696 self.assertEqual(self.net_config.network_settings.name,
698 self.assertTrue(validate_network(
699 self.neutron, self.net_config.network_settings.name, True))
701 subnet_setting = self.net_config.network_settings.subnet_settings[0]
702 self.assertTrue(validate_subnet(
703 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
705 self.port = neutron_utils.create_port(
706 self.neutron, self.os_creds,
708 network_name=self.net_config.network_settings.name,
710 'subnet_name': subnet_setting.name,
713 port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
714 self.assertEqual(self.port, port)
716 def test_create_port_null_network_object(self):
718 Tests the neutron_utils.create_port() function for an Exception when
719 the network object is None
721 with self.assertRaises(Exception):
722 self.port = neutron_utils.create_port(
723 self.neutron, self.os_creds,
726 network_name=self.net_config.network_settings.name,
729 self.net_config.network_settings.subnet_settings[
733 def test_create_port_null_ip(self):
735 Tests the neutron_utils.create_port() function for an Exception when
738 self.network = neutron_utils.create_network(
739 self.neutron, self.os_creds, self.net_config.network_settings)
740 self.assertEqual(self.net_config.network_settings.name,
742 self.assertTrue(validate_network(
743 self.neutron, self.net_config.network_settings.name, True))
745 subnet_setting = self.net_config.network_settings.subnet_settings[0]
746 self.assertTrue(validate_subnet(
747 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
749 with self.assertRaises(Exception):
750 self.port = neutron_utils.create_port(
751 self.neutron, self.os_creds,
754 network_name=self.net_config.network_settings.name,
756 'subnet_name': subnet_setting.name,
759 def test_create_port_invalid_ip(self):
761 Tests the neutron_utils.create_port() function for an Exception when
764 self.network = neutron_utils.create_network(
765 self.neutron, self.os_creds, self.net_config.network_settings)
766 self.assertEqual(self.net_config.network_settings.name,
768 self.assertTrue(validate_network(
769 self.neutron, self.net_config.network_settings.name, True))
771 subnet_setting = self.net_config.network_settings.subnet_settings[0]
772 self.assertTrue(validate_subnet(
773 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
775 with self.assertRaises(Exception):
776 self.port = neutron_utils.create_port(
777 self.neutron, self.os_creds,
780 network_name=self.net_config.network_settings.name,
782 'subnet_name': subnet_setting.name,
785 def test_create_port_invalid_ip_to_subnet(self):
787 Tests the neutron_utils.create_port() function for an Exception when
790 self.network = neutron_utils.create_network(
791 self.neutron, self.os_creds, self.net_config.network_settings)
792 self.assertEqual(self.net_config.network_settings.name,
794 self.assertTrue(validate_network(
795 self.neutron, self.net_config.network_settings.name, True))
797 subnet_setting = self.net_config.network_settings.subnet_settings[0]
798 self.assertTrue(validate_subnet(
799 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
801 with self.assertRaises(Exception):
802 self.port = neutron_utils.create_port(
803 self.neutron, self.os_creds,
806 network_name=self.net_config.network_settings.name,
808 'subnet_name': subnet_setting.name,
809 'ip': '10.197.123.100'}]))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron/keystone clients and the lists of objects that
        tearDown() deletes.
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: keep deleting the remaining groups
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() function and the
        subsequent lookup and deletion of the created group
        """
        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings)

        # assertEqual, not assertTrue: assertTrue(a, b) treats b as the
        # failure message and never compares the two names
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupConfig constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupConfig()
            self.security_groups.append(
                neutron_utils.create_security_group(
                    self.neutron, self.keystone, sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function for a group
        configured with a description and no rule settings
        """
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() function for a group
        configured with one ingress rule
        """
        sec_grp_rule_settings = SecurityGroupRuleConfig(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        # Track the rules already attached to the new group so that
        # tearDown() removes them and the comparison below includes them
        free_rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0])
        for free_rule in free_rules:
            self.security_group_rules.append(free_rule)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        # assertEqual, not assertTrue: assertTrue(a, b) treats b as the
        # failure message and never compares the two names
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function by
        creating two groups and looking each one up by its ID
        """
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-1', description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-2', description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
958 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
960 Test basic nova keypair functionality
965 Instantiates the CreateImage object that is responsible for downloading
966 and creating an OS image file within OpenStack
968 self.neutron = neutron_utils.neutron_client(self.os_creds)
969 self.floating_ip = None
973 Cleans the image and downloaded image file
977 neutron_utils.delete_floating_ip(
978 self.neutron, self.floating_ip)
982 def test_floating_ips(self):
984 Tests the creation of a floating IP
987 initial_fips = neutron_utils.get_floating_ips(self.neutron)
989 self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
991 all_fips = neutron_utils.get_floating_ips(self.neutron)
992 self.assertEqual(len(initial_fips) + 1, len(all_fips))
993 returned = neutron_utils.get_floating_ip(self.neutron,
995 self.assertEqual(self.floating_ip.id, returned.id)
996 self.assertEqual(self.floating_ip.ip, returned.ip)
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches
    the expectation expressed by the 'exists' parameter.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not a network with that name should exist
    :return: True when the network is found and 'exists' is true, or when it
             is not found and 'exists' is false; False otherwise
    """
    found = neutron_utils.get_network(neutron, network_name=name)
    return bool(found) == bool(exists)
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name matches the
    expectation expressed by the 'exists' parameter and, when the subnet is
    expected, that its CIDR matches.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not a subnet with that name should exist
    :return: when 'exists' is false, True only if no subnet was found; when
             'exists' is true, True only if a subnet with that name was found
             and its CIDR equals 'cidr'
    """
    found = neutron_utils.get_subnet(neutron, subnet_name=name)

    if not exists:
        return not found

    if found and found.name == name:
        return found.cidr == cidr
    return False
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router with the given name matches the
    expectation expressed by the 'exists' parameter.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not a router with that name should exist
    :return: True when the router is found and 'exists' is true, or when it
             is not found and 'exists' is false; False otherwise
    """
    router = neutron_utils.get_router(neutron, router_name=name)
    if exists and router:
        return True
    # Mirrors validate_network(): an absent router satisfies exists=False
    if not exists and not router:
        return True
    return False
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into
    the interface router object
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    # NOTE(review): the router ID is read from 'id'; 'port_id' names the port
    # created for the interface and would never equal router.id. Assumes the
    # InterfaceRouter domain object exposes 'id' - confirm against its class.
    subnet_id = interface_router.subnet_id
    router_id = interface_router.id

    return subnet.id == subnet_id and router.id == router_id
def validate_port(neutron, port_obj, this_port_name):
    """
    Returns True when a port with the same ID as port_obj is returned by
    neutron.list_ports() and that port carries the expected name.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (only its 'id' is read)
    :param this_port_name: The expected port name
    :return: True if the port is found and its name matches; False if the
             name differs or no listed port has that ID
    """
    # list_ports() returns a dict whose values are lists of port dicts; the
    # keys are not needed for the lookup
    for port_insts in neutron.list_ports().values():
        for port_inst in port_insts:
            if port_inst['id'] == port_obj.id:
                return port_inst['name'] == this_port_name
    return False