1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from neutronclient.common.exceptions import NotFound, BadRequest
19 from snaps.openstack import create_router
20 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
22 from snaps.openstack.create_security_group import SecurityGroupSettings, \
23 SecurityGroupRuleSettings, Direction
24 from snaps.openstack.tests import openstack_tests
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
27 from snaps.openstack.utils import keystone_utils
28 from snaps.openstack.utils import neutron_utils
29 from snaps.openstack.utils.neutron_utils import NeutronException
31 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and expects the configured
        external network name to show up in the network listing.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listing = client.list_networks()

        # The listing is a dict; the network records live under 'networks'
        names = [net.get('name') for net in listing.get('networks')]
        self.assertTrue(self.ext_net_name in names)

    def test_neutron_connect_fail(self):
        """
        Expects an Exception when listing networks with bogus credentials.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Credential construction stays inside the context manager so that a
        # failure at any step counts as the expected error.
        with self.assertRaises(Exception):
            bad_creds = OSCreds(username='user', password='pass',
                                auth_url='url', project_name='project')
            client = neutron_utils.neutron_client(bad_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Expects neutron_utils.get_external_networks() to return a network
        whose name equals the configured self.ext_net_name.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        external = neutron_utils.get_external_networks(client)

        # any() stops at the first match, like the explicit break it replaces
        self.assertTrue(
            any(net.name == self.ext_net_name for net in external))
85 class NeutronUtilsNetworkTests(OSComponentTestCase):
87 Test for creating networks via neutron_utils.py
91 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
92 self.port_name = str(guid) + '-port'
93 self.neutron = neutron_utils.neutron_client(self.os_creds)
95 self.net_config = openstack_tests.get_pub_net_config(
96 net_name=guid + '-pub-net')
100 Cleans the remote OpenStack objects
103 neutron_utils.delete_network(self.neutron, self.network)
def test_create_network(self):
    """
    Creates a network with neutron_utils.create_network() and checks its
    name, that it can be found in neutron, and its number of subnets
    """
    settings = self.net_config.network_settings
    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, settings)

    self.assertEqual(settings.name, self.network.name)
    self.assertTrue(validate_network(self.neutron, settings.name, True))
    self.assertEqual(len(settings.subnet_settings),
                     len(self.network.subnets))
def test_create_network_empty_name(self):
    """
    Expects an Exception when a network is requested with an empty
    string as its name
    """
    with self.assertRaises(Exception):
        # Built inside the context so a constructor failure also counts
        blank_named = NetworkSettings(name='')
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, network_settings=blank_named)
def test_create_network_null_name(self):
    """
    Expects an Exception when a network is requested from NetworkSettings
    that were built without any name
    """
    with self.assertRaises(Exception):
        # Built inside the context so a constructor failure also counts
        nameless = NetworkSettings()
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, network_settings=nameless)
139 class NeutronUtilsSubnetTests(OSComponentTestCase):
141 Test for creating networks with subnets via neutron_utils.py
145 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
146 self.port_name = str(guid) + '-port'
147 self.neutron = neutron_utils.neutron_client(self.os_creds)
149 self.net_config = openstack_tests.get_pub_net_config(
150 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
151 external_net=self.ext_net_name)
155 Cleans the remote OpenStack objects
159 neutron_utils.delete_network(self.neutron, self.network)
163 def test_create_subnet(self):
165 Tests the neutron_utils.create_network() function
167 self.network = neutron_utils.create_network(
168 self.neutron, self.os_creds, self.net_config.network_settings)
169 self.assertEqual(self.net_config.network_settings.name,
171 self.assertTrue(validate_network(
172 self.neutron, self.net_config.network_settings.name, True))
174 subnet_setting = self.net_config.network_settings.subnet_settings[0]
175 self.assertTrue(validate_subnet(
176 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
178 subnet_query1 = neutron_utils.get_subnet(
179 self.neutron, subnet_name=subnet_setting.name)
180 self.assertEqual(self.network.subnets[0], subnet_query1)
182 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
184 self.assertIsNotNone(subnet_query2)
185 self.assertEqual(1, len(subnet_query2))
186 self.assertEqual(self.network.subnets[0], subnet_query2[0])
def test_create_subnet_null_name(self):
    """
    Creates the network, then expects an Exception when SubnetSettings
    is constructed with a CIDR but no name
    """
    net_settings = self.net_config.network_settings
    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, net_settings)

    self.assertEqual(net_settings.name, self.network.name)
    self.assertTrue(
        validate_network(self.neutron, net_settings.name, True))

    with self.assertRaises(Exception):
        SubnetSettings(cidr=self.net_config.subnet_cidr)
203 def test_create_subnet_empty_name(self):
205 Tests the neutron_utils.create_network() function with an empty
208 self.network = neutron_utils.create_network(
209 self.neutron, self.os_creds, self.net_config.network_settings)
210 self.assertEqual(self.net_config.network_settings.name,
212 self.assertTrue(validate_network(
213 self.neutron, self.net_config.network_settings.name, True))
215 subnet_setting = self.net_config.network_settings.subnet_settings[0]
216 self.assertTrue(validate_subnet(
217 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
218 self.assertFalse(validate_subnet(
219 self.neutron, '', subnet_setting.cidr, True))
221 subnet_query1 = neutron_utils.get_subnet(
222 self.neutron, subnet_name=subnet_setting.name)
223 self.assertEqual(self.network.subnets[0], subnet_query1)
225 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
227 self.assertIsNotNone(subnet_query2)
228 self.assertEqual(1, len(subnet_query2))
229 self.assertEqual(self.network.subnets[0], subnet_query2[0])
def test_create_subnet_null_cidr(self):
    """
    Expects an Exception from neutron_utils.create_network() when the
    first subnet's CIDR has been set to None
    """
    net_settings = self.net_config.network_settings
    net_settings.subnet_settings[0].cidr = None

    with self.assertRaises(Exception):
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
def test_create_subnet_empty_cidr(self):
    """
    Expects an Exception from neutron_utils.create_network() when the
    first subnet's CIDR has been set to an empty string
    """
    net_settings = self.net_config.network_settings
    net_settings.subnet_settings[0].cidr = ''

    with self.assertRaises(Exception):
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
252 class NeutronUtilsIPv6Tests(OSComponentTestCase):
254 Test for creating IPv6 networks with subnets via neutron_utils.py
258 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
259 self.neutron = neutron_utils.neutron_client(self.os_creds)
264 Cleans the remote OpenStack objects
268 neutron_utils.delete_network(self.neutron, self.network)
def test_create_network_slaac(self):
    """
    Creates a network with one DHCP-enabled IPv6 subnet whose RA and
    address modes are both 'slaac' and compares the created subnet with
    the requested settings
    """
    sub_setting = SubnetSettings(
        name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
        ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
        gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
        enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
    self.network_settings = NetworkSettings(
        name=self.guid + '-net', subnet_settings=[sub_setting])

    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, self.network_settings)
    self.assertEqual(self.network_settings.name, self.network.name)

    expected = self.network_settings.subnet_settings[0]
    self.assertEqual(1, len(self.network.subnets))
    created = self.network.subnets[0]

    self.assertEqual(self.network.id, created.network_id)
    self.assertEqual(expected.name, created.name)
    self.assertEqual(expected.start, created.start)
    self.assertEqual(expected.end, created.end)
    # Neutron reports the CIDR in compressed notation
    self.assertEqual('1:1::/64', created.cidr)
    self.assertEqual(6, created.ip_version)
    self.assertEqual(1, len(created.dns_nameservers))
    self.assertEqual(
        sub_setting.dns_nameservers[0], created.dns_nameservers[0])
    self.assertTrue(created.enable_dhcp)
    self.assertEqual(expected.ipv6_ra_mode.value, created.ipv6_ra_mode)
    self.assertEqual(
        expected.ipv6_address_mode.value, created.ipv6_address_mode)
def test_create_network_stateful(self):
    """
    Creates a network with one DHCP-enabled IPv6 subnet whose RA and
    address modes are both 'dhcpv6-stateful' and compares the created
    subnet with the requested settings
    """
    sub_setting = SubnetSettings(
        name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
        ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
        gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
        enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
        ipv6_address_mode='dhcpv6-stateful')
    self.network_settings = NetworkSettings(
        name=self.guid + '-net', subnet_settings=[sub_setting])

    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, self.network_settings)

    self.assertEqual(self.network_settings.name, self.network.name)

    expected = self.network_settings.subnet_settings[0]
    self.assertEqual(1, len(self.network.subnets))
    created = self.network.subnets[0]

    self.assertEqual(self.network.id, created.network_id)
    self.assertEqual(expected.name, created.name)
    self.assertEqual(expected.start, created.start)
    self.assertEqual(expected.end, created.end)
    # Neutron reports the CIDR in compressed notation
    self.assertEqual('1:1::/64', created.cidr)
    self.assertEqual(6, created.ip_version)
    self.assertEqual(1, len(created.dns_nameservers))
    self.assertEqual(
        sub_setting.dns_nameservers[0], created.dns_nameservers[0])
    self.assertTrue(created.enable_dhcp)
    self.assertEqual(expected.ipv6_ra_mode.value, created.ipv6_ra_mode)
    self.assertEqual(
        expected.ipv6_address_mode.value, created.ipv6_address_mode)
def test_create_network_stateless(self):
    """
    Creates a network with one DHCP-enabled IPv6 subnet whose RA and
    address modes are both 'dhcpv6-stateless' and compares the created
    subnet with the requested settings
    """
    sub_setting = SubnetSettings(
        name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
        ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
        gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
        enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
        ipv6_address_mode='dhcpv6-stateless')
    self.network_settings = NetworkSettings(
        name=self.guid + '-net', subnet_settings=[sub_setting])

    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, self.network_settings)

    self.assertEqual(self.network_settings.name, self.network.name)

    expected = self.network_settings.subnet_settings[0]
    self.assertEqual(1, len(self.network.subnets))
    created = self.network.subnets[0]

    self.assertEqual(self.network.id, created.network_id)
    self.assertEqual(expected.name, created.name)
    self.assertEqual(expected.start, created.start)
    self.assertEqual(expected.end, created.end)
    # Neutron reports the CIDR in compressed notation
    self.assertEqual('1:1::/64', created.cidr)
    self.assertEqual(6, created.ip_version)
    self.assertEqual(1, len(created.dns_nameservers))
    self.assertEqual(
        sub_setting.dns_nameservers[0], created.dns_nameservers[0])
    self.assertTrue(created.enable_dhcp)
    self.assertEqual(expected.ipv6_ra_mode.value, created.ipv6_ra_mode)
    self.assertEqual(
        expected.ipv6_address_mode.value, created.ipv6_address_mode)
def test_create_network_no_dhcp_slaac(self):
    """
    Expects a BadRequest from neutron_utils.create_network() when the
    IPv6 subnet has DHCP disabled while both its RA and address modes
    are 'slaac'
    """
    subnet_request = SubnetSettings(
        name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
        ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
        gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
        enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
    self.network_settings = NetworkSettings(
        name=self.guid + '-net', subnet_settings=[subnet_request])

    with self.assertRaises(BadRequest):
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)
401 def test_create_network_invalid_start_ip(self):
403 Tests the neutron_utils.create_network() that contains one IPv6 subnet
404 with an invalid start IP to ensure Neutron assigns it the smallest IP
407 sub_setting = SubnetSettings(
408 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
410 self.network_settings = NetworkSettings(
411 name=self.guid + '-net', subnet_settings=[sub_setting])
413 self.network = neutron_utils.create_network(
414 self.neutron, self.os_creds, self.network_settings)
416 self.assertEqual('1:1::2', self.network.subnets[0].start)
418 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
420 def test_create_network_invalid_end_ip(self):
422 Tests the neutron_utils.create_network() that contains one IPv6 subnet
423 with an invalid end IP to ensure Neutron assigns it the largest IP
426 sub_setting = SubnetSettings(
427 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
429 self.network_settings = NetworkSettings(
430 name=self.guid + '-net', subnet_settings=[sub_setting])
432 self.network = neutron_utils.create_network(
433 self.neutron, self.os_creds, self.network_settings)
435 self.assertEqual('1:1::2', self.network.subnets[0].start)
437 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
def test_create_network_with_bad_cidr(self):
    """
    Expects a BadRequest from neutron_utils.create_network() when the
    IPv6 subnet is given the malformed CIDR '1:1:1:/48'
    """
    subnet_request = SubnetSettings(
        name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
    self.network_settings = NetworkSettings(
        name=self.guid + '-net', subnet_settings=[subnet_request])

    with self.assertRaises(BadRequest):
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)
453 def test_create_network_invalid_gateway_ip(self):
455 Tests the neutron_utils.create_network() for a BadRequest when
456 the subnet gateway IP is invalid
458 sub_setting = SubnetSettings(
459 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
461 self.network_settings = NetworkSettings(
462 name=self.guid + '-net', subnet_settings=[sub_setting])
464 with self.assertRaises(BadRequest):
465 self.network = neutron_utils.create_network(
466 self.neutron, self.os_creds, self.network_settings)
def test_create_network_with_bad_dns(self):
    """
    Expects a BadRequest from neutron_utils.create_network() when the
    IPv6 subnet lists 'foo' as its DNS nameserver
    """
    subnet_request = SubnetSettings(
        name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
        dns_nameservers=['foo'])
    self.network_settings = NetworkSettings(
        name=self.guid + '-net', subnet_settings=[subnet_request])

    with self.assertRaises(BadRequest):
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)
484 class NeutronUtilsRouterTests(OSComponentTestCase):
486 Test for creating routers via neutron_utils.py
490 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
491 self.port_name = str(guid) + '-port'
492 self.neutron = neutron_utils.neutron_client(self.os_creds)
496 self.interface_router = None
497 self.net_config = openstack_tests.get_pub_net_config(
498 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
499 router_name=guid + '-pub-router', external_net=self.ext_net_name)
503 Cleans the remote OpenStack objects
505 if self.interface_router:
506 neutron_utils.remove_interface_router(
507 self.neutron, self.router, self.network.subnets[0])
511 neutron_utils.delete_router(self.neutron, self.router)
512 validate_router(self.neutron, self.router.name, False)
518 neutron_utils.delete_port(self.neutron, self.port)
523 neutron_utils.delete_network(self.neutron, self.network)
def test_create_router_simple(self):
    """
    Creates a router with neutron_utils.create_router() and asserts that
    it can be found by name.
    """
    self.router = neutron_utils.create_router(
        self.neutron, self.os_creds, self.net_config.router_settings)
    # validate_router() only returns a boolean; without an assertion the
    # lookup result was discarded and this test could never fail on it.
    self.assertTrue(validate_router(
        self.neutron, self.net_config.router_settings.name, True))
534 def test_create_router_with_public_interface(self):
536 Tests the neutron_utils.create_router() function with a pubic interface
538 subnet_setting = self.net_config.network_settings.subnet_settings[0]
539 self.net_config = openstack_tests.OSNetworkConfig(
540 self.net_config.network_settings.name,
543 self.net_config.router_settings.name,
545 self.router = neutron_utils.create_router(
546 self.neutron, self.os_creds, self.net_config.router_settings)
547 validate_router(self.neutron, self.net_config.router_settings.name,
550 ext_net = neutron_utils.get_network(
551 self.neutron, network_name=self.ext_net_name)
552 self.assertEqual(self.router.external_network_id, ext_net.id)
def test_add_interface_router(self):
    """
    Creates a network and a router, attaches the network's first subnet to
    the router with neutron_utils.add_interface_router(), and checks the
    resulting interface.
    """
    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, self.net_config.network_settings)
    self.assertEqual(self.net_config.network_settings.name,
                     self.network.name)
    self.assertTrue(validate_network(
        self.neutron, self.net_config.network_settings.name, True))

    subnet_setting = self.net_config.network_settings.subnet_settings[0]
    self.assertTrue(validate_subnet(
        self.neutron, subnet_setting.name, subnet_setting.cidr, True))

    self.router = neutron_utils.create_router(
        self.neutron, self.os_creds, self.net_config.router_settings)
    # validate_router() only returns a boolean, so it has to be asserted
    # for a missing router to fail the test.
    self.assertTrue(validate_router(
        self.neutron, self.net_config.router_settings.name, True))

    self.interface_router = neutron_utils.add_interface_router(
        self.neutron, self.router, self.network.subnets[0])
    validate_interface_router(self.interface_router, self.router,
                              self.network.subnets[0])
    # Check the subnet linkage explicitly, since the helper's return value
    # above is not asserted.
    self.assertEqual(self.network.subnets[0].id,
                     self.interface_router.subnet_id)
579 def test_add_interface_router_null_router(self):
581 Tests the neutron_utils.add_interface_router() function for an
582 Exception when the router value is None
584 self.network = neutron_utils.create_network(
585 self.neutron, self.os_creds, self.net_config.network_settings)
586 self.assertEqual(self.net_config.network_settings.name,
588 self.assertTrue(validate_network(
589 self.neutron, self.net_config.network_settings.name, True))
591 subnet_setting = self.net_config.network_settings.subnet_settings[0]
592 self.assertTrue(validate_subnet(
593 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
595 with self.assertRaises(NeutronException):
596 self.interface_router = neutron_utils.add_interface_router(
597 self.neutron, self.router, self.network.subnets[0])
def test_add_interface_router_null_subnet(self):
    """
    Expects a NeutronException from neutron_utils.add_interface_router()
    when the subnet argument is None.
    """
    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, self.net_config.network_settings)
    self.assertEqual(self.net_config.network_settings.name,
                     self.network.name)
    self.assertTrue(validate_network(
        self.neutron, self.net_config.network_settings.name, True))

    self.router = neutron_utils.create_router(
        self.neutron, self.os_creds, self.net_config.router_settings)
    # validate_router() only returns a boolean, so it has to be asserted
    # for a missing router to fail the test.
    self.assertTrue(validate_router(
        self.neutron, self.net_config.router_settings.name, True))

    with self.assertRaises(NeutronException):
        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, None)
def test_add_interface_router_missing_subnet(self):
    """
    Expects NotFound from neutron_utils.add_interface_router() when the
    subnet passed in has already been deleted from neutron.
    """
    self.network = neutron_utils.create_network(
        self.neutron, self.os_creds, self.net_config.network_settings)
    self.assertEqual(self.net_config.network_settings.name,
                     self.network.name)
    self.assertTrue(validate_network(
        self.neutron, self.net_config.network_settings.name, True))

    self.router = neutron_utils.create_router(
        self.neutron, self.os_creds, self.net_config.router_settings)
    # validate_router() only returns a boolean, so it has to be asserted
    # for a missing router to fail the test.
    self.assertTrue(validate_router(
        self.neutron, self.net_config.router_settings.name, True))

    # Remove the subnets remotely; the local objects are kept so a stale
    # subnet can be handed to add_interface_router() below.
    for subnet in self.network.subnets:
        neutron_utils.delete_subnet(self.neutron, subnet)

    with self.assertRaises(NotFound):
        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, self.network.subnets[0])
644 def test_create_port(self):
646 Tests the neutron_utils.create_port() function
648 self.network = neutron_utils.create_network(
649 self.neutron, self.os_creds, self.net_config.network_settings)
650 self.assertEqual(self.net_config.network_settings.name,
652 self.assertTrue(validate_network(
653 self.neutron, self.net_config.network_settings.name, True))
655 subnet_setting = self.net_config.network_settings.subnet_settings[0]
656 self.assertTrue(validate_subnet(
657 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
659 self.port = neutron_utils.create_port(
660 self.neutron, self.os_creds, PortSettings(
663 'subnet_name': subnet_setting.name,
665 network_name=self.net_config.network_settings.name))
666 validate_port(self.neutron, self.port, self.port_name)
668 def test_create_port_empty_name(self):
670 Tests the neutron_utils.create_port() function
672 self.network = neutron_utils.create_network(
673 self.neutron, self.os_creds, self.net_config.network_settings)
674 self.assertEqual(self.net_config.network_settings.name,
676 self.assertTrue(validate_network(
677 self.neutron, self.net_config.network_settings.name, True))
679 subnet_setting = self.net_config.network_settings.subnet_settings[0]
680 self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
681 subnet_setting.cidr, True))
683 self.port = neutron_utils.create_port(
684 self.neutron, self.os_creds, PortSettings(
686 network_name=self.net_config.network_settings.name,
688 'subnet_name': subnet_setting.name,
690 validate_port(self.neutron, self.port, self.port_name)
692 def test_create_port_null_name(self):
694 Tests the neutron_utils.create_port() when the port name value is None
696 self.network = neutron_utils.create_network(
697 self.neutron, self.os_creds, self.net_config.network_settings)
698 self.assertEqual(self.net_config.network_settings.name,
700 self.assertTrue(validate_network(
701 self.neutron, self.net_config.network_settings.name, True))
703 subnet_setting = self.net_config.network_settings.subnet_settings[0]
704 self.assertTrue(validate_subnet(
705 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
707 self.port = neutron_utils.create_port(
708 self.neutron, self.os_creds,
710 network_name=self.net_config.network_settings.name,
712 'subnet_name': subnet_setting.name,
715 port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
716 self.assertEqual(self.port, port)
718 def test_create_port_null_network_object(self):
720 Tests the neutron_utils.create_port() function for an Exception when
721 the network object is None
723 with self.assertRaises(Exception):
724 self.port = neutron_utils.create_port(
725 self.neutron, self.os_creds,
728 network_name=self.net_config.network_settings.name,
731 self.net_config.network_settings.subnet_settings[
735 def test_create_port_null_ip(self):
737 Tests the neutron_utils.create_port() function for an Exception when
740 self.network = neutron_utils.create_network(
741 self.neutron, self.os_creds, self.net_config.network_settings)
742 self.assertEqual(self.net_config.network_settings.name,
744 self.assertTrue(validate_network(
745 self.neutron, self.net_config.network_settings.name, True))
747 subnet_setting = self.net_config.network_settings.subnet_settings[0]
748 self.assertTrue(validate_subnet(
749 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
751 with self.assertRaises(Exception):
752 self.port = neutron_utils.create_port(
753 self.neutron, self.os_creds,
756 network_name=self.net_config.network_settings.name,
758 'subnet_name': subnet_setting.name,
761 def test_create_port_invalid_ip(self):
763 Tests the neutron_utils.create_port() function for an Exception when
766 self.network = neutron_utils.create_network(
767 self.neutron, self.os_creds, self.net_config.network_settings)
768 self.assertEqual(self.net_config.network_settings.name,
770 self.assertTrue(validate_network(
771 self.neutron, self.net_config.network_settings.name, True))
773 subnet_setting = self.net_config.network_settings.subnet_settings[0]
774 self.assertTrue(validate_subnet(
775 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
777 with self.assertRaises(Exception):
778 self.port = neutron_utils.create_port(
779 self.neutron, self.os_creds,
782 network_name=self.net_config.network_settings.name,
784 'subnet_name': subnet_setting.name,
787 def test_create_port_invalid_ip_to_subnet(self):
789 Tests the neutron_utils.create_port() function for an Exception when
792 self.network = neutron_utils.create_network(
793 self.neutron, self.os_creds, self.net_config.network_settings)
794 self.assertEqual(self.net_config.network_settings.name,
796 self.assertTrue(validate_network(
797 self.neutron, self.net_config.network_settings.name, True))
799 subnet_setting = self.net_config.network_settings.subnet_settings[0]
800 self.assertTrue(validate_subnet(
801 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
803 with self.assertRaises(Exception):
804 self.port = neutron_utils.create_port(
805 self.neutron, self.os_creds,
808 network_name=self.net_config.network_settings.name,
810 'subnet_name': subnet_setting.name,
811 'ip': '10.197.123.100'}]))
814 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
816 Test for creating security groups via neutron_utils.py
820 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
821 self.sec_grp_name = guid + 'name'
823 self.security_groups = list()
824 self.security_group_rules = list()
825 self.neutron = neutron_utils.neutron_client(self.os_creds)
826 self.keystone = keystone_utils.keystone_client(self.os_creds)
830 Cleans the remote OpenStack objects
832 for rule in self.security_group_rules:
833 neutron_utils.delete_security_group_rule(self.neutron, rule)
835 for security_group in self.security_groups:
837 neutron_utils.delete_security_group(self.neutron,
def test_create_delete_simple_sec_grp(self):
    """
    Creates a security group from settings holding only a name, confirms
    it can be retrieved, then deletes it and confirms the lookup returns
    None.
    """
    sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
    security_group = neutron_utils.create_security_group(
        self.neutron, self.keystone, sec_grp_settings)

    # assertTrue(a, b) uses b as the failure message and passes for any
    # non-empty a, so the names must be compared with assertEqual.
    self.assertEqual(sec_grp_settings.name, security_group.name)

    sec_grp_get = neutron_utils.get_security_group(
        self.neutron, sec_grp_settings=sec_grp_settings)
    self.assertIsNotNone(sec_grp_get)
    self.assertTrue(validation_utils.objects_equivalent(
        security_group, sec_grp_get))

    neutron_utils.delete_security_group(self.neutron, security_group)
    sec_grp_get = neutron_utils.get_security_group(
        self.neutron, sec_grp_settings=sec_grp_settings)
    self.assertIsNone(sec_grp_get)
864 def test_create_sec_grp_no_name(self):
866 Tests the SecurityGroupSettings constructor and
867 neutron_utils.create_security_group() function to ensure that
868 attempting to create a security group without a name will raise an
871 with self.assertRaises(Exception):
872 sec_grp_settings = SecurityGroupSettings()
873 self.security_groups.append(
874 neutron_utils.create_security_group(self.neutron,
def test_create_sec_grp_no_rules(self):
    """
    Creates a security group with a name and description but no rules and
    confirms it can be retrieved by its settings.
    """
    sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                             description='hello group')
    self.security_groups.append(
        neutron_utils.create_security_group(self.neutron, self.keystone,
                                            sec_grp_settings))

    # The former assertTrue(name, name) treated its second argument as the
    # failure message and so verified nothing; assertEqual is the real check.
    self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

    sec_grp_get = neutron_utils.get_security_group(
        self.neutron, sec_grp_settings=sec_grp_settings)
    self.assertIsNotNone(sec_grp_get)
    self.assertEqual(self.security_groups[0], sec_grp_get)
def test_create_sec_grp_one_rule(self):
    """
    Creates a security group, adds one ingress rule to it with
    neutron_utils.create_security_group_rule(), and confirms the group's
    rules and the group itself can be retrieved.
    """
    sec_grp_rule_settings = SecurityGroupRuleSettings(
        sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
    sec_grp_settings = SecurityGroupSettings(
        name=self.sec_grp_name, description='hello group',
        rule_settings=[sec_grp_rule_settings])

    self.security_groups.append(
        neutron_utils.create_security_group(self.neutron, self.keystone,
                                            sec_grp_settings))
    # Track the rules already attached to the new group so they are part
    # of the expected rule set and are cleaned up with the rest.
    self.security_group_rules.extend(
        neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0]))

    self.security_group_rules.append(
        neutron_utils.create_security_group_rule(
            self.neutron, sec_grp_settings.rule_settings[0]))

    # Refresh object so it is populated with the newly added rule
    security_group = neutron_utils.get_security_group(
        self.neutron, sec_grp_settings=sec_grp_settings)

    rules = neutron_utils.get_rules_by_security_group(self.neutron,
                                                      security_group)

    self.assertTrue(
        validation_utils.objects_equivalent(
            self.security_group_rules, rules))

    # assertTrue(a, b) uses b as the failure message and passes for any
    # non-empty a, so the names must be compared with assertEqual.
    self.assertEqual(sec_grp_settings.name, security_group.name)

    sec_grp_get = neutron_utils.get_security_group(
        self.neutron, sec_grp_settings=sec_grp_settings)
    self.assertIsNotNone(sec_grp_get)
    self.assertEqual(security_group, sec_grp_get)
def test_get_sec_grp_by_id(self):
    """
    Creates two security groups and checks that
    neutron_utils.get_security_group_by_id() returns an object with the
    matching ID for each of them
    """
    for suffix in ('-1', '-2'):
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + suffix,
                                  description='hello group')))

    first, second = self.security_groups[0], self.security_groups[1]

    # Both lookups run before either comparison
    fetched_first = neutron_utils.get_security_group_by_id(
        self.neutron, first.id)
    fetched_second = neutron_utils.get_security_group_by_id(
        self.neutron, second.id)

    self.assertEqual(first.id, fetched_first.id)
    self.assertEqual(second.id, fetched_second.id)
960 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
962 Test basic nova keypair functionality
967 Instantiates the CreateImage object that is responsible for downloading
968 and creating an OS image file within OpenStack
970 self.neutron = neutron_utils.neutron_client(self.os_creds)
971 self.floating_ip = None
975 Cleans the image and downloaded image file
979 neutron_utils.delete_floating_ip(
980 self.neutron, self.floating_ip)
984 def test_floating_ips(self):
986 Tests the creation of a floating IP
989 initial_fips = neutron_utils.get_floating_ips(self.neutron)
991 self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
993 all_fips = neutron_utils.get_floating_ips(self.neutron)
994 self.assertEqual(len(initial_fips) + 1, len(all_fips))
995 returned = neutron_utils.get_floating_ip(self.neutron,
997 self.assertEqual(self.floating_ip.id, returned.id)
998 self.assertEqual(self.floating_ip.ip, returned.ip)
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches
    the expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network name should exist or not
    :return: True when the network was found and exists is truthy, or when
             it was not found and exists is falsy; otherwise False
    """
    found = neutron_utils.get_network(neutron, network_name=name)
    if exists:
        return bool(found)
    return not found
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence (and CIDR) of a subnet with the given name
    matches the expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet name should exist or not
    :return: When exists is truthy, the result of comparing the found
             subnet's CIDR with cidr (False when no subnet with that name
             was found); when exists is falsy, True only if no subnet was
             found
    """
    found = neutron_utils.get_subnet(neutron, subnet_name=name)

    # Absence expected: succeed only when nothing came back
    if not exists:
        return not found

    # Presence expected: the name must match before the CIDR is compared
    if found and found.name == name:
        return found.cidr == cidr
    return False
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router with the given name matches
    the expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router name should exist or not
    :return: True when the router was found and exists is truthy, or when
             it was not found and exists is falsy; otherwise False
    """
    router = neutron_utils.get_router(neutron, router_name=name)
    # Mirror validate_network(): "expected and found" and "not expected and
    # not found" both count as a successful validation. Previously only the
    # first case could return True, so exists=False never validated.
    if exists:
        return bool(router)
    return not router
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into
    the interface router object
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    # The interface's own 'id' carries the router ID in the Neutron
    # add-router-interface response; 'port_id' is the port created for the
    # attachment and does not equal the router ID.
    # NOTE(review): assumes InterfaceRouter exposes 'id' - confirm against
    # the domain class.
    return (subnet.id == interface_router.subnet_id
            and router.id == interface_router.id)
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks up a port by ID in the neutron port listing and checks its name.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (matched on its id)
    :param this_port_name: The expected port name
    :return: True if a listed port has port_obj's ID and the expected name,
             False if the ID is found under a different name or not found
    """
    os_ports = neutron.list_ports()
    # list_ports() returns a dict whose values are lists of port records;
    # the keys are not needed for the lookup.
    for os_port_insts in os_ports.values():
        for os_inst in os_port_insts:
            if os_inst['id'] == port_obj.id:
                return os_inst['name'] == this_port_name
    return False