1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from neutronclient.common.exceptions import NotFound, BadRequest
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21 SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
35 class NeutronSmokeTests(OSComponentTestCase):
37 Tests to ensure that the neutron client can communicate with the cloud
40 def test_neutron_connect_success(self):
42 Tests to ensure that the proper credentials can connect.
44 neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)
46 networks = neutron.list_networks()
49 networks = networks.get('networks')
50 for network in networks:
51 if network.get('name') == self.ext_net_name:
53 self.assertTrue(found)
55 def test_neutron_connect_fail(self):
57 Tests to ensure that the improper credentials cannot connect.
59 from snaps.openstack.os_credentials import OSCreds
61 with self.assertRaises(Exception):
62 neutron = neutron_utils.neutron_client(
63 OSCreds(username='user', password='pass', auth_url='url',
64 project_name='project'))
65 neutron.list_networks()
67 def test_retrieve_ext_network_name(self):
69 Tests the neutron_utils.get_external_network_names to ensure the
70 configured self.ext_net_name is contained within the returned list
73 neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)
74 ext_networks = neutron_utils.get_external_networks(neutron)
76 for network in ext_networks:
77 if network.name == self.ext_net_name:
80 self.assertTrue(found)
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
85 Test for creating networks via neutron_utils.py
89 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90 self.port_name = str(guid) + '-port'
91 self.neutron = neutron_utils.neutron_client(
92 self.os_creds, self.os_session)
93 self.keystone = keystone_utils.keystone_client(
94 self.os_creds, self.os_session)
96 self.net_config = openstack_tests.get_pub_net_config(
97 project_name=self.os_creds.project_name,
98 net_name=guid + '-pub-net')
102 Cleans the remote OpenStack objects
105 neutron_utils.delete_network(self.neutron, self.network)
107 super(self.__class__, self).__clean__()
109 def test_create_network(self):
111 Tests the neutron_utils.create_network() function
113 self.network = neutron_utils.create_network(
114 self.neutron, self.os_creds, self.net_config.network_settings)
115 self.assertEqual(self.net_config.network_settings.name,
117 self.assertTrue(validate_network(
118 self.neutron, self.keystone,
119 self.net_config.network_settings.name, True,
120 self.os_creds.project_name))
121 self.assertEqual(len(self.net_config.network_settings.subnet_settings),
122 len(self.network.subnets))
124 def test_create_network_empty_name(self):
126 Tests the neutron_utils.create_network() function with an empty
129 with self.assertRaises(Exception):
130 self.network = neutron_utils.create_network(
131 self.neutron, self.os_creds,
132 network_settings=NetworkConfig(name=''))
134 def test_create_network_null_name(self):
136 Tests the neutron_utils.create_network() function when the network
139 with self.assertRaises(Exception):
140 self.network = neutron_utils.create_network(
141 self.neutron, self.os_creds,
142 network_settings=NetworkConfig())
145 class NeutronUtilsSubnetTests(OSComponentTestCase):
147 Test for creating networks with subnets via neutron_utils.py
151 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
152 self.port_name = str(guid) + '-port'
153 self.neutron = neutron_utils.neutron_client(
154 self.os_creds, self.os_session)
155 self.keystone = keystone_utils.keystone_client(
156 self.os_creds, self.os_session)
158 self.net_config = openstack_tests.get_pub_net_config(
159 project_name=self.os_creds.project_name,
160 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
161 external_net=self.ext_net_name)
165 Cleans the remote OpenStack objects
169 neutron_utils.delete_network(self.neutron, self.network)
173 super(self.__class__, self).__clean__()
175 def test_create_subnet(self):
177 Tests the neutron_utils.create_network() function
179 self.network = neutron_utils.create_network(
180 self.neutron, self.os_creds, self.net_config.network_settings)
181 self.assertEqual(self.net_config.network_settings.name,
183 self.assertTrue(validate_network(
184 self.neutron, self.keystone,
185 self.net_config.network_settings.name, True,
186 self.os_creds.project_name))
188 subnet_setting = self.net_config.network_settings.subnet_settings[0]
189 self.assertTrue(validate_subnet(
190 self.neutron, self.network, subnet_setting.name,
191 subnet_setting.cidr, True))
193 subnet_query1 = neutron_utils.get_subnet(
194 self.neutron, self.network, subnet_name=subnet_setting.name)
195 self.assertEqual(self.network.subnets[0], subnet_query1)
197 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
199 self.assertIsNotNone(subnet_query2)
200 self.assertEqual(1, len(subnet_query2))
201 self.assertEqual(self.network.subnets[0], subnet_query2[0])
203 subnet_query3 = neutron_utils.get_subnet_by_name(
204 self.neutron, self.keystone, subnet_setting.name,
205 self.os_creds.project_name)
206 self.assertIsNotNone(subnet_query3)
207 self.assertEqual(self.network.subnets[0], subnet_query3)
209 def test_create_subnet_null_name(self):
211 Tests the neutron_utils.create_neutron_subnet() function for an
212 Exception when the subnet name is None
214 self.network = neutron_utils.create_network(
215 self.neutron, self.os_creds, self.net_config.network_settings)
216 self.assertEqual(self.net_config.network_settings.name,
218 self.assertTrue(validate_network(
219 self.neutron, self.keystone,
220 self.net_config.network_settings.name, True,
221 self.os_creds.project_name))
223 with self.assertRaises(Exception):
224 SubnetConfig(cidr=self.net_config.subnet_cidr)
226 def test_create_subnet_empty_name(self):
228 Tests the neutron_utils.create_network() function with an empty
231 self.network = neutron_utils.create_network(
232 self.neutron, self.os_creds, self.net_config.network_settings)
233 self.assertEqual(self.net_config.network_settings.name,
235 self.assertTrue(validate_network(
236 self.neutron, self.keystone,
237 self.net_config.network_settings.name, True,
238 self.os_creds.project_name))
240 subnet_setting = self.net_config.network_settings.subnet_settings[0]
241 self.assertTrue(validate_subnet(
242 self.neutron, self.network, subnet_setting.name,
243 subnet_setting.cidr, True))
244 self.assertFalse(validate_subnet(
245 self.neutron, self.network, '', subnet_setting.cidr, True))
247 subnet_query1 = neutron_utils.get_subnet(
248 self.neutron, self.network, subnet_name=subnet_setting.name)
249 self.assertEqual(self.network.subnets[0], subnet_query1)
251 subnet_query2 = neutron_utils.get_subnets_by_network(
252 self.neutron, self.network)
253 self.assertIsNotNone(subnet_query2)
254 self.assertEqual(1, len(subnet_query2))
255 self.assertEqual(self.network.subnets[0], subnet_query2[0])
257 def test_create_subnet_null_cidr(self):
259 Tests the neutron_utils.create_neutron_subnet() function for an
260 Exception when the subnet CIDR value is None
262 self.net_config.network_settings.subnet_settings[0].cidr = None
263 with self.assertRaises(Exception):
264 self.network = neutron_utils.create_network(
265 self.neutron, self.os_creds, self.net_config.network_settings)
267 def test_create_subnet_empty_cidr(self):
269 Tests the neutron_utils.create_neutron_subnet() function for an
270 Exception when the subnet CIDR value is empty
272 self.net_config.network_settings.subnet_settings[0].cidr = ''
273 with self.assertRaises(Exception):
274 self.network = neutron_utils.create_network(
275 self.neutron, self.os_creds, self.net_config.network_settings)
278 class NeutronUtilsIPv6Tests(OSComponentTestCase):
280 Test for creating IPv6 networks with subnets via neutron_utils.py
284 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
285 self.neutron = neutron_utils.neutron_client(
286 self.os_creds, self.os_session)
291 Cleans the remote OpenStack objects
295 neutron_utils.delete_network(self.neutron, self.network)
299 super(self.__class__, self).__clean__()
301 def test_create_network_slaac(self):
303 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
304 is True and IPv6 modes are slaac
306 sub_setting = SubnetConfig(
307 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
308 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
309 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
310 enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
311 self.network_settings = NetworkConfig(
312 name=self.guid + '-net', subnet_settings=[sub_setting])
314 self.network = neutron_utils.create_network(
315 self.neutron, self.os_creds, self.network_settings)
316 self.assertEqual(self.network_settings.name, self.network.name)
318 subnet_settings = self.network_settings.subnet_settings[0]
319 self.assertEqual(1, len(self.network.subnets))
320 subnet = self.network.subnets[0]
322 self.assertEqual(self.network.id, subnet.network_id)
323 self.assertEqual(subnet_settings.name, subnet.name)
324 self.assertEqual(subnet_settings.start, subnet.start)
325 self.assertEqual(subnet_settings.end, subnet.end)
326 self.assertEqual('1:1::/64', subnet.cidr)
327 self.assertEqual(6, subnet.ip_version)
328 self.assertEqual(1, len(subnet.dns_nameservers))
330 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
331 self.assertTrue(subnet.enable_dhcp)
333 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
335 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
337 def test_create_network_stateful(self):
339 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
340 is True and IPv6 modes are stateful
342 sub_setting = SubnetConfig(
343 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
344 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
345 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
346 enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
347 ipv6_address_mode='dhcpv6-stateful')
348 self.network_settings = NetworkConfig(
349 name=self.guid + '-net', subnet_settings=[sub_setting])
351 self.network = neutron_utils.create_network(
352 self.neutron, self.os_creds, self.network_settings)
354 self.assertEqual(self.network_settings.name, self.network.name)
356 subnet_settings = self.network_settings.subnet_settings[0]
357 self.assertEqual(1, len(self.network.subnets))
358 subnet = self.network.subnets[0]
360 self.assertEqual(self.network.id, subnet.network_id)
361 self.assertEqual(subnet_settings.name, subnet.name)
362 self.assertEqual(subnet_settings.start, subnet.start)
363 self.assertEqual(subnet_settings.end, subnet.end)
364 self.assertEqual('1:1::/64', subnet.cidr)
365 self.assertEqual(6, subnet.ip_version)
366 self.assertEqual(1, len(subnet.dns_nameservers))
368 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
369 self.assertTrue(subnet.enable_dhcp)
371 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
373 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
375 def test_create_network_stateless(self):
377 Tests the neutron_utils.create_network() when DHCP is enabled and
378 the RA and address modes are both 'slaac'
380 sub_setting = SubnetConfig(
381 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
382 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
383 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
384 enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
385 ipv6_address_mode='dhcpv6-stateless')
386 self.network_settings = NetworkConfig(
387 name=self.guid + '-net', subnet_settings=[sub_setting])
389 self.network = neutron_utils.create_network(
390 self.neutron, self.os_creds, self.network_settings)
392 self.assertEqual(self.network_settings.name, self.network.name)
394 subnet_settings = self.network_settings.subnet_settings[0]
395 self.assertEqual(1, len(self.network.subnets))
396 subnet = self.network.subnets[0]
398 self.assertEqual(self.network.id, subnet.network_id)
399 self.assertEqual(subnet_settings.name, subnet.name)
400 self.assertEqual(subnet_settings.start, subnet.start)
401 self.assertEqual(subnet_settings.end, subnet.end)
402 self.assertEqual('1:1::/64', subnet.cidr)
403 self.assertEqual(6, subnet.ip_version)
404 self.assertEqual(1, len(subnet.dns_nameservers))
406 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
407 self.assertTrue(subnet.enable_dhcp)
409 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
411 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
413 def test_create_network_no_dhcp_slaac(self):
415 Tests the neutron_utils.create_network() for a BadRequest when
416 DHCP is not enabled and the RA and address modes are both 'slaac'
418 sub_setting = SubnetConfig(
419 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
420 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
421 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
422 enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
423 self.network_settings = NetworkConfig(
424 name=self.guid + '-net', subnet_settings=[sub_setting])
426 with self.assertRaises(BadRequest):
427 self.network = neutron_utils.create_network(
428 self.neutron, self.os_creds, self.network_settings)
430 def test_create_network_invalid_start_ip(self):
432 Tests the neutron_utils.create_network() that contains one IPv6 subnet
433 with an invalid start IP to ensure Neutron assigns it the smallest IP
436 sub_setting = SubnetConfig(
437 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
439 self.network_settings = NetworkConfig(
440 name=self.guid + '-net', subnet_settings=[sub_setting])
442 self.network = neutron_utils.create_network(
443 self.neutron, self.os_creds, self.network_settings)
445 self.assertEqual('1:1::2', self.network.subnets[0].start)
447 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
449 def test_create_network_invalid_end_ip(self):
451 Tests the neutron_utils.create_network() that contains one IPv6 subnet
452 with an invalid end IP to ensure Neutron assigns it the largest IP
455 sub_setting = SubnetConfig(
456 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
458 self.network_settings = NetworkConfig(
459 name=self.guid + '-net', subnet_settings=[sub_setting])
461 self.network = neutron_utils.create_network(
462 self.neutron, self.os_creds, self.network_settings)
464 self.assertEqual('1:1::2', self.network.subnets[0].start)
466 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
468 def test_create_network_with_bad_cidr(self):
470 Tests the neutron_utils.create_network() for a BadRequest when
471 the subnet CIDR is invalid
473 sub_setting = SubnetConfig(
474 name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
475 self.network_settings = NetworkConfig(
476 name=self.guid + '-net', subnet_settings=[sub_setting])
478 with self.assertRaises(BadRequest):
479 self.network = neutron_utils.create_network(
480 self.neutron, self.os_creds, self.network_settings)
482 def test_create_network_invalid_gateway_ip(self):
484 Tests the neutron_utils.create_network() for a BadRequest when
485 the subnet gateway IP is invalid
487 sub_setting = SubnetConfig(
488 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
489 gateway_ip='192.168.0.1')
490 self.network_settings = NetworkConfig(
491 name=self.guid + '-net', subnet_settings=[sub_setting])
493 with self.assertRaises(BadRequest):
494 self.network = neutron_utils.create_network(
495 self.neutron, self.os_creds, self.network_settings)
497 def test_create_network_with_bad_dns(self):
499 Tests the neutron_utils.create_network() for a BadRequest when
500 the DNS IP is invalid
502 sub_setting = SubnetConfig(
503 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
504 dns_nameservers=['foo'])
505 self.network_settings = NetworkConfig(
506 name=self.guid + '-net', subnet_settings=[sub_setting])
508 with self.assertRaises(BadRequest):
509 self.network = neutron_utils.create_network(
510 self.neutron, self.os_creds, self.network_settings)
513 class NeutronUtilsRouterTests(OSComponentTestCase):
515 Test for creating routers via neutron_utils.py
519 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
520 self.port_name = str(guid) + '-port'
521 self.neutron = neutron_utils.neutron_client(
522 self.os_creds, self.os_session)
523 self.keystone = keystone_utils.keystone_client(
524 self.os_creds, self.os_session)
528 self.interface_router = None
529 self.net_config = openstack_tests.get_pub_net_config(
530 project_name=self.os_creds.project_name,
531 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
532 router_name=guid + '-pub-router', external_net=self.ext_net_name)
536 Cleans the remote OpenStack objects
538 if self.interface_router:
539 neutron_utils.remove_interface_router(
540 self.neutron, self.router, self.network.subnets[0])
544 neutron_utils.delete_router(self.neutron, self.router)
546 self.neutron, self.keystone, self.router.name,
547 self.os_creds.project_name, False)
553 neutron_utils.delete_port(self.neutron, self.port)
558 neutron_utils.delete_network(self.neutron, self.network)
560 super(self.__class__, self).__clean__()
562 def test_create_router_simple(self):
564 Tests the neutron_utils.create_router()
566 self.router = neutron_utils.create_router(
567 self.neutron, self.os_creds, self.net_config.router_settings)
569 self.neutron, self.keystone, self.net_config.router_settings.name,
570 self.os_creds.project_name, True)
572 def test_create_router_with_public_interface(self):
574 Tests the neutron_utils.create_router() function with a pubic interface
576 subnet_setting = self.net_config.network_settings.subnet_settings[0]
577 self.net_config = openstack_tests.OSNetworkConfig(
578 self.os_creds.project_name, self.net_config.network_settings.name,
579 subnet_setting.name, subnet_setting.cidr,
580 self.net_config.router_settings.name, self.ext_net_name)
581 self.router = neutron_utils.create_router(
582 self.neutron, self.os_creds, self.net_config.router_settings)
584 self.neutron, self.keystone, self.net_config.router_settings.name,
585 self.os_creds.project_name, True)
587 ext_net = neutron_utils.get_network(
588 self.neutron, self.keystone, network_name=self.ext_net_name)
589 self.assertEqual(self.router.external_network_id, ext_net.id)
591 def test_add_interface_router(self):
593 Tests the neutron_utils.add_interface_router() function
595 self.network = neutron_utils.create_network(
596 self.neutron, self.os_creds, self.net_config.network_settings)
597 self.assertEqual(self.net_config.network_settings.name,
599 self.assertTrue(validate_network(
600 self.neutron, self.keystone,
601 self.net_config.network_settings.name, True,
602 self.os_creds.project_name))
604 subnet_setting = self.net_config.network_settings.subnet_settings[0]
605 self.assertTrue(validate_subnet(
606 self.neutron, self.network, subnet_setting.name,
607 subnet_setting.cidr, True))
609 self.router = neutron_utils.create_router(
610 self.neutron, self.os_creds, self.net_config.router_settings)
612 self.neutron, self.keystone, self.net_config.router_settings.name,
613 self.os_creds.project_name, True)
615 self.interface_router = neutron_utils.add_interface_router(
616 self.neutron, self.router, self.network.subnets[0])
617 validate_interface_router(self.interface_router, self.router,
618 self.network.subnets[0])
620 def test_add_interface_router_null_router(self):
622 Tests the neutron_utils.add_interface_router() function for an
623 Exception when the router value is None
625 self.network = neutron_utils.create_network(
626 self.neutron, self.os_creds, self.net_config.network_settings)
627 self.assertEqual(self.net_config.network_settings.name,
629 self.assertTrue(validate_network(
630 self.neutron, self.keystone,
631 self.net_config.network_settings.name, True,
632 self.os_creds.project_name))
634 subnet_setting = self.net_config.network_settings.subnet_settings[0]
635 self.assertTrue(validate_subnet(
636 self.neutron, self.network, subnet_setting.name,
637 subnet_setting.cidr, True))
639 with self.assertRaises(NeutronException):
640 self.interface_router = neutron_utils.add_interface_router(
641 self.neutron, self.router, self.network.subnets[0])
643 def test_add_interface_router_null_subnet(self):
645 Tests the neutron_utils.add_interface_router() function for an
646 Exception when the subnet value is None
648 self.network = neutron_utils.create_network(
649 self.neutron, self.os_creds, self.net_config.network_settings)
650 self.assertEqual(self.net_config.network_settings.name,
652 self.assertTrue(validate_network(
653 self.neutron, self.keystone,
654 self.net_config.network_settings.name, True,
655 self.os_creds.project_name))
657 self.router = neutron_utils.create_router(
658 self.neutron, self.os_creds, self.net_config.router_settings)
660 self.neutron, self.keystone, self.net_config.router_settings.name,
661 self.os_creds.project_name, True)
663 with self.assertRaises(NeutronException):
664 self.interface_router = neutron_utils.add_interface_router(
665 self.neutron, self.router, None)
667 def test_add_interface_router_missing_subnet(self):
669 Tests the neutron_utils.add_interface_router() function for an
670 Exception when the subnet object has been deleted
672 self.network = neutron_utils.create_network(
673 self.neutron, self.os_creds, self.net_config.network_settings)
674 self.assertEqual(self.net_config.network_settings.name,
676 self.assertTrue(validate_network(
677 self.neutron, self.keystone,
678 self.net_config.network_settings.name, True,
679 self.os_creds.project_name))
681 self.router = neutron_utils.create_router(
682 self.neutron, self.os_creds, self.net_config.router_settings)
684 self.neutron, self.keystone, self.net_config.router_settings.name,
685 self.os_creds.project_name, True)
687 for subnet in self.network.subnets:
688 neutron_utils.delete_subnet(self.neutron, subnet)
690 with self.assertRaises(NotFound):
691 self.interface_router = neutron_utils.add_interface_router(
692 self.neutron, self.router, self.network.subnets[0])
694 def test_create_port(self):
696 Tests the neutron_utils.create_port() function
698 self.network = neutron_utils.create_network(
699 self.neutron, self.os_creds, self.net_config.network_settings)
700 self.assertEqual(self.net_config.network_settings.name,
702 self.assertTrue(validate_network(
703 self.neutron, self.keystone,
704 self.net_config.network_settings.name, True,
705 self.os_creds.project_name))
707 subnet_setting = self.net_config.network_settings.subnet_settings[0]
708 self.assertTrue(validate_subnet(
709 self.neutron, self.network, subnet_setting.name,
710 subnet_setting.cidr, True))
712 self.port = neutron_utils.create_port(
713 self.neutron, self.os_creds, PortConfig(
716 'subnet_name': subnet_setting.name,
718 network_name=self.net_config.network_settings.name))
719 validate_port(self.neutron, self.port, self.port_name)
721 def test_create_port_empty_name(self):
723 Tests the neutron_utils.create_port() function
725 self.network = neutron_utils.create_network(
726 self.neutron, self.os_creds, self.net_config.network_settings)
727 self.assertEqual(self.net_config.network_settings.name,
729 self.assertTrue(validate_network(
730 self.neutron, self.keystone,
731 self.net_config.network_settings.name, True,
732 self.os_creds.project_name))
734 subnet_setting = self.net_config.network_settings.subnet_settings[0]
735 self.assertTrue(validate_subnet(
736 self.neutron, self.network, subnet_setting.name,
737 subnet_setting.cidr, True))
739 self.port = neutron_utils.create_port(
740 self.neutron, self.os_creds, PortConfig(
742 network_name=self.net_config.network_settings.name,
744 'subnet_name': subnet_setting.name,
746 validate_port(self.neutron, self.port, self.port_name)
748 def test_create_port_null_name(self):
750 Tests the neutron_utils.create_port() when the port name value is None
752 self.network = neutron_utils.create_network(
753 self.neutron, self.os_creds, self.net_config.network_settings)
754 self.assertEqual(self.net_config.network_settings.name,
756 self.assertTrue(validate_network(
757 self.neutron, self.keystone,
758 self.net_config.network_settings.name, True,
759 self.os_creds.project_name))
761 subnet_setting = self.net_config.network_settings.subnet_settings[0]
762 self.assertTrue(validate_subnet(
763 self.neutron, self.network, subnet_setting.name,
764 subnet_setting.cidr, True))
766 self.port = neutron_utils.create_port(
767 self.neutron, self.os_creds,
769 network_name=self.net_config.network_settings.name,
771 'subnet_name': subnet_setting.name,
774 port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
775 self.assertEqual(self.port, port)
777 def test_create_port_null_network_object(self):
779 Tests the neutron_utils.create_port() function for an Exception when
780 the network object is None
782 with self.assertRaises(Exception):
783 self.port = neutron_utils.create_port(
784 self.neutron, self.os_creds,
787 network_name=self.net_config.network_settings.name,
790 self.net_config.network_settings.subnet_settings[
794 def test_create_port_null_ip(self):
796 Tests the neutron_utils.create_port() function for an Exception when
799 self.network = neutron_utils.create_network(
800 self.neutron, self.os_creds, self.net_config.network_settings)
801 self.assertEqual(self.net_config.network_settings.name,
803 self.assertTrue(validate_network(
804 self.neutron, self.keystone,
805 self.net_config.network_settings.name, True,
806 self.os_creds.project_name))
808 subnet_setting = self.net_config.network_settings.subnet_settings[0]
809 self.assertTrue(validate_subnet(
810 self.neutron, self.network, subnet_setting.name,
811 subnet_setting.cidr, True))
813 with self.assertRaises(Exception):
814 self.port = neutron_utils.create_port(
815 self.neutron, self.os_creds,
818 network_name=self.net_config.network_settings.name,
820 'subnet_name': subnet_setting.name,
823 def test_create_port_invalid_ip(self):
825 Tests the neutron_utils.create_port() function for an Exception when
828 self.network = neutron_utils.create_network(
829 self.neutron, self.os_creds, self.net_config.network_settings)
830 self.assertEqual(self.net_config.network_settings.name,
832 self.assertTrue(validate_network(
833 self.neutron, self.keystone,
834 self.net_config.network_settings.name, True,
835 self.os_creds.project_name))
837 subnet_setting = self.net_config.network_settings.subnet_settings[0]
838 self.assertTrue(validate_subnet(
839 self.neutron, self.network, subnet_setting.name,
840 subnet_setting.cidr, True))
842 with self.assertRaises(Exception):
843 self.port = neutron_utils.create_port(
844 self.neutron, self.os_creds,
847 network_name=self.net_config.network_settings.name,
849 'subnet_name': subnet_setting.name,
852 def test_create_port_invalid_ip_to_subnet(self):
854 Tests the neutron_utils.create_port() function for an Exception when
857 self.network = neutron_utils.create_network(
858 self.neutron, self.os_creds, self.net_config.network_settings)
859 self.assertEqual(self.net_config.network_settings.name,
861 self.assertTrue(validate_network(
862 self.neutron, self.keystone,
863 self.net_config.network_settings.name, True,
864 self.os_creds.project_name))
866 subnet_setting = self.net_config.network_settings.subnet_settings[0]
867 self.assertTrue(validate_subnet(
868 self.neutron, self.network, subnet_setting.name,
869 subnet_setting.cidr, True))
871 with self.assertRaises(Exception):
872 self.port = neutron_utils.create_port(
873 self.neutron, self.os_creds,
876 network_name=self.net_config.network_settings.name,
878 'subnet_name': subnet_setting.name,
879 'ip': '10.197.123.100'}]))
882 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
884 Test for creating security groups via neutron_utils.py
888 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
889 self.sec_grp_name = guid + 'name'
891 self.security_groups = list()
892 self.security_group_rules = list()
893 self.neutron = neutron_utils.neutron_client(
894 self.os_creds, self.os_session)
895 self.keystone = keystone_utils.keystone_client(
896 self.os_creds, self.os_session)
900 Cleans the remote OpenStack objects
902 for rule in self.security_group_rules:
903 neutron_utils.delete_security_group_rule(self.neutron, rule)
905 for security_group in self.security_groups:
907 neutron_utils.delete_security_group(self.neutron,
912 super(self.__class__, self).__clean__()
914 def test_create_delete_simple_sec_grp(self):
916 Tests the neutron_utils.create_security_group() function
918 sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
919 security_group = neutron_utils.create_security_group(
920 self.neutron, self.keystone, sec_grp_settings)
922 self.assertTrue(sec_grp_settings.name, security_group.name)
924 sec_grp_get = neutron_utils.get_security_group(
925 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
926 self.assertIsNotNone(sec_grp_get)
927 self.assertTrue(validation_utils.objects_equivalent(
928 security_group, sec_grp_get))
930 neutron_utils.delete_security_group(self.neutron, security_group)
931 sec_grp_get = neutron_utils.get_security_group(
932 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
933 self.assertIsNone(sec_grp_get)
935 def test_create_sec_grp_no_name(self):
937 Tests the SecurityGroupConfig constructor and
938 neutron_utils.create_security_group() function to ensure that
939 attempting to create a security group without a name will raise an
942 with self.assertRaises(Exception):
943 sec_grp_settings = SecurityGroupConfig()
944 self.security_groups.append(
945 neutron_utils.create_security_group(
946 self.neutron, self.keystone, sec_grp_settings))
948 def test_create_sec_grp_no_rules(self):
950 Tests the neutron_utils.create_security_group() function
952 sec_grp_settings = SecurityGroupConfig(
953 name=self.sec_grp_name, description='hello group')
954 self.security_groups.append(
955 neutron_utils.create_security_group(
956 self.neutron, self.keystone, sec_grp_settings))
958 self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
959 self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
961 sec_grp_get = neutron_utils.get_security_group(
962 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
963 self.assertIsNotNone(sec_grp_get)
964 self.assertEqual(self.security_groups[0], sec_grp_get)
966 def test_create_sec_grp_one_rule(self):
968 Tests the neutron_utils.create_security_group() function
971 sec_grp_rule_settings = SecurityGroupRuleConfig(
972 sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
973 sec_grp_settings = SecurityGroupConfig(
974 name=self.sec_grp_name, description='hello group',
975 rule_settings=[sec_grp_rule_settings])
977 self.security_groups.append(
978 neutron_utils.create_security_group(
979 self.neutron, self.keystone, sec_grp_settings))
980 free_rules = neutron_utils.get_rules_by_security_group(
981 self.neutron, self.security_groups[0])
982 for free_rule in free_rules:
983 self.security_group_rules.append(free_rule)
985 keystone = keystone_utils.keystone_client(
986 self.os_creds, self.os_session)
987 self.security_group_rules.append(
988 neutron_utils.create_security_group_rule(
989 self.neutron, keystone, sec_grp_settings.rule_settings[0],
990 self.os_creds.project_name))
992 # Refresh object so it is populated with the newly added rule
993 security_group = neutron_utils.get_security_group(
994 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
996 rules = neutron_utils.get_rules_by_security_group(
997 self.neutron, security_group)
1000 validation_utils.objects_equivalent(
1001 self.security_group_rules, rules))
1003 self.assertTrue(sec_grp_settings.name, security_group.name)
1005 sec_grp_get = neutron_utils.get_security_group(
1006 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
1007 self.assertIsNotNone(sec_grp_get)
1008 self.assertEqual(security_group, sec_grp_get)
1010 def test_get_sec_grp_by_id(self):
1012 Tests the neutron_utils.create_security_group() function
1015 self.security_groups.append(neutron_utils.create_security_group(
1016 self.neutron, self.keystone,
1017 SecurityGroupConfig(
1018 name=self.sec_grp_name + '-1', description='hello group')))
1019 self.security_groups.append(neutron_utils.create_security_group(
1020 self.neutron, self.keystone,
1021 SecurityGroupConfig(
1022 name=self.sec_grp_name + '-2', description='hello group')))
1024 sec_grp_1b = neutron_utils.get_security_group_by_id(
1025 self.neutron, self.security_groups[0].id)
1026 sec_grp_2b = neutron_utils.get_security_group_by_id(
1027 self.neutron, self.security_groups[1].id)
1029 self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
1030 self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
1033 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
1035 Test basic nova keypair functionality
1040 Instantiates the CreateImage object that is responsible for downloading
1041 and creating an OS image file within OpenStack
1043 self.neutron = neutron_utils.neutron_client(
1044 self.os_creds, self.os_session)
1045 self.keystone = keystone_utils.keystone_client(
1046 self.os_creds, self.os_session)
1047 self.floating_ip = None
1051 Cleans the image and downloaded image file
1053 if self.floating_ip:
1055 neutron_utils.delete_floating_ip(
1056 self.neutron, self.floating_ip)
1060 super(self.__class__, self).__clean__()
1062 def test_floating_ips(self):
1064 Tests the creation of a floating IP
1067 initial_fips = neutron_utils.get_floating_ips(self.neutron)
1069 self.floating_ip = neutron_utils.create_floating_ip(
1070 self.neutron, self.keystone, self.ext_net_name)
1071 all_fips = neutron_utils.get_floating_ips(self.neutron)
1072 self.assertEqual(len(initial_fips) + 1, len(all_fips))
1073 returned = neutron_utils.get_floating_ip(self.neutron,
1075 self.assertEqual(self.floating_ip.id, returned.id)
1076 self.assertEqual(self.floating_ip.ip, returned.ip)
1084 def validate_network(neutron, keystone, name, exists, project_name):
1086 Returns true if a network for a given name DOES NOT exist if the exists
1087 parameter is false conversely true. Returns false if a network for a given
1088 name DOES exist if the exists parameter is true conversely false.
1089 :param neutron: The neutron client
1090 :param keystone: The keystone client
1091 :param name: The expected network name
1092 :param exists: Whether or not the network name should exist or not
1093 :param project_name: the associated project name
1096 network = neutron_utils.get_network(
1097 neutron, keystone, network_name=name, project_name=project_name)
1098 if exists and network:
1100 if not exists and not network:
1105 def validate_subnet(neutron, network, name, cidr, exists):
1107 Returns true if a subnet for a given name DOES NOT exist if the exists
1108 parameter is false conversely true. Returns false if a subnet for a given
1109 name DOES exist if the exists parameter is true conversely false.
1110 :param neutron: The neutron client
1111 :param network: The SNAPS-OO Network domain object
1112 :param name: The expected subnet name
1113 :param cidr: The expected CIDR value
1114 :param exists: Whether or not the network name should exist or not
1117 subnet = neutron_utils.get_subnet(
1118 neutron, network, subnet_name=name)
1119 if exists and subnet and subnet.name == name:
1120 return subnet.cidr == cidr
1121 if not exists and not subnet:
1126 def validate_router(neutron, keystone, name, project_name, exists):
1128 Returns true if a router for a given name DOES NOT exist if the exists
1129 parameter is false conversely true. Returns false if a router for a given
1130 name DOES exist if the exists parameter is true conversely false.
1131 :param neutron: The neutron client
1132 :param keystone: The keystone client
1133 :param name: The expected router name
1134 :param project_name: The name of the project in which the router should
1136 :param exists: Whether or not the network name should exist or not
1139 router = neutron_utils.get_router(
1140 neutron, keystone, router_name=name, project_name=project_name)
1141 if exists and router:
1146 def validate_interface_router(interface_router, router, subnet):
1148 Returns true if the router ID & subnet ID have been properly included into
1149 the interface router object
1150 :param interface_router: the SNAPS-OO InterfaceRouter domain object
1151 :param router: to validate against the interface_router
1152 :param subnet: to validate against the interface_router
1153 :return: True if both IDs match else False
1155 subnet_id = interface_router.subnet_id
1156 router_id = interface_router.port_id
1158 return subnet.id == subnet_id and router.id == router_id
1161 def validate_port(neutron, port_obj, this_port_name):
1163 Returns true if a port for a given name DOES NOT exist if the exists
1164 parameter is false conversely true. Returns false if a port for a given
1165 name DOES exist if the exists parameter is true conversely false.
1166 :param neutron: The neutron client
1167 :param port_obj: The port object to lookup
1168 :param this_port_name: The expected router name
1171 os_ports = neutron.list_ports()
1172 for os_port, os_port_insts in os_ports.items():
1173 for os_inst in os_port_insts:
1174 if os_inst['id'] == port_obj.id:
1175 return os_inst['name'] == this_port_name