1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from neutronclient.common.exceptions import NotFound, BadRequest
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21 SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Ensures a client built from the configured credentials can list the
        networks and that the external network is one of them.
        """
        client = neutron_utils.neutron_client(self.os_creds, self.os_session)
        net_dicts = client.list_networks().get('networks')

        self.assertTrue(any(
            net_dict.get('name') == self.ext_net_name
            for net_dict in net_dicts))

    def test_neutron_connect_fail(self):
        """
        Ensures that a client built from bogus credentials cannot connect.
        """
        from snaps.openstack.os_credentials import OSCreds

        bogus = dict(username='user', password='pass', auth_url='url',
                     project_name='project')

        # Both the client construction and the first API call stay inside
        # the context manager as either one may be the call that fails
        with self.assertRaises(Exception):
            client = neutron_utils.neutron_client(OSCreds(**bogus))
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Ensures the configured self.ext_net_name is the name of one of the
        objects returned by neutron_utils.get_external_networks()
        """
        client = neutron_utils.neutron_client(self.os_creds, self.os_session)
        external_nets = neutron_utils.get_external_networks(client)

        self.assertTrue(any(
            ext_net.name == self.ext_net_name for ext_net in external_nets))
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Test for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron/keystone clients and the network configuration
        used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(NeutronUtilsNetworkTests, self).__clean__()

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name))
        self.assertEqual(len(net_settings.subnet_settings),
                         len(self.network.subnets))

    def test_create_network_empty_name(self):
        """
        Tests the neutron_utils.create_network() function with an empty
        network name
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig(name=''))

    def test_create_network_null_name(self):
        """
        Tests the neutron_utils.create_network() function when the network
        name is None
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig())
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron/keystone clients and a network configuration
        holding one subnet
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            # Best-effort cleanup; Exception (not a bare except) so that
            # KeyboardInterrupt/SystemExit still propagate
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                pass

        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(NeutronUtilsSubnetTests, self).__clean__()

    def _create_validated_network(self):
        """
        Creates self.network from self.net_config and checks that it exists
        under the expected name
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name))

    def _check_subnet_queries(self, subnet_setting):
        """
        Checks that the subnet on self.network is returned both by a name
        lookup and by a lookup on the owning network
        :param subnet_setting: the configuration the subnet was created with
        """
        by_name = neutron_utils.get_subnet(
            self.neutron, subnet_name=subnet_setting.name)
        self.assertEqual(self.network.subnets[0], by_name)

        by_network = neutron_utils.get_subnets_by_network(self.neutron,
                                                          self.network)
        self.assertIsNotNone(by_network)
        self.assertEqual(1, len(by_network))
        self.assertEqual(self.network.subnets[0], by_network[0])

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_network() function
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))

        self._check_subnet_queries(subnet_setting)

    def test_create_subnet_null_name(self):
        """
        Tests the SubnetConfig constructor for an Exception when the subnet
        name is None
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            SubnetConfig(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Tests the neutron_utils.create_network() function and ensures the
        subnet cannot be found via an empty name
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        self.assertFalse(validate_subnet(
            self.neutron, '', subnet_setting.cidr, True))

        self._check_subnet_queries(subnet_setting)

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is None
        """
        self.net_config.network_settings.subnet_settings[0].cidr = None
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is empty
        """
        self.net_config.network_settings.subnet_settings[0].cidr = ''
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)
class NeutronUtilsIPv6Tests(OSComponentTestCase):
    """
    Test for creating IPv6 networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and the GUID used to name the objects
        """
        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.network = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            # Best-effort cleanup; Exception (not a bare except) so that
            # KeyboardInterrupt/SystemExit still propagate
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                pass

        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(NeutronUtilsIPv6Tests, self).__clean__()

    def _build_network_settings(self, **subnet_kwargs):
        """
        Stores in self.network_settings a network configuration holding a
        single IPv6 subnet
        :param subnet_kwargs: SubnetConfig arguments other than name and
                              ip_version
        :return: the SubnetConfig object placed into the network config
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', ip_version=6, **subnet_kwargs)
        self.network_settings = NetworkConfig(
            name=self.guid + '-net', subnet_settings=[sub_setting])
        return sub_setting

    def _create_network(self):
        """
        Creates self.network from self.network_settings
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

    def _assert_create_is_bad_request(self):
        """
        Ensures creating a network from self.network_settings raises a
        BadRequest
        """
        with self.assertRaises(BadRequest):
            self._create_network()

    def _check_dhcp_network(self, ipv6_mode):
        """
        Creates a DHCP enabled IPv6 network where the RA and address modes
        are both ipv6_mode then compares the resulting subnet with its
        configuration
        :param ipv6_mode: value used for ipv6_ra_mode and ipv6_address_mode
        """
        sub_setting = self._build_network_settings(
            cidr='1:1:0:0:0:0:0:0/64',
            dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=True, ipv6_ra_mode=ipv6_mode,
            ipv6_address_mode=ipv6_mode)

        self._create_network()
        self.assertEqual(self.network_settings.name, self.network.name)

        subnet_settings = self.network_settings.subnet_settings[0]
        self.assertEqual(1, len(self.network.subnets))
        subnet = self.network.subnets[0]

        self.assertEqual(self.network.id, subnet.network_id)
        self.assertEqual(subnet_settings.name, subnet.name)
        self.assertEqual(subnet_settings.start, subnet.start)
        self.assertEqual(subnet_settings.end, subnet.end)
        self.assertEqual('1:1::/64', subnet.cidr)
        self.assertEqual(6, subnet.ip_version)
        self.assertEqual(1, len(subnet.dns_nameservers))
        self.assertEqual(
            sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
        self.assertTrue(subnet.enable_dhcp)
        self.assertEqual(
            subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
        self.assertEqual(
            subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)

    def test_create_network_slaac(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are slaac
        """
        self._check_dhcp_network('slaac')

    def test_create_network_stateful(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are stateful
        """
        self._check_dhcp_network('dhcpv6-stateful')

    def test_create_network_stateless(self):
        """
        Tests the neutron_utils.create_network() when DHCP is enabled and
        the RA and address modes are both 'dhcpv6-stateless'
        """
        self._check_dhcp_network('dhcpv6-stateless')

    def test_create_network_no_dhcp_slaac(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        DHCP is not enabled and the RA and address modes are both 'slaac'
        """
        self._build_network_settings(
            cidr='1:1:0:0:0:0:0:0/64',
            dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=False, ipv6_ra_mode='slaac',
            ipv6_address_mode='slaac')

        self._assert_create_is_bad_request()

    def test_create_network_invalid_start_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid start IP to ensure Neutron assigns it the smallest IP
        possible
        """
        self._build_network_settings(cidr='1:1::/48', start='foo')

        self._create_network()

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_invalid_end_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid end IP to ensure Neutron assigns it the largest IP
        possible
        """
        self._build_network_settings(cidr='1:1::/48', end='bar')

        self._create_network()

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_with_bad_cidr(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet CIDR is invalid
        """
        self._build_network_settings(cidr='1:1:1:/48')

        self._assert_create_is_bad_request()

    def test_create_network_invalid_gateway_ip(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet gateway IP is invalid
        """
        self._build_network_settings(
            cidr='1:1::/48', gateway_ip='192.168.0.1')

        self._assert_create_is_bad_request()

    def test_create_network_with_bad_dns(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the DNS IP is invalid
        """
        self._build_network_settings(
            cidr='1:1::/48', dns_nameservers=['foo'])

        self._assert_create_is_bad_request()
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Test for creating routers via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron/keystone clients and a configuration holding a
        network, subnet and router
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        self.network = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.interface_router:
            neutron_utils.remove_interface_router(
                self.neutron, self.router, self.network.subnets[0])

        # Router and port removal are best-effort; Exception (not a bare
        # except) so that KeyboardInterrupt/SystemExit still propagate
        if self.router:
            try:
                neutron_utils.delete_router(self.neutron, self.router)
                validate_router(
                    self.neutron, self.keystone, self.router.name,
                    self.os_creds.project_name, False)
            except Exception:
                pass

        if self.port:
            try:
                neutron_utils.delete_port(self.neutron, self.port)
            except Exception:
                pass

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(NeutronUtilsRouterTests, self).__clean__()

    def _create_validated_network(self):
        """
        Creates self.network from self.net_config and checks that it exists
        under the expected name
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name))

    def _validated_subnet_setting(self):
        """
        Checks that the first configured subnet exists
        :return: the SubnetConfig object of that subnet
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        return subnet_setting

    def _create_router(self):
        """
        Creates self.router from self.net_config then calls validate_router()
        """
        self.router = neutron_utils.create_router(
            self.neutron, self.os_creds, self.net_config.router_settings)
        # NOTE(review): the value returned by validate_router() has never
        # been asserted here - confirm whether it raises on failure or
        # whether it needs to be wrapped in an assertTrue()
        validate_router(
            self.neutron, self.keystone, self.net_config.router_settings.name,
            self.os_creds.project_name, True)

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router()
        """
        self._create_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function with a public
        interface
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name, subnet_setting.name,
            subnet_setting.cidr, self.net_config.router_settings.name,
            self.ext_net_name)
        self._create_router()

        ext_net = neutron_utils.get_network(
            self.neutron, self.keystone, network_name=self.ext_net_name)
        self.assertEqual(self.router.external_network_id, ext_net.id)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_validated_network()
        self._validated_subnet_setting()
        self._create_router()

        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, self.network.subnets[0])
        validate_interface_router(self.interface_router, self.router,
                                  self.network.subnets[0])

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for an
        Exception when the router value is None
        """
        self._create_validated_network()
        self._validated_subnet_setting()

        # self.router is still the None assigned by setUp()
        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.network.subnets[0])

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an
        Exception when the subnet value is None
        """
        self._create_validated_network()
        self._create_router()

        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, None)

    def test_add_interface_router_missing_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an
        Exception when the subnet object has been deleted
        """
        self._create_validated_network()
        self._create_router()

        for subnet in self.network.subnets:
            neutron_utils.delete_subnet(self.neutron, subnet)

        with self.assertRaises(NotFound):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.network.subnets[0])

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_validated_network()
        subnet_setting = self._validated_subnet_setting()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortConfig(
                name=self.port_name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': ip_1}],
                network_name=self.net_config.network_settings.name))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_validated_network()
        subnet_setting = self._validated_subnet_setting()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortConfig(
                name=self.port_name,
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': ip_1}]))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() when the port name value is None
        """
        self._create_validated_network()
        subnet_setting = self._validated_subnet_setting()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds,
            PortConfig(
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': ip_1}]))

        port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
        self.assertEqual(self.port, port)

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the network object is None
        """
        # No network is created here so the names below refer to nothing
        net_settings = self.net_config.network_settings
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortConfig(
                    name=self.port_name,
                    network_name=net_settings.name,
                    ip_addrs=[{
                        'subnet_name': net_settings.subnet_settings[0].name,
                        'ip': ip_1}]))

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is None
        """
        self._create_validated_network()
        subnet_setting = self._validated_subnet_setting()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortConfig(
                    name=self.port_name,
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name': subnet_setting.name,
                        'ip': None}]))

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is not an IP address
        """
        self._create_validated_network()
        subnet_setting = self._validated_subnet_setting()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortConfig(
                    name=self.port_name,
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name': subnet_setting.name,
                        'ip': 'foo'}]))

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value '10.197.123.100' is requested on the test subnet
        """
        self._create_validated_network()
        subnet_setting = self._validated_subnet_setting()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortConfig(
                    name=self.port_name,
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name': subnet_setting.name,
                        'ip': '10.197.123.100'}]))
863 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
865 Test for creating security groups via neutron_utils.py
869 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
870 self.sec_grp_name = guid + 'name'
872 self.security_groups = list()
873 self.security_group_rules = list()
874 self.neutron = neutron_utils.neutron_client(
875 self.os_creds, self.os_session)
876 self.keystone = keystone_utils.keystone_client(
877 self.os_creds, self.os_session)
881 Cleans the remote OpenStack objects
883 for rule in self.security_group_rules:
884 neutron_utils.delete_security_group_rule(self.neutron, rule)
886 for security_group in self.security_groups:
888 neutron_utils.delete_security_group(self.neutron,
893 super(self.__class__, self).__clean__()
895 def test_create_delete_simple_sec_grp(self):
897 Tests the neutron_utils.create_security_group() function
899 sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
900 security_group = neutron_utils.create_security_group(
901 self.neutron, self.keystone, sec_grp_settings)
903 self.assertTrue(sec_grp_settings.name, security_group.name)
905 sec_grp_get = neutron_utils.get_security_group(
906 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
907 self.assertIsNotNone(sec_grp_get)
908 self.assertTrue(validation_utils.objects_equivalent(
909 security_group, sec_grp_get))
911 neutron_utils.delete_security_group(self.neutron, security_group)
912 sec_grp_get = neutron_utils.get_security_group(
913 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
914 self.assertIsNone(sec_grp_get)
916 def test_create_sec_grp_no_name(self):
918 Tests the SecurityGroupConfig constructor and
919 neutron_utils.create_security_group() function to ensure that
920 attempting to create a security group without a name will raise an
923 with self.assertRaises(Exception):
924 sec_grp_settings = SecurityGroupConfig()
925 self.security_groups.append(
926 neutron_utils.create_security_group(
927 self.neutron, self.keystone, sec_grp_settings))
929 def test_create_sec_grp_no_rules(self):
931 Tests the neutron_utils.create_security_group() function
933 sec_grp_settings = SecurityGroupConfig(
934 name=self.sec_grp_name, description='hello group')
935 self.security_groups.append(
936 neutron_utils.create_security_group(
937 self.neutron, self.keystone, sec_grp_settings))
939 self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
940 self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
942 sec_grp_get = neutron_utils.get_security_group(
943 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
944 self.assertIsNotNone(sec_grp_get)
945 self.assertEqual(self.security_groups[0], sec_grp_get)
def test_create_sec_grp_one_rule(self):
    """
    Tests that a rule added with
    neutron_utils.create_security_group_rule() to a group created by
    neutron_utils.create_security_group() is reported by
    neutron_utils.get_rules_by_security_group(), and that the group can be
    retrieved again with neutron_utils.get_security_group()
    """
    sec_grp_rule_settings = SecurityGroupRuleConfig(
        sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
    sec_grp_settings = SecurityGroupConfig(
        name=self.sec_grp_name, description='hello group',
        rule_settings=[sec_grp_rule_settings])

    self.security_groups.append(
        neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings))

    # Rules reported for the group right after its creation belong to the
    # expected rule set that is compared further down
    self.security_group_rules.extend(
        neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0]))

    keystone = keystone_utils.keystone_client(
        self.os_creds, self.os_session)
    self.security_group_rules.append(
        neutron_utils.create_security_group_rule(
            self.neutron, keystone, sec_grp_settings.rule_settings[0],
            self.os_creds.project_name))

    # Refresh object so it is populated with the newly added rule
    security_group = neutron_utils.get_security_group(
        self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)

    rules = neutron_utils.get_rules_by_security_group(
        self.neutron, security_group)

    self.assertTrue(
        validation_utils.objects_equivalent(
            self.security_group_rules, rules))

    # assertEqual rather than assertTrue(a, b): the latter never compares
    # the two names, it only checks that the first one is truthy
    self.assertEqual(sec_grp_settings.name, security_group.name)

    sec_grp_get = neutron_utils.get_security_group(
        self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
    self.assertIsNotNone(sec_grp_get)
    self.assertEqual(security_group, sec_grp_get)
def test_get_sec_grp_by_id(self):
    """
    Tests the neutron_utils.get_security_group_by_id() function by
    creating two security groups and looking each of them up by its ID
    """
    for suffix in ('-1', '-2'):
        group_config = SecurityGroupConfig(
            name=self.sec_grp_name + suffix, description='hello group')
        created = neutron_utils.create_security_group(
            self.neutron, self.keystone, group_config)
        self.security_groups.append(created)

    # Perform both lookups before asserting anything
    looked_up = [
        neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[index].id)
        for index in (0, 1)
    ]

    for index, fetched in enumerate(looked_up):
        self.assertEqual(self.security_groups[index].id, fetched.id)
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions of neutron_utils
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients used by the test and
        initializes the floating IP reference that tearDown() cleans up
        """
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP created by the test, if any, then delegates
        to the parent class' __clean__()
        """
        if self.floating_ip:
            try:
                neutron_utils.delete_floating_ip(
                    self.neutron, self.floating_ip)
            except Exception:
                # Best-effort cleanup: a failed delete must not prevent
                # the parent cleanup below from running
                pass

        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(NeutronUtilsFloatingIpTests, self).__clean__()

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP on the external network and
        its retrieval with neutron_utils.get_floating_ip()
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.keystone, self.ext_net_name)
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        # Exactly one more floating IP must exist after the creation
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
def validate_network(neutron, keystone, name, exists, project_name):
    """
    Checks that the presence of a network matches the expectation.
    Returns True when the network is found and exists is true, or when the
    network is not found and exists is false. Returns False otherwise.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected network name
    :param exists: Whether or not the network is expected to exist
    :param project_name: the associated project name
    :return: True/False
    """
    found = neutron_utils.get_network(
        neutron, keystone, network_name=name, project_name=project_name)
    # Valid only when "was found" and "should exist" agree
    return bool(exists) == bool(found)
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks that the presence and CIDR of a subnet match the expectation.
    Returns True when no subnet is found and exists is false. When a subnet
    is found and exists is true, returns whether its name and CIDR match
    the expected values. Returns False otherwise.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet is expected to exist
    :return: True/False
    """
    found = neutron_utils.get_subnet(neutron, subnet_name=name)
    if not found:
        # Nothing was found: valid only when absence was expected
        return not exists
    if not exists or found.name != name:
        return False
    return found.cidr == cidr
def validate_router(neutron, keystone, name, project_name, exists):
    """
    Checks that the presence of a router matches the expectation.
    Returns True when the router is found and exists is true, or when the
    router is not found and exists is false. Returns False otherwise.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected router name
    :param project_name: The name of the project in which the router should
                         exist
    :param exists: Whether or not the router is expected to exist
    :return: True/False
    """
    router = neutron_utils.get_router(
        neutron, keystone, router_name=name, project_name=project_name)
    # Valid only when "was found" and "should exist" agree; this also
    # covers the expected-absent case, mirroring validate_network()
    return bool(exists) == bool(router)
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true when the subnet ID and the port ID held by the interface
    router object equal the IDs of the given subnet and router
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    # NOTE(review): the router's ID is compared with the interface router's
    # port_id attribute - confirm that this is the intended field
    if subnet.id != interface_router.subnet_id:
        return False
    return router.id == interface_router.port_id
def validate_port(neutron, port_obj, this_port_name):
    """
    Returns True when a port with the same ID as port_obj is listed by
    neutron.list_ports() and the name of that port equals this_port_name.
    Returns False when the names differ or when no listed port has that ID.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup
    :param this_port_name: The expected port name
    :return: True/False
    """
    # Only the values of the list_ports() response are needed; each value
    # is iterated as a collection of port dictionaries
    for os_port_insts in neutron.list_ports().values():
        for os_inst in os_port_insts:
            if os_inst['id'] == port_obj.id:
                return os_inst['name'] == this_port_name
    return False