1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from neutronclient.common.exceptions import NotFound, BadRequest
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21 SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests verifying that the neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Ensures a client built from the configured credentials can list
        networks and that the configured external network is among them.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listed = client.list_networks().get('networks')

        # Passes as soon as one listed network carries the external name
        self.assertTrue(any(
            net.get('name') == self.ext_net_name for net in listed))

    def test_neutron_connect_fail(self):
        """
        Ensures that a client built from bogus credentials cannot be used to
        query the cloud.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Credential construction, client creation and the query all stay
        # inside the context manager: any one of them raising satisfies it
        with self.assertRaises(Exception):
            bogus_creds = OSCreds(
                username='user', password='pass', auth_url='url',
                project_name='project')
            neutron_utils.neutron_client(bogus_creds).list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Ensures that neutron_utils.get_external_networks() returns a network
        whose name equals the configured self.ext_net_name.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        ext_nets = neutron_utils.get_external_networks(client)

        self.assertTrue(any(
            ext_net.name == self.ext_net_name for ext_net in ext_nets))
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
85 Test for creating networks via neutron_utils.py
89 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90 self.port_name = str(guid) + '-port'
91 self.neutron = neutron_utils.neutron_client(self.os_creds)
93 self.net_config = openstack_tests.get_pub_net_config(
94 net_name=guid + '-pub-net')
98 Cleans the remote OpenStack objects
101 neutron_utils.delete_network(self.neutron, self.network)
103 def test_create_network(self):
105 Tests the neutron_utils.create_network() function
107 self.network = neutron_utils.create_network(
108 self.neutron, self.os_creds, self.net_config.network_settings)
109 self.assertEqual(self.net_config.network_settings.name,
111 self.assertTrue(validate_network(
112 self.neutron, self.net_config.network_settings.name, True,
114 self.assertEqual(len(self.net_config.network_settings.subnet_settings),
115 len(self.network.subnets))
117 def test_create_network_empty_name(self):
119 Tests the neutron_utils.create_network() function with an empty
122 with self.assertRaises(Exception):
123 self.network = neutron_utils.create_network(
124 self.neutron, self.os_creds,
125 network_settings=NetworkConfig(name=''))
127 def test_create_network_null_name(self):
129 Tests the neutron_utils.create_network() function when the network
132 with self.assertRaises(Exception):
133 self.network = neutron_utils.create_network(
134 self.neutron, self.os_creds,
135 network_settings=NetworkConfig())
138 class NeutronUtilsSubnetTests(OSComponentTestCase):
140 Test for creating networks with subnets via neutron_utils.py
144 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
145 self.port_name = str(guid) + '-port'
146 self.neutron = neutron_utils.neutron_client(self.os_creds)
148 self.net_config = openstack_tests.get_pub_net_config(
149 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
150 external_net=self.ext_net_name)
154 Cleans the remote OpenStack objects
158 neutron_utils.delete_network(self.neutron, self.network)
162 def test_create_subnet(self):
164 Tests the neutron_utils.create_network() function
166 self.network = neutron_utils.create_network(
167 self.neutron, self.os_creds, self.net_config.network_settings)
168 self.assertEqual(self.net_config.network_settings.name,
170 self.assertTrue(validate_network(
171 self.neutron, self.net_config.network_settings.name, True,
174 subnet_setting = self.net_config.network_settings.subnet_settings[0]
175 self.assertTrue(validate_subnet(
176 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
178 subnet_query1 = neutron_utils.get_subnet(
179 self.neutron, subnet_name=subnet_setting.name)
180 self.assertEqual(self.network.subnets[0], subnet_query1)
182 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
184 self.assertIsNotNone(subnet_query2)
185 self.assertEqual(1, len(subnet_query2))
186 self.assertEqual(self.network.subnets[0], subnet_query2[0])
188 def test_create_subnet_null_name(self):
190 Tests the neutron_utils.create_neutron_subnet() function for an
191 Exception when the subnet name is None
193 self.network = neutron_utils.create_network(
194 self.neutron, self.os_creds, self.net_config.network_settings)
195 self.assertEqual(self.net_config.network_settings.name,
197 self.assertTrue(validate_network(
198 self.neutron, self.net_config.network_settings.name, True,
201 with self.assertRaises(Exception):
202 SubnetConfig(cidr=self.net_config.subnet_cidr)
204 def test_create_subnet_empty_name(self):
206 Tests the neutron_utils.create_network() function with an empty
209 self.network = neutron_utils.create_network(
210 self.neutron, self.os_creds, self.net_config.network_settings)
211 self.assertEqual(self.net_config.network_settings.name,
213 self.assertTrue(validate_network(
214 self.neutron, self.net_config.network_settings.name, True,
217 subnet_setting = self.net_config.network_settings.subnet_settings[0]
218 self.assertTrue(validate_subnet(
219 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
220 self.assertFalse(validate_subnet(
221 self.neutron, '', subnet_setting.cidr, True))
223 subnet_query1 = neutron_utils.get_subnet(
224 self.neutron, subnet_name=subnet_setting.name)
225 self.assertEqual(self.network.subnets[0], subnet_query1)
227 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
229 self.assertIsNotNone(subnet_query2)
230 self.assertEqual(1, len(subnet_query2))
231 self.assertEqual(self.network.subnets[0], subnet_query2[0])
233 def test_create_subnet_null_cidr(self):
235 Tests the neutron_utils.create_neutron_subnet() function for an
236 Exception when the subnet CIDR value is None
238 self.net_config.network_settings.subnet_settings[0].cidr = None
239 with self.assertRaises(Exception):
240 self.network = neutron_utils.create_network(
241 self.neutron, self.os_creds, self.net_config.network_settings)
243 def test_create_subnet_empty_cidr(self):
245 Tests the neutron_utils.create_neutron_subnet() function for an
246 Exception when the subnet CIDR value is empty
248 self.net_config.network_settings.subnet_settings[0].cidr = ''
249 with self.assertRaises(Exception):
250 self.network = neutron_utils.create_network(
251 self.neutron, self.os_creds, self.net_config.network_settings)
254 class NeutronUtilsIPv6Tests(OSComponentTestCase):
256 Test for creating IPv6 networks with subnets via neutron_utils.py
260 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
261 self.neutron = neutron_utils.neutron_client(self.os_creds)
266 Cleans the remote OpenStack objects
270 neutron_utils.delete_network(self.neutron, self.network)
274 def test_create_network_slaac(self):
276 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
277 is True and IPv6 modes are slaac
279 sub_setting = SubnetConfig(
280 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
281 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
282 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
283 enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
284 self.network_settings = NetworkConfig(
285 name=self.guid + '-net', subnet_settings=[sub_setting])
287 self.network = neutron_utils.create_network(
288 self.neutron, self.os_creds, self.network_settings)
289 self.assertEqual(self.network_settings.name, self.network.name)
291 subnet_settings = self.network_settings.subnet_settings[0]
292 self.assertEqual(1, len(self.network.subnets))
293 subnet = self.network.subnets[0]
295 self.assertEqual(self.network.id, subnet.network_id)
296 self.assertEqual(subnet_settings.name, subnet.name)
297 self.assertEqual(subnet_settings.start, subnet.start)
298 self.assertEqual(subnet_settings.end, subnet.end)
299 self.assertEqual('1:1::/64', subnet.cidr)
300 self.assertEqual(6, subnet.ip_version)
301 self.assertEqual(1, len(subnet.dns_nameservers))
303 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
304 self.assertTrue(subnet.enable_dhcp)
306 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
308 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
310 def test_create_network_stateful(self):
312 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
313 is True and IPv6 modes are stateful
315 sub_setting = SubnetConfig(
316 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
317 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
318 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
319 enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
320 ipv6_address_mode='dhcpv6-stateful')
321 self.network_settings = NetworkConfig(
322 name=self.guid + '-net', subnet_settings=[sub_setting])
324 self.network = neutron_utils.create_network(
325 self.neutron, self.os_creds, self.network_settings)
327 self.assertEqual(self.network_settings.name, self.network.name)
329 subnet_settings = self.network_settings.subnet_settings[0]
330 self.assertEqual(1, len(self.network.subnets))
331 subnet = self.network.subnets[0]
333 self.assertEqual(self.network.id, subnet.network_id)
334 self.assertEqual(subnet_settings.name, subnet.name)
335 self.assertEqual(subnet_settings.start, subnet.start)
336 self.assertEqual(subnet_settings.end, subnet.end)
337 self.assertEqual('1:1::/64', subnet.cidr)
338 self.assertEqual(6, subnet.ip_version)
339 self.assertEqual(1, len(subnet.dns_nameservers))
341 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
342 self.assertTrue(subnet.enable_dhcp)
344 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
346 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
def test_create_network_stateless(self):
    """
    Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
    is True and the RA and address modes are both 'dhcpv6-stateless'
    """
    sub_setting = SubnetConfig(
        name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
        ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
        gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
        enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
        ipv6_address_mode='dhcpv6-stateless')
    self.network_settings = NetworkConfig(
        name=self.guid + '-net', subnet_settings=[sub_setting])

    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, self.network_settings)

    self.assertEqual(self.network_settings.name, self.network.name)

    subnet_settings = self.network_settings.subnet_settings[0]
    self.assertEqual(1, len(self.network.subnets))
    subnet = self.network.subnets[0]

    # The created subnet must mirror what was requested; the CIDR is
    # compared against the compressed form of the configured value
    self.assertEqual(self.network.id, subnet.network_id)
    self.assertEqual(subnet_settings.name, subnet.name)
    self.assertEqual(subnet_settings.start, subnet.start)
    self.assertEqual(subnet_settings.end, subnet.end)
    self.assertEqual('1:1::/64', subnet.cidr)
    self.assertEqual(6, subnet.ip_version)
    self.assertEqual(1, len(subnet.dns_nameservers))
    self.assertEqual(
        sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
    self.assertTrue(subnet.enable_dhcp)
    self.assertEqual(
        subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
    self.assertEqual(
        subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
386 def test_create_network_no_dhcp_slaac(self):
388 Tests the neutron_utils.create_network() for a BadRequest when
389 DHCP is not enabled and the RA and address modes are both 'slaac'
391 sub_setting = SubnetConfig(
392 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
393 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
394 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
395 enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
396 self.network_settings = NetworkConfig(
397 name=self.guid + '-net', subnet_settings=[sub_setting])
399 with self.assertRaises(BadRequest):
400 self.network = neutron_utils.create_network(
401 self.neutron, self.os_creds, self.network_settings)
403 def test_create_network_invalid_start_ip(self):
405 Tests the neutron_utils.create_network() that contains one IPv6 subnet
406 with an invalid start IP to ensure Neutron assigns it the smallest IP
409 sub_setting = SubnetConfig(
410 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
412 self.network_settings = NetworkConfig(
413 name=self.guid + '-net', subnet_settings=[sub_setting])
415 self.network = neutron_utils.create_network(
416 self.neutron, self.os_creds, self.network_settings)
418 self.assertEqual('1:1::2', self.network.subnets[0].start)
420 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
422 def test_create_network_invalid_end_ip(self):
424 Tests the neutron_utils.create_network() that contains one IPv6 subnet
425 with an invalid end IP to ensure Neutron assigns it the largest IP
428 sub_setting = SubnetConfig(
429 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
431 self.network_settings = NetworkConfig(
432 name=self.guid + '-net', subnet_settings=[sub_setting])
434 self.network = neutron_utils.create_network(
435 self.neutron, self.os_creds, self.network_settings)
437 self.assertEqual('1:1::2', self.network.subnets[0].start)
439 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
441 def test_create_network_with_bad_cidr(self):
443 Tests the neutron_utils.create_network() for a BadRequest when
444 the subnet CIDR is invalid
446 sub_setting = SubnetConfig(
447 name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
448 self.network_settings = NetworkConfig(
449 name=self.guid + '-net', subnet_settings=[sub_setting])
451 with self.assertRaises(BadRequest):
452 self.network = neutron_utils.create_network(
453 self.neutron, self.os_creds, self.network_settings)
455 def test_create_network_invalid_gateway_ip(self):
457 Tests the neutron_utils.create_network() for a BadRequest when
458 the subnet gateway IP is invalid
460 sub_setting = SubnetConfig(
461 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
462 gateway_ip='192.168.0.1')
463 self.network_settings = NetworkConfig(
464 name=self.guid + '-net', subnet_settings=[sub_setting])
466 with self.assertRaises(BadRequest):
467 self.network = neutron_utils.create_network(
468 self.neutron, self.os_creds, self.network_settings)
470 def test_create_network_with_bad_dns(self):
472 Tests the neutron_utils.create_network() for a BadRequest when
473 the DNS IP is invalid
475 sub_setting = SubnetConfig(
476 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
477 dns_nameservers=['foo'])
478 self.network_settings = NetworkConfig(
479 name=self.guid + '-net', subnet_settings=[sub_setting])
481 with self.assertRaises(BadRequest):
482 self.network = neutron_utils.create_network(
483 self.neutron, self.os_creds, self.network_settings)
486 class NeutronUtilsRouterTests(OSComponentTestCase):
488 Test for creating routers via neutron_utils.py
492 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
493 self.port_name = str(guid) + '-port'
494 self.neutron = neutron_utils.neutron_client(self.os_creds)
498 self.interface_router = None
499 self.net_config = openstack_tests.get_pub_net_config(
500 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
501 router_name=guid + '-pub-router', external_net=self.ext_net_name)
505 Cleans the remote OpenStack objects
507 if self.interface_router:
508 neutron_utils.remove_interface_router(
509 self.neutron, self.router, self.network.subnets[0])
513 neutron_utils.delete_router(self.neutron, self.router)
514 validate_router(self.neutron, self.router.name, False)
520 neutron_utils.delete_port(self.neutron, self.port)
525 neutron_utils.delete_network(self.neutron, self.network)
527 def test_create_router_simple(self):
529 Tests the neutron_utils.create_router()
531 self.router = neutron_utils.create_router(
532 self.neutron, self.os_creds, self.net_config.router_settings,
534 validate_router(self.neutron, self.net_config.router_settings.name,
537 def test_create_router_with_public_interface(self):
539 Tests the neutron_utils.create_router() function with a pubic interface
541 subnet_setting = self.net_config.network_settings.subnet_settings[0]
542 self.net_config = openstack_tests.OSNetworkConfig(
543 self.net_config.network_settings.name,
546 self.net_config.router_settings.name,
548 self.router = neutron_utils.create_router(
549 self.neutron, self.os_creds, self.net_config.router_settings,
551 validate_router(self.neutron, self.net_config.router_settings.name,
554 ext_net = neutron_utils.get_network(
555 self.neutron, network_name=self.ext_net_name)
556 self.assertEqual(self.router.external_network_id, ext_net.id)
558 def test_add_interface_router(self):
560 Tests the neutron_utils.add_interface_router() function
562 self.network = neutron_utils.create_network(
563 self.neutron, self.os_creds, self.net_config.network_settings)
564 self.assertEqual(self.net_config.network_settings.name,
566 self.assertTrue(validate_network(
567 self.neutron, self.net_config.network_settings.name, True,
570 subnet_setting = self.net_config.network_settings.subnet_settings[0]
571 self.assertTrue(validate_subnet(
572 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
574 self.router = neutron_utils.create_router(
575 self.neutron, self.os_creds, self.net_config.router_settings,
577 validate_router(self.neutron, self.net_config.router_settings.name,
580 self.interface_router = neutron_utils.add_interface_router(
581 self.neutron, self.router, self.network.subnets[0])
582 validate_interface_router(self.interface_router, self.router,
583 self.network.subnets[0])
585 def test_add_interface_router_null_router(self):
587 Tests the neutron_utils.add_interface_router() function for an
588 Exception when the router value is None
590 self.network = neutron_utils.create_network(
591 self.neutron, self.os_creds, self.net_config.network_settings)
592 self.assertEqual(self.net_config.network_settings.name,
594 self.assertTrue(validate_network(
595 self.neutron, self.net_config.network_settings.name, True,
598 subnet_setting = self.net_config.network_settings.subnet_settings[0]
599 self.assertTrue(validate_subnet(
600 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
602 with self.assertRaises(NeutronException):
603 self.interface_router = neutron_utils.add_interface_router(
604 self.neutron, self.router, self.network.subnets[0])
606 def test_add_interface_router_null_subnet(self):
608 Tests the neutron_utils.add_interface_router() function for an
609 Exception when the subnet value is None
611 self.network = neutron_utils.create_network(
612 self.neutron, self.os_creds, self.net_config.network_settings)
613 self.assertEqual(self.net_config.network_settings.name,
615 self.assertTrue(validate_network(
616 self.neutron, self.net_config.network_settings.name, True,
619 self.router = neutron_utils.create_router(
620 self.neutron, self.os_creds, self.net_config.router_settings,
622 validate_router(self.neutron, self.net_config.router_settings.name,
625 with self.assertRaises(NeutronException):
626 self.interface_router = neutron_utils.add_interface_router(
627 self.neutron, self.router, None)
629 def test_add_interface_router_missing_subnet(self):
631 Tests the neutron_utils.add_interface_router() function for an
632 Exception when the subnet object has been deleted
634 self.network = neutron_utils.create_network(
635 self.neutron, self.os_creds, self.net_config.network_settings)
636 self.assertEqual(self.net_config.network_settings.name,
638 self.assertTrue(validate_network(
639 self.neutron, self.net_config.network_settings.name, True,
642 self.router = neutron_utils.create_router(
643 self.neutron, self.os_creds, self.net_config.router_settings,
645 validate_router(self.neutron, self.net_config.router_settings.name,
648 for subnet in self.network.subnets:
649 neutron_utils.delete_subnet(self.neutron, subnet)
651 with self.assertRaises(NotFound):
652 self.interface_router = neutron_utils.add_interface_router(
653 self.neutron, self.router, self.network.subnets[0])
655 def test_create_port(self):
657 Tests the neutron_utils.create_port() function
659 self.network = neutron_utils.create_network(
660 self.neutron, self.os_creds, self.net_config.network_settings)
661 self.assertEqual(self.net_config.network_settings.name,
663 self.assertTrue(validate_network(
664 self.neutron, self.net_config.network_settings.name, True,
667 subnet_setting = self.net_config.network_settings.subnet_settings[0]
668 self.assertTrue(validate_subnet(
669 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
671 self.port = neutron_utils.create_port(
672 self.neutron, self.os_creds, PortConfig(
675 'subnet_name': subnet_setting.name,
677 network_name=self.net_config.network_settings.name))
678 validate_port(self.neutron, self.port, self.port_name)
680 def test_create_port_empty_name(self):
682 Tests the neutron_utils.create_port() function
684 self.network = neutron_utils.create_network(
685 self.neutron, self.os_creds, self.net_config.network_settings)
686 self.assertEqual(self.net_config.network_settings.name,
688 self.assertTrue(validate_network(
689 self.neutron, self.net_config.network_settings.name, True,
692 subnet_setting = self.net_config.network_settings.subnet_settings[0]
693 self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
694 subnet_setting.cidr, True))
696 self.port = neutron_utils.create_port(
697 self.neutron, self.os_creds, PortConfig(
699 network_name=self.net_config.network_settings.name,
701 'subnet_name': subnet_setting.name,
703 validate_port(self.neutron, self.port, self.port_name)
705 def test_create_port_null_name(self):
707 Tests the neutron_utils.create_port() when the port name value is None
709 self.network = neutron_utils.create_network(
710 self.neutron, self.os_creds, self.net_config.network_settings)
711 self.assertEqual(self.net_config.network_settings.name,
713 self.assertTrue(validate_network(
714 self.neutron, self.net_config.network_settings.name, True,
717 subnet_setting = self.net_config.network_settings.subnet_settings[0]
718 self.assertTrue(validate_subnet(
719 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
721 self.port = neutron_utils.create_port(
722 self.neutron, self.os_creds,
724 network_name=self.net_config.network_settings.name,
726 'subnet_name': subnet_setting.name,
729 port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
730 self.assertEqual(self.port, port)
732 def test_create_port_null_network_object(self):
734 Tests the neutron_utils.create_port() function for an Exception when
735 the network object is None
737 with self.assertRaises(Exception):
738 self.port = neutron_utils.create_port(
739 self.neutron, self.os_creds,
742 network_name=self.net_config.network_settings.name,
745 self.net_config.network_settings.subnet_settings[
749 def test_create_port_null_ip(self):
751 Tests the neutron_utils.create_port() function for an Exception when
754 self.network = neutron_utils.create_network(
755 self.neutron, self.os_creds, self.net_config.network_settings)
756 self.assertEqual(self.net_config.network_settings.name,
758 self.assertTrue(validate_network(
759 self.neutron, self.net_config.network_settings.name, True,
762 subnet_setting = self.net_config.network_settings.subnet_settings[0]
763 self.assertTrue(validate_subnet(
764 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
766 with self.assertRaises(Exception):
767 self.port = neutron_utils.create_port(
768 self.neutron, self.os_creds,
771 network_name=self.net_config.network_settings.name,
773 'subnet_name': subnet_setting.name,
776 def test_create_port_invalid_ip(self):
778 Tests the neutron_utils.create_port() function for an Exception when
781 self.network = neutron_utils.create_network(
782 self.neutron, self.os_creds, self.net_config.network_settings)
783 self.assertEqual(self.net_config.network_settings.name,
785 self.assertTrue(validate_network(
786 self.neutron, self.net_config.network_settings.name, True,
789 subnet_setting = self.net_config.network_settings.subnet_settings[0]
790 self.assertTrue(validate_subnet(
791 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
793 with self.assertRaises(Exception):
794 self.port = neutron_utils.create_port(
795 self.neutron, self.os_creds,
798 network_name=self.net_config.network_settings.name,
800 'subnet_name': subnet_setting.name,
803 def test_create_port_invalid_ip_to_subnet(self):
805 Tests the neutron_utils.create_port() function for an Exception when
808 self.network = neutron_utils.create_network(
809 self.neutron, self.os_creds, self.net_config.network_settings)
810 self.assertEqual(self.net_config.network_settings.name,
812 self.assertTrue(validate_network(
813 self.neutron, self.net_config.network_settings.name, True,
816 subnet_setting = self.net_config.network_settings.subnet_settings[0]
817 self.assertTrue(validate_subnet(
818 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
820 with self.assertRaises(Exception):
821 self.port = neutron_utils.create_port(
822 self.neutron, self.os_creds,
825 network_name=self.net_config.network_settings.name,
827 'subnet_name': subnet_setting.name,
828 'ip': '10.197.123.100'}]))
831 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
833 Test for creating security groups via neutron_utils.py
837 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
838 self.sec_grp_name = guid + 'name'
840 self.security_groups = list()
841 self.security_group_rules = list()
842 self.neutron = neutron_utils.neutron_client(self.os_creds)
843 self.keystone = keystone_utils.keystone_client(self.os_creds)
847 Cleans the remote OpenStack objects
849 for rule in self.security_group_rules:
850 neutron_utils.delete_security_group_rule(self.neutron, rule)
852 for security_group in self.security_groups:
854 neutron_utils.delete_security_group(self.neutron,
def test_create_delete_simple_sec_grp(self):
    """
    Tests that neutron_utils.create_security_group() creates a group that
    can be retrieved by its settings, and that
    neutron_utils.delete_security_group() removes it again.
    """
    sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
    security_group = neutron_utils.create_security_group(
        self.neutron, self.keystone, sec_grp_settings, self.project_id)

    # assertEqual, not assertTrue: assertTrue(a, b) only checks that 'a' is
    # truthy and uses 'b' as the failure message, so it never compared names
    self.assertEqual(sec_grp_settings.name, security_group.name)

    sec_grp_get = neutron_utils.get_security_group(
        self.neutron, sec_grp_settings=sec_grp_settings)
    self.assertIsNotNone(sec_grp_get)
    self.assertTrue(validation_utils.objects_equivalent(
        security_group, sec_grp_get))

    # After deletion the same lookup must come back empty
    neutron_utils.delete_security_group(self.neutron, security_group)
    sec_grp_get = neutron_utils.get_security_group(
        self.neutron, sec_grp_settings=sec_grp_settings)
    self.assertIsNone(sec_grp_get)
880 def test_create_sec_grp_no_name(self):
882 Tests the SecurityGroupConfig constructor and
883 neutron_utils.create_security_group() function to ensure that
884 attempting to create a security group without a name will raise an
887 with self.assertRaises(Exception):
888 sec_grp_settings = SecurityGroupConfig()
889 self.security_groups.append(
890 neutron_utils.create_security_group(
891 self.neutron, self.keystone, sec_grp_settings,
894 def test_create_sec_grp_no_rules(self):
896 Tests the neutron_utils.create_security_group() function
898 sec_grp_settings = SecurityGroupConfig(
899 name=self.sec_grp_name, description='hello group')
900 self.security_groups.append(
901 neutron_utils.create_security_group(
902 self.neutron, self.keystone, sec_grp_settings,
905 self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
906 self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
908 sec_grp_get = neutron_utils.get_security_group(
909 self.neutron, sec_grp_settings=sec_grp_settings)
910 self.assertIsNotNone(sec_grp_get)
911 self.assertEqual(self.security_groups[0], sec_grp_get)
913 def test_create_sec_grp_one_rule(self):
915 Tests the neutron_utils.create_security_group() function
918 sec_grp_rule_settings = SecurityGroupRuleConfig(
919 sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
920 sec_grp_settings = SecurityGroupConfig(
921 name=self.sec_grp_name, description='hello group',
922 rule_settings=[sec_grp_rule_settings])
924 self.security_groups.append(
925 neutron_utils.create_security_group(
926 self.neutron, self.keystone, sec_grp_settings,
928 free_rules = neutron_utils.get_rules_by_security_group(
929 self.neutron, self.security_groups[0])
930 for free_rule in free_rules:
931 self.security_group_rules.append(free_rule)
933 keystone = keystone_utils.keystone_client(self.os_creds)
934 project_id = keystone_utils.get_project(
935 keystone, self.os_creds.project_name)
936 self.security_group_rules.append(
937 neutron_utils.create_security_group_rule(
938 self.neutron, sec_grp_settings.rule_settings[0], project_id))
940 # Refresh object so it is populated with the newly added rule
941 security_group = neutron_utils.get_security_group(
942 self.neutron, sec_grp_settings=sec_grp_settings)
944 rules = neutron_utils.get_rules_by_security_group(self.neutron,
948 validation_utils.objects_equivalent(
949 self.security_group_rules, rules))
951 self.assertTrue(sec_grp_settings.name, security_group.name)
953 sec_grp_get = neutron_utils.get_security_group(
954 self.neutron, sec_grp_settings=sec_grp_settings)
955 self.assertIsNotNone(sec_grp_get)
956 self.assertEqual(security_group, sec_grp_get)
958 def test_get_sec_grp_by_id(self):
960 Tests the neutron_utils.create_security_group() function
963 self.security_groups.append(neutron_utils.create_security_group(
964 self.neutron, self.keystone,
966 name=self.sec_grp_name + '-1', description='hello group'),
968 self.security_groups.append(neutron_utils.create_security_group(
969 self.neutron, self.keystone,
971 name=self.sec_grp_name + '-2', description='hello group'),
974 sec_grp_1b = neutron_utils.get_security_group_by_id(
975 self.neutron, self.security_groups[0].id)
976 sec_grp_2b = neutron_utils.get_security_group_by_id(
977 self.neutron, self.security_groups[1].id)
979 self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
980 self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
983 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
985 Test basic nova keypair functionality
990 Instantiates the CreateImage object that is responsible for downloading
991 and creating an OS image file within OpenStack
993 self.neutron = neutron_utils.neutron_client(self.os_creds)
994 self.floating_ip = None
998 Cleans the image and downloaded image file
1000 if self.floating_ip:
1002 neutron_utils.delete_floating_ip(
1003 self.neutron, self.floating_ip)
1007 def test_floating_ips(self):
1009 Tests the creation of a floating IP
1012 initial_fips = neutron_utils.get_floating_ips(self.neutron)
1014 self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
1016 all_fips = neutron_utils.get_floating_ips(self.neutron)
1017 self.assertEqual(len(initial_fips) + 1, len(all_fips))
1018 returned = neutron_utils.get_floating_ip(self.neutron,
1020 self.assertEqual(self.floating_ip.id, returned.id)
1021 self.assertEqual(self.floating_ip.ip, returned.ip)
def validate_network(neutron, name, exists, project_id):
    """
    Checks whether the presence of a network matches the expectation.
    :param neutron: The neutron client
    :param name: The name of the network to look up
    :param exists: truthy when the network is expected to exist, falsy when
                   it is expected to be absent
    :param project_id: the project ID passed to the network lookup
    :return: True when a network was found and exists is truthy, or when no
             network was found and exists is falsy; otherwise False
    """
    found = neutron_utils.get_network(
        neutron, network_name=name, project_id=project_id)

    # The expectation holds exactly when both sides agree on presence
    return bool(exists) == bool(found)
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence and CIDR of a subnet match the expectation.
    :param neutron: The neutron client
    :param name: The name of the subnet to look up
    :param cidr: The CIDR the subnet is expected to have
    :param exists: truthy when the subnet is expected to exist, falsy when
                   it is expected to be absent
    :return: when exists is truthy, whether a subnet with this exact name was
             found and its CIDR equals cidr; when exists is falsy, whether no
             subnet was found
    """
    found = neutron_utils.get_subnet(neutron, subnet_name=name)

    if not exists:
        # Expected absent: succeed only when the lookup came back empty
        return not found

    if found and found.name == name:
        return found.cidr == cidr
    return False
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router matches the expectation.
    :param neutron: The neutron client
    :param name: The name of the router to look up
    :param exists: truthy when the router is expected to exist, falsy when it
                   is expected to be absent
    :return: True when a router was found and exists is truthy, or when no
             router was found and exists is falsy; otherwise False
    """
    router = neutron_utils.get_router(neutron, router_name=name)

    # Also succeed for "expected absent and absent", as validate_network()
    # and validate_subnet() do; previously that case returned False
    return bool(exists) == bool(router)
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into
    the interface router object
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    # Compare against the interface's 'id', not 'port_id': port_id names the
    # port created for the attachment, so matching it to router.id can never
    # succeed.
    # NOTE(review): assumes the InterfaceRouter domain object exposes the
    # router ID as 'id', as the Neutron add-interface response does - confirm
    return (subnet.id == interface_router.subnet_id
            and router.id == interface_router.id)
def validate_port(neutron, port_obj, this_port_name):
    """
    Checks that the port listed by Neutron under port_obj's ID carries the
    expected name.
    :param neutron: The neutron client
    :param port_obj: The port object to look up (matched by its 'id')
    :param this_port_name: The expected port name
    :return: for the first listed port whose ID equals port_obj.id, whether
             its name equals this_port_name; False when no listed port has
             that ID
    """
    # list_ports() returns a dict; only its values (the port lists) matter
    for port_insts in neutron.list_ports().values():
        for port_inst in port_insts:
            if port_inst['id'] == port_obj.id:
                return port_inst['name'] == this_port_name
    return False