1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from neutronclient.common.exceptions import NotFound, BadRequest
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21 SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and checks that the
        configured external network shows up in the network listing.
        """
        client = neutron_utils.neutron_client(self.os_creds, self.os_session)
        listing = client.list_networks().get('networks')

        self.assertTrue(
            any(net.get('name') == self.ext_net_name for net in listing))

    def test_neutron_connect_fail(self):
        """
        Checks that listing networks with made-up credentials raises.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Everything stays inside the context manager: a failure while
        # building the credentials or the client counts as well.
        with self.assertRaises(Exception):
            bogus_client = neutron_utils.neutron_client(
                OSCreds(username='user', password='pass', auth_url='url',
                        project_name='project'))
            bogus_client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Checks that neutron_utils.get_external_networks() returns a network
        whose name equals the configured self.ext_net_name.
        """
        client = neutron_utils.neutron_client(self.os_creds, self.os_session)
        external = neutron_utils.get_external_networks(client)

        self.assertTrue(
            any(net.name == self.ext_net_name for net in external))
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Tests for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron and keystone clients plus a uniquely named
        network configuration. No remote object is created here.
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        # Defined up front so tearDown can tell whether a test got as far
        # as creating the network.
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            project_name=self.os_creds.project_name,
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

        # The class is named explicitly: super(self.__class__, self)
        # recurses forever as soon as this test case is subclassed.
        super(NeutronUtilsNetworkTests, self).__clean__()

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function: the returned
        network carries the configured name, can be validated remotely and
        has as many subnets as the configuration.
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name))
        self.assertEqual(len(net_settings.subnet_settings),
                         len(self.network.subnets))

    def test_create_network_empty_name(self):
        """
        Tests the neutron_utils.create_network() function with an empty
        network name; an exception is expected.
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig(name=''))

    def test_create_network_null_name(self):
        """
        Tests the neutron_utils.create_network() function when the network
        configuration is built without a name; an exception is expected.
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig())
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Tests for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron and keystone clients plus a uniquely named
        network/subnet configuration. No remote object is created here.
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        # Defined up front so tearDown can tell whether a test got as far
        # as creating the network.
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            project_name=self.os_creds.project_name,
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup: a failed delete must not mask the
                # outcome of the test itself.
                pass

        # The class is named explicitly: super(self.__class__, self)
        # recurses forever as soon as this test case is subclassed.
        super(NeutronUtilsSubnetTests, self).__clean__()

    def _create_validated_network(self):
        """
        Creates the configured network, stores it on self.network and
        checks its name locally and remotely.
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name))

    def test_create_subnet(self):
        """
        Tests that neutron_utils.create_network() also creates the
        configured subnet and that the three subnet query functions all
        return it.
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, self.network, subnet_setting.name,
            subnet_setting.cidr, True))

        created = self.network.subnets[0]

        by_name_on_net = neutron_utils.get_subnet(
            self.neutron, self.network, subnet_name=subnet_setting.name)
        self.assertEqual(created, by_name_on_net)

        by_network = neutron_utils.get_subnets_by_network(
            self.neutron, self.network)
        self.assertIsNotNone(by_network)
        self.assertEqual(1, len(by_network))
        self.assertEqual(created, by_network[0])

        by_name = neutron_utils.get_subnet_by_name(
            self.neutron, self.keystone, subnet_setting.name,
            self.os_creds.project_name)
        self.assertIsNotNone(by_name)
        self.assertEqual(created, by_name)

    def test_create_subnet_null_name(self):
        """
        Tests that building a SubnetConfig without a name raises an
        Exception (the network itself is created normally first).
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            SubnetConfig(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Tests that the created subnet is found under its configured name
        and is not found when validated against an empty name.
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, self.network, subnet_setting.name,
            subnet_setting.cidr, True))
        self.assertFalse(validate_subnet(
            self.neutron, self.network, '', subnet_setting.cidr, True))

        created = self.network.subnets[0]

        by_name_on_net = neutron_utils.get_subnet(
            self.neutron, self.network, subnet_name=subnet_setting.name)
        self.assertEqual(created, by_name_on_net)

        by_network = neutron_utils.get_subnets_by_network(
            self.neutron, self.network)
        self.assertIsNotNone(by_network)
        self.assertEqual(1, len(by_network))
        self.assertEqual(created, by_network[0])

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an Exception
        when the subnet CIDR value is None
        """
        self.net_config.network_settings.subnet_settings[0].cidr = None
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an Exception
        when the subnet CIDR value is empty
        """
        self.net_config.network_settings.subnet_settings[0].cidr = ''
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)
class NeutronUtilsIPv6Tests(OSComponentTestCase):
    """
    Tests for creating IPv6 networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a unique name prefix. No remote
        object is created here.
        """
        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        # Defined up front so tearDown can tell whether a test got as far
        # as creating the network.
        self.network = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup: a failed delete must not mask the
                # outcome of the test itself.
                pass

        # The class is named explicitly: super(self.__class__, self)
        # recurses forever as soon as this test case is subclassed.
        super(NeutronUtilsIPv6Tests, self).__clean__()

    def _create_network(self, sub_setting):
        """
        Wraps sub_setting in a NetworkConfig stored on
        self.network_settings, then creates the network and stores it on
        self.network.
        """
        self.network_settings = NetworkConfig(
            name=self.guid + '-net', subnet_settings=[sub_setting])
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

    def _expect_bad_request(self, sub_setting):
        """
        Asserts that creating a network holding sub_setting raises
        BadRequest.
        """
        with self.assertRaises(BadRequest):
            self._create_network(sub_setting)

    def _check_dhcp_network(self, ipv6_mode):
        """
        Creates a DHCP-enabled /64 IPv6 network whose RA and address modes
        are both ipv6_mode and checks every subnet attribute that comes
        back against the configuration.
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
            ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=True, ipv6_ra_mode=ipv6_mode,
            ipv6_address_mode=ipv6_mode)
        self._create_network(sub_setting)

        self.assertEqual(self.network_settings.name, self.network.name)

        subnet_settings = self.network_settings.subnet_settings[0]
        self.assertEqual(1, len(self.network.subnets))
        subnet = self.network.subnets[0]

        self.assertEqual(self.network.id, subnet.network_id)
        self.assertEqual(subnet_settings.name, subnet.name)
        self.assertEqual(subnet_settings.start, subnet.start)
        self.assertEqual(subnet_settings.end, subnet.end)
        # The long-form CIDR passed in is expected back in compressed form.
        self.assertEqual('1:1::/64', subnet.cidr)
        self.assertEqual(6, subnet.ip_version)
        self.assertEqual(1, len(subnet.dns_nameservers))
        self.assertEqual(
            sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
        self.assertTrue(subnet.enable_dhcp)
        self.assertEqual(
            subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
        self.assertEqual(
            subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)

    def test_create_network_slaac(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where
        DHCP is True and IPv6 modes are slaac
        """
        self._check_dhcp_network('slaac')

    def test_create_network_stateful(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where
        DHCP is True and IPv6 modes are dhcpv6-stateful
        """
        self._check_dhcp_network('dhcpv6-stateful')

    def test_create_network_stateless(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where
        DHCP is True and IPv6 modes are dhcpv6-stateless
        """
        self._check_dhcp_network('dhcpv6-stateless')

    def test_create_network_no_dhcp_slaac(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        DHCP is not enabled and the RA and address modes are both 'slaac'
        """
        self._expect_bad_request(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
            ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=False, ipv6_ra_mode='slaac',
            ipv6_address_mode='slaac'))

    def test_create_network_invalid_start_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6
        subnet with an invalid start IP: the resulting allocation range is
        expected to span the whole /48.
        """
        # NOTE(review): the invalid start literal was not visible in the
        # reviewed excerpt; 'foo' is a stand-in non-IP value -- confirm.
        self._create_network(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            start='foo'))

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_invalid_end_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6
        subnet with an invalid end IP: the resulting allocation range is
        expected to span the whole /48.
        """
        # NOTE(review): the invalid end literal was not visible in the
        # reviewed excerpt; 'bar' is a stand-in non-IP value -- confirm.
        self._create_network(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            end='bar'))

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_with_bad_cidr(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet CIDR is invalid
        """
        self._expect_bad_request(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6))

    def test_create_network_invalid_gateway_ip(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet gateway is an IPv4 address on an IPv6 subnet
        """
        self._expect_bad_request(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            gateway_ip='192.168.0.1'))

    def test_create_network_with_bad_dns(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the DNS IP is invalid
        """
        self._expect_bad_request(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            dns_nameservers=['foo']))
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Tests for creating routers, router interfaces and ports via
    neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron and keystone clients plus a uniquely named
        network/subnet/router configuration. No remote object is created
        here.
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        # Defined up front so tearDown knows which objects were created.
        self.network = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            project_name=self.os_creds.project_name,
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        # The interface goes first: the router and network it joins are
        # deleted below.
        if self.interface_router:
            neutron_utils.remove_interface_router(
                self.neutron, self.router, self.network.subnets[0])

        if self.router:
            try:
                neutron_utils.delete_router(self.neutron, self.router)
                validate_router(
                    self.neutron, self.keystone, self.router.name,
                    self.os_creds.project_name, False)
            except Exception:
                # Best-effort cleanup.
                pass

        if self.port:
            try:
                neutron_utils.delete_port(self.neutron, self.port)
            except Exception:
                # Best-effort cleanup.
                pass

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

        # The class is named explicitly: super(self.__class__, self)
        # recurses forever as soon as this test case is subclassed.
        super(NeutronUtilsRouterTests, self).__clean__()

    def _create_network(self):
        """
        Creates the configured network, stores it on self.network and
        checks its name locally and remotely.
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name))

    def _check_first_subnet(self):
        """
        Validates the first configured subnet against the created network
        and returns that subnet's settings.
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, self.network, subnet_setting.name,
            subnet_setting.cidr, True))
        return subnet_setting

    def _create_router(self):
        """
        Creates the configured router and stores it on self.router.
        """
        self.router = neutron_utils.create_router(
            self.neutron, self.os_creds, self.net_config.router_settings)
        # NOTE(review): the result of validate_router() is not asserted,
        # unlike validate_network(); confirm whether it returns a bool
        # that should be wrapped in assertTrue().
        validate_router(
            self.neutron, self.keystone, self.net_config.router_settings.name,
            self.os_creds.project_name, True)

    def _port_config(self, subnet_name, ip, named=True):
        """
        Builds a PortConfig on the configured network with a single fixed
        IP entry; the port name is left out when named is False.
        """
        settings = {
            'network_name': self.net_config.network_settings.name,
            'ip_addrs': [{'subnet_name': subnet_name, 'ip': ip}],
        }
        if named:
            settings['name'] = self.port_name
        return PortConfig(**settings)

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router()
        """
        self._create_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function with a public
        interface: the router's external network must be the configured
        external network.
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.net_config = openstack_tests.OSNetworkConfig(
            project_name=self.os_creds.project_name,
            net_name=self.net_config.network_settings.name,
            subnet_name=subnet_setting.name, subnet_cidr=subnet_setting.cidr,
            router_name=self.net_config.router_settings.name,
            external_gateway=self.ext_net_name)
        self._create_router()

        ext_net = neutron_utils.get_network(
            self.neutron, self.keystone, network_name=self.ext_net_name)
        self.assertEqual(self.router.external_network_id, ext_net.id)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_network()
        self._check_first_subnet()
        self._create_router()

        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, self.network.subnets[0])
        # NOTE(review): result not asserted -- confirm whether
        # validate_interface_router() returns a bool.
        validate_interface_router(self.interface_router, self.router,
                                  self.network.subnets[0])

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for a
        NeutronException when the router value is None
        """
        self._create_network()
        self._check_first_subnet()

        # No router is created in this test, so self.router is still None.
        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.network.subnets[0])

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for a
        NeutronException when the subnet value is None
        """
        self._create_network()
        self._create_router()

        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, None)

    def test_add_interface_router_missing_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for a
        NotFound when the subnet object has been deleted
        """
        self._create_network()
        self._create_router()

        for subnet in self.network.subnets:
            neutron_utils.delete_subnet(self.neutron, subnet)

        # self.network still holds the now-stale subnet objects.
        with self.assertRaises(NotFound):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.network.subnets[0])

    # NOTE(review): the port tests below use the module-level address
    # constant ip_1, and some of their PortConfig argument lines were not
    # visible in the reviewed excerpt -- confirm against the original.

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_network()
        subnet_setting = self._check_first_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds,
            self._port_config(subnet_setting.name, ip_1))
        # NOTE(review): result not asserted -- confirm whether
        # validate_port() returns a bool.
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function
        """
        # NOTE(review): despite its name, this test creates the port with
        # self.port_name, exactly like test_create_port -- confirm intent.
        self._create_network()
        subnet_setting = self._check_first_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds,
            self._port_config(subnet_setting.name, ip_1))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() when the port name value is
        None: the port is still created and can be fetched by ID.
        """
        self._create_network()
        subnet_setting = self._check_first_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds,
            self._port_config(subnet_setting.name, ip_1, named=False))

        port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
        self.assertEqual(self.port, port)

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception
        when the named network has not been created
        """
        subnet_name = (
            self.net_config.network_settings.subnet_settings[0].name)
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                self._port_config(subnet_name, ip_1))

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception
        when the IP value is None
        """
        self._create_network()
        subnet_setting = self._check_first_subnet()

        # The PortConfig is built inside the context manager so that a
        # failure at construction time counts as well.
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                self._port_config(subnet_setting.name, None))

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception
        when the IP value is not a valid address
        """
        self._create_network()
        subnet_setting = self._check_first_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                self._port_config(subnet_setting.name, 'foo'))

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception
        when the IP value does not belong to the subnet
        """
        self._create_network()
        subnet_setting = self._check_first_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                self._port_config(subnet_setting.name, '10.197.123.100'))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Tests for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron and keystone clients, a unique security group
        name and the lists tearDown uses to find created objects.
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: the group may already be gone.
                pass

        # The class is named explicitly: super(self.__class__, self)
        # recurses forever as soon as this test case is subclassed.
        super(NeutronUtilsSecurityGroupTests, self).__clean__()

    def test_create_delete_simple_sec_grp(self):
        """
        Tests that a security group created with
        neutron_utils.create_security_group() can be fetched, and can no
        longer be fetched after neutron_utils.delete_security_group().
        """
        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings)
        # Registered for cleanup so a failing assertion below does not
        # leak the group; tearDown tolerates it already being deleted.
        self.security_groups.append(security_group)

        # assertTrue(a, b) would treat b as the failure message and never
        # compare the names.
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupConfig constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupConfig()
            self.security_groups.append(
                neutron_utils.create_security_group(
                    self.neutron, self.keystone, sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function for a
        group with a description and no rule settings
        """
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() function followed
        by neutron_utils.create_security_group_rule() for one ingress rule
        """
        sec_grp_rule_settings = SecurityGroupRuleConfig(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        # Rules already on the fresh group are tracked too, so the
        # comparison below sees the complete rule set and tearDown
        # removes them.
        self.security_group_rules.extend(
            neutron_utils.get_rules_by_security_group(
                self.neutron, self.security_groups[0]))

        # The keystone client from setUp is reused; it is built from the
        # same credentials and session.
        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, self.keystone,
                sec_grp_settings.rule_settings[0],
                self.os_creds.project_name))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        # assertTrue(a, b) would treat b as the failure message and never
        # compare the names.
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function: each
        of two created groups is returned for its own ID.
        """
        for suffix in ('-1', '-2'):
            self.security_groups.append(neutron_utils.create_security_group(
                self.neutron, self.keystone,
                SecurityGroupConfig(
                    name=self.sec_grp_name + suffix,
                    description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
1034 def test_create_list_sec_grp_no_rules(self):
1036 Tests the neutron_utils.create_security_group() and
1037 list_security_groups function
1039 sec_grp_settings = SecurityGroupConfig(
1040 name=self.sec_grp_name + "-1", description='hello group')
1041 self.security_groups.append(neutron_utils.create_security_group(
1042 self.neutron, self.keystone, sec_grp_settings))
1044 sec_grp_settings2 = SecurityGroupConfig(
1045 name=self.sec_grp_name + "-2", description='hola group')
1046 self.security_groups.append(neutron_utils.create_security_group(
1047 self.neutron, self.keystone, sec_grp_settings2))
1049 returned_sec_groups = neutron_utils.list_security_groups(self.neutron)
1051 self.assertIsNotNone(returned_sec_groups)
1053 for sg in returned_sec_groups:
1054 if sec_grp_settings.name == sg.name:
1056 elif sec_grp_settings2.name == sg.name:
1059 self.assertEqual(worked, 2)
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions of neutron_utils
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients used by the tests and
        initializes the floating IP member that tearDown() cleans up
        """
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP created by the test, if any, then runs the
        parent class cleanup
        """
        if self.floating_ip:
            try:
                neutron_utils.delete_floating_ip(
                    self.neutron, self.floating_ip)
            except Exception:
                # Best-effort cleanup: a failed delete must not prevent the
                # parent cleanup below. "except Exception" (rather than a
                # bare except) lets KeyboardInterrupt/SystemExit propagate
                pass

        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed
        super(NeutronUtilsFloatingIpTests, self).__clean__()

    def test_floating_ips(self):
        """
        Tests that neutron_utils.create_floating_ip() adds exactly one
        floating IP and that neutron_utils.get_floating_ip() returns an
        object with the same ID and IP
        :return:
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.keystone, self.ext_net_name)
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
def validate_network(neutron, keystone, name, exists, project_name, mtu=None):
    """
    Returns true if a network for a given name DOES NOT exist if the exists
    parameter is false conversely true. Returns false if a network for a given
    name DOES exist if the exists parameter is true conversely false.
    When the network is expected to exist and an mtu value is given, the
    network's MTU must match as well.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected network name
    :param exists: Whether or not the network name should exist or not
    :param project_name: the associated project name
    :param mtu: The expected MTU value (only checked when truthy and the
                network is expected to exist)
    :return: True/False
    """
    network = neutron_utils.get_network(
        neutron, keystone, network_name=name, project_name=project_name)

    if not exists:
        # The network is expected to be absent
        return not network

    if not network:
        # Expected to exist but was not found; never dereference None below
        return False

    if mtu:
        # The MTU check only makes sense on a network that was found
        return mtu == network.mtu
    return True
def validate_subnet(neutron, network, name, cidr, exists):
    """
    Checks the presence of a subnet against the expectation given by the
    exists parameter.
    When exists is true, returns whether a subnet with the given name was
    found and its CIDR equals the expected one. When exists is false, returns
    true only if no subnet was found.
    :param neutron: The neutron client
    :param network: The SNAPS-OO Network domain object
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet name should exist or not
    :return: True/False
    """
    found = neutron_utils.get_subnet(
        neutron, network, subnet_name=name)

    if not exists:
        return not found

    if not found or found.name != name:
        return False

    return found.cidr == cidr
def validate_router(neutron, keystone, name, project_name, exists):
    """
    Returns true if a router for a given name DOES NOT exist if the exists
    parameter is false conversely true. Returns false if a router for a given
    name DOES exist if the exists parameter is true conversely false.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected router name
    :param project_name: The name of the project in which the router should
                         exist
    :param exists: Whether or not the router name should exist or not
    :return: True/False
    """
    router = neutron_utils.get_router(
        neutron, keystone, router_name=name, project_name=project_name)

    # True when the router's presence matches the expectation. This also
    # covers "expected absent and not found", as the contract above and the
    # network/subnet validators in this module do
    return bool(router) == bool(exists)
def validate_interface_router(interface_router, router, subnet):
    """
    Checks that the interface router object references the given subnet and
    router
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    # NOTE(review): the router ID is compared against the interface router's
    # port_id attribute - confirm this is the intended field
    expected_subnet_id, expected_router_id = (
        interface_router.subnet_id, interface_router.port_id)

    if subnet.id != expected_subnet_id:
        return False
    return router.id == expected_router_id
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks up the port with the ID of the given port object in the result of
    neutron.list_ports() and checks its name.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup
    :param this_port_name: The expected port name
    :return: True if the first port found with that ID has the expected name,
             False otherwise
    """
    wanted_id = port_obj.id
    for port_insts in neutron.list_ports().values():
        # Only the first entry with a matching ID decides the outcome
        match = next(
            (inst for inst in port_insts if inst['id'] == wanted_id), None)
        if match is not None:
            return match['name'] == this_port_name
    return False