1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from neutronclient.common.exceptions import NotFound, BadRequest
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21 SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
35 class NeutronSmokeTests(OSComponentTestCase):
37 Tests to ensure that the neutron client can communicate with the cloud
40 def test_neutron_connect_success(self):
42 Tests to ensure that the proper credentials can connect.
44 neutron = neutron_utils.neutron_client(self.os_creds)
46 networks = neutron.list_networks()
49 networks = networks.get('networks')
50 for network in networks:
51 if network.get('name') == self.ext_net_name:
53 self.assertTrue(found)
55 def test_neutron_connect_fail(self):
57 Tests to ensure that the improper credentials cannot connect.
59 from snaps.openstack.os_credentials import OSCreds
61 with self.assertRaises(Exception):
62 neutron = neutron_utils.neutron_client(
63 OSCreds(username='user', password='pass', auth_url='url',
64 project_name='project'))
65 neutron.list_networks()
67 def test_retrieve_ext_network_name(self):
69 Tests the neutron_utils.get_external_network_names to ensure the
70 configured self.ext_net_name is contained within the returned list
73 neutron = neutron_utils.neutron_client(self.os_creds)
74 ext_networks = neutron_utils.get_external_networks(neutron)
76 for network in ext_networks:
77 if network.name == self.ext_net_name:
80 self.assertTrue(found)
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
85 Test for creating networks via neutron_utils.py
89 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90 self.port_name = str(guid) + '-port'
91 self.neutron = neutron_utils.neutron_client(self.os_creds)
92 self.keystone = keystone_utils.keystone_client(self.os_creds)
94 self.net_config = openstack_tests.get_pub_net_config(
95 net_name=guid + '-pub-net')
99 Cleans the remote OpenStack objects
102 neutron_utils.delete_network(self.neutron, self.network)
104 def test_create_network(self):
106 Tests the neutron_utils.create_network() function
108 self.network = neutron_utils.create_network(
109 self.neutron, self.os_creds, self.net_config.network_settings)
110 self.assertEqual(self.net_config.network_settings.name,
112 self.assertTrue(validate_network(
113 self.neutron, self.keystone,
114 self.net_config.network_settings.name, True,
115 self.os_creds.project_name))
116 self.assertEqual(len(self.net_config.network_settings.subnet_settings),
117 len(self.network.subnets))
119 def test_create_network_empty_name(self):
121 Tests the neutron_utils.create_network() function with an empty
124 with self.assertRaises(Exception):
125 self.network = neutron_utils.create_network(
126 self.neutron, self.os_creds,
127 network_settings=NetworkConfig(name=''))
129 def test_create_network_null_name(self):
131 Tests the neutron_utils.create_network() function when the network
134 with self.assertRaises(Exception):
135 self.network = neutron_utils.create_network(
136 self.neutron, self.os_creds,
137 network_settings=NetworkConfig())
140 class NeutronUtilsSubnetTests(OSComponentTestCase):
142 Test for creating networks with subnets via neutron_utils.py
146 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
147 self.port_name = str(guid) + '-port'
148 self.neutron = neutron_utils.neutron_client(self.os_creds)
149 self.keystone = keystone_utils.keystone_client(self.os_creds)
151 self.net_config = openstack_tests.get_pub_net_config(
152 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
153 external_net=self.ext_net_name)
157 Cleans the remote OpenStack objects
161 neutron_utils.delete_network(self.neutron, self.network)
165 def test_create_subnet(self):
167 Tests the neutron_utils.create_network() function
169 self.network = neutron_utils.create_network(
170 self.neutron, self.os_creds, self.net_config.network_settings)
171 self.assertEqual(self.net_config.network_settings.name,
173 self.assertTrue(validate_network(
174 self.neutron, self.keystone,
175 self.net_config.network_settings.name, True,
176 self.os_creds.project_name))
178 subnet_setting = self.net_config.network_settings.subnet_settings[0]
179 self.assertTrue(validate_subnet(
180 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
182 subnet_query1 = neutron_utils.get_subnet(
183 self.neutron, subnet_name=subnet_setting.name)
184 self.assertEqual(self.network.subnets[0], subnet_query1)
186 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
188 self.assertIsNotNone(subnet_query2)
189 self.assertEqual(1, len(subnet_query2))
190 self.assertEqual(self.network.subnets[0], subnet_query2[0])
192 def test_create_subnet_null_name(self):
194 Tests the neutron_utils.create_neutron_subnet() function for an
195 Exception when the subnet name is None
197 self.network = neutron_utils.create_network(
198 self.neutron, self.os_creds, self.net_config.network_settings)
199 self.assertEqual(self.net_config.network_settings.name,
201 self.assertTrue(validate_network(
202 self.neutron, self.keystone,
203 self.net_config.network_settings.name, True,
204 self.os_creds.project_name))
206 with self.assertRaises(Exception):
207 SubnetConfig(cidr=self.net_config.subnet_cidr)
209 def test_create_subnet_empty_name(self):
211 Tests the neutron_utils.create_network() function with an empty
214 self.network = neutron_utils.create_network(
215 self.neutron, self.os_creds, self.net_config.network_settings)
216 self.assertEqual(self.net_config.network_settings.name,
218 self.assertTrue(validate_network(
219 self.neutron, self.keystone,
220 self.net_config.network_settings.name, True,
221 self.os_creds.project_name))
223 subnet_setting = self.net_config.network_settings.subnet_settings[0]
224 self.assertTrue(validate_subnet(
225 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
226 self.assertFalse(validate_subnet(
227 self.neutron, '', subnet_setting.cidr, True))
229 subnet_query1 = neutron_utils.get_subnet(
230 self.neutron, subnet_name=subnet_setting.name)
231 self.assertEqual(self.network.subnets[0], subnet_query1)
233 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
235 self.assertIsNotNone(subnet_query2)
236 self.assertEqual(1, len(subnet_query2))
237 self.assertEqual(self.network.subnets[0], subnet_query2[0])
239 def test_create_subnet_null_cidr(self):
241 Tests the neutron_utils.create_neutron_subnet() function for an
242 Exception when the subnet CIDR value is None
244 self.net_config.network_settings.subnet_settings[0].cidr = None
245 with self.assertRaises(Exception):
246 self.network = neutron_utils.create_network(
247 self.neutron, self.os_creds, self.net_config.network_settings)
249 def test_create_subnet_empty_cidr(self):
251 Tests the neutron_utils.create_neutron_subnet() function for an
252 Exception when the subnet CIDR value is empty
254 self.net_config.network_settings.subnet_settings[0].cidr = ''
255 with self.assertRaises(Exception):
256 self.network = neutron_utils.create_network(
257 self.neutron, self.os_creds, self.net_config.network_settings)
260 class NeutronUtilsIPv6Tests(OSComponentTestCase):
262 Test for creating IPv6 networks with subnets via neutron_utils.py
266 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
267 self.neutron = neutron_utils.neutron_client(self.os_creds)
272 Cleans the remote OpenStack objects
276 neutron_utils.delete_network(self.neutron, self.network)
280 def test_create_network_slaac(self):
282 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
283 is True and IPv6 modes are slaac
285 sub_setting = SubnetConfig(
286 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
287 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
288 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
289 enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
290 self.network_settings = NetworkConfig(
291 name=self.guid + '-net', subnet_settings=[sub_setting])
293 self.network = neutron_utils.create_network(
294 self.neutron, self.os_creds, self.network_settings)
295 self.assertEqual(self.network_settings.name, self.network.name)
297 subnet_settings = self.network_settings.subnet_settings[0]
298 self.assertEqual(1, len(self.network.subnets))
299 subnet = self.network.subnets[0]
301 self.assertEqual(self.network.id, subnet.network_id)
302 self.assertEqual(subnet_settings.name, subnet.name)
303 self.assertEqual(subnet_settings.start, subnet.start)
304 self.assertEqual(subnet_settings.end, subnet.end)
305 self.assertEqual('1:1::/64', subnet.cidr)
306 self.assertEqual(6, subnet.ip_version)
307 self.assertEqual(1, len(subnet.dns_nameservers))
309 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
310 self.assertTrue(subnet.enable_dhcp)
312 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
314 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
316 def test_create_network_stateful(self):
318 Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
319 is True and IPv6 modes are stateful
321 sub_setting = SubnetConfig(
322 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
323 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
324 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
325 enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
326 ipv6_address_mode='dhcpv6-stateful')
327 self.network_settings = NetworkConfig(
328 name=self.guid + '-net', subnet_settings=[sub_setting])
330 self.network = neutron_utils.create_network(
331 self.neutron, self.os_creds, self.network_settings)
333 self.assertEqual(self.network_settings.name, self.network.name)
335 subnet_settings = self.network_settings.subnet_settings[0]
336 self.assertEqual(1, len(self.network.subnets))
337 subnet = self.network.subnets[0]
339 self.assertEqual(self.network.id, subnet.network_id)
340 self.assertEqual(subnet_settings.name, subnet.name)
341 self.assertEqual(subnet_settings.start, subnet.start)
342 self.assertEqual(subnet_settings.end, subnet.end)
343 self.assertEqual('1:1::/64', subnet.cidr)
344 self.assertEqual(6, subnet.ip_version)
345 self.assertEqual(1, len(subnet.dns_nameservers))
347 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
348 self.assertTrue(subnet.enable_dhcp)
350 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
352 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
354 def test_create_network_stateless(self):
356 Tests the neutron_utils.create_network() when DHCP is enabled and
357 the RA and address modes are both 'slaac'
359 sub_setting = SubnetConfig(
360 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
361 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
362 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
363 enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
364 ipv6_address_mode='dhcpv6-stateless')
365 self.network_settings = NetworkConfig(
366 name=self.guid + '-net', subnet_settings=[sub_setting])
368 self.network = neutron_utils.create_network(
369 self.neutron, self.os_creds, self.network_settings)
371 self.assertEqual(self.network_settings.name, self.network.name)
373 subnet_settings = self.network_settings.subnet_settings[0]
374 self.assertEqual(1, len(self.network.subnets))
375 subnet = self.network.subnets[0]
377 self.assertEqual(self.network.id, subnet.network_id)
378 self.assertEqual(subnet_settings.name, subnet.name)
379 self.assertEqual(subnet_settings.start, subnet.start)
380 self.assertEqual(subnet_settings.end, subnet.end)
381 self.assertEqual('1:1::/64', subnet.cidr)
382 self.assertEqual(6, subnet.ip_version)
383 self.assertEqual(1, len(subnet.dns_nameservers))
385 sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
386 self.assertTrue(subnet.enable_dhcp)
388 subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
390 subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
392 def test_create_network_no_dhcp_slaac(self):
394 Tests the neutron_utils.create_network() for a BadRequest when
395 DHCP is not enabled and the RA and address modes are both 'slaac'
397 sub_setting = SubnetConfig(
398 name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
399 ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
400 gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
401 enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
402 self.network_settings = NetworkConfig(
403 name=self.guid + '-net', subnet_settings=[sub_setting])
405 with self.assertRaises(BadRequest):
406 self.network = neutron_utils.create_network(
407 self.neutron, self.os_creds, self.network_settings)
409 def test_create_network_invalid_start_ip(self):
411 Tests the neutron_utils.create_network() that contains one IPv6 subnet
412 with an invalid start IP to ensure Neutron assigns it the smallest IP
415 sub_setting = SubnetConfig(
416 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
418 self.network_settings = NetworkConfig(
419 name=self.guid + '-net', subnet_settings=[sub_setting])
421 self.network = neutron_utils.create_network(
422 self.neutron, self.os_creds, self.network_settings)
424 self.assertEqual('1:1::2', self.network.subnets[0].start)
426 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
428 def test_create_network_invalid_end_ip(self):
430 Tests the neutron_utils.create_network() that contains one IPv6 subnet
431 with an invalid end IP to ensure Neutron assigns it the largest IP
434 sub_setting = SubnetConfig(
435 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
437 self.network_settings = NetworkConfig(
438 name=self.guid + '-net', subnet_settings=[sub_setting])
440 self.network = neutron_utils.create_network(
441 self.neutron, self.os_creds, self.network_settings)
443 self.assertEqual('1:1::2', self.network.subnets[0].start)
445 '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
447 def test_create_network_with_bad_cidr(self):
449 Tests the neutron_utils.create_network() for a BadRequest when
450 the subnet CIDR is invalid
452 sub_setting = SubnetConfig(
453 name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
454 self.network_settings = NetworkConfig(
455 name=self.guid + '-net', subnet_settings=[sub_setting])
457 with self.assertRaises(BadRequest):
458 self.network = neutron_utils.create_network(
459 self.neutron, self.os_creds, self.network_settings)
461 def test_create_network_invalid_gateway_ip(self):
463 Tests the neutron_utils.create_network() for a BadRequest when
464 the subnet gateway IP is invalid
466 sub_setting = SubnetConfig(
467 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
468 gateway_ip='192.168.0.1')
469 self.network_settings = NetworkConfig(
470 name=self.guid + '-net', subnet_settings=[sub_setting])
472 with self.assertRaises(BadRequest):
473 self.network = neutron_utils.create_network(
474 self.neutron, self.os_creds, self.network_settings)
476 def test_create_network_with_bad_dns(self):
478 Tests the neutron_utils.create_network() for a BadRequest when
479 the DNS IP is invalid
481 sub_setting = SubnetConfig(
482 name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
483 dns_nameservers=['foo'])
484 self.network_settings = NetworkConfig(
485 name=self.guid + '-net', subnet_settings=[sub_setting])
487 with self.assertRaises(BadRequest):
488 self.network = neutron_utils.create_network(
489 self.neutron, self.os_creds, self.network_settings)
492 class NeutronUtilsRouterTests(OSComponentTestCase):
494 Test for creating routers via neutron_utils.py
498 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
499 self.port_name = str(guid) + '-port'
500 self.neutron = neutron_utils.neutron_client(self.os_creds)
501 self.keystone = keystone_utils.keystone_client(self.os_creds)
505 self.interface_router = None
506 self.net_config = openstack_tests.get_pub_net_config(
507 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
508 router_name=guid + '-pub-router', external_net=self.ext_net_name)
512 Cleans the remote OpenStack objects
514 if self.interface_router:
515 neutron_utils.remove_interface_router(
516 self.neutron, self.router, self.network.subnets[0])
520 neutron_utils.delete_router(self.neutron, self.router)
522 self.neutron, self.keystone, self.router.name,
523 self.os_creds.project_name, False)
529 neutron_utils.delete_port(self.neutron, self.port)
534 neutron_utils.delete_network(self.neutron, self.network)
536 def test_create_router_simple(self):
538 Tests the neutron_utils.create_router()
540 self.router = neutron_utils.create_router(
541 self.neutron, self.os_creds, self.net_config.router_settings)
543 self.neutron, self.keystone, self.net_config.router_settings.name,
544 self.os_creds.project_name, True)
546 def test_create_router_with_public_interface(self):
548 Tests the neutron_utils.create_router() function with a pubic interface
550 subnet_setting = self.net_config.network_settings.subnet_settings[0]
551 self.net_config = openstack_tests.OSNetworkConfig(
552 self.net_config.network_settings.name, subnet_setting.name,
553 subnet_setting.cidr, self.net_config.router_settings.name,
555 self.router = neutron_utils.create_router(
556 self.neutron, self.os_creds, self.net_config.router_settings)
558 self.neutron, self.keystone, self.net_config.router_settings.name,
559 self.os_creds.project_name, True)
561 ext_net = neutron_utils.get_network(
562 self.neutron, self.keystone, network_name=self.ext_net_name)
563 self.assertEqual(self.router.external_network_id, ext_net.id)
565 def test_add_interface_router(self):
567 Tests the neutron_utils.add_interface_router() function
569 self.network = neutron_utils.create_network(
570 self.neutron, self.os_creds, self.net_config.network_settings)
571 self.assertEqual(self.net_config.network_settings.name,
573 self.assertTrue(validate_network(
574 self.neutron, self.keystone,
575 self.net_config.network_settings.name, True,
576 self.os_creds.project_name))
578 subnet_setting = self.net_config.network_settings.subnet_settings[0]
579 self.assertTrue(validate_subnet(
580 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
582 self.router = neutron_utils.create_router(
583 self.neutron, self.os_creds, self.net_config.router_settings)
585 self.neutron, self.keystone, self.net_config.router_settings.name,
586 self.os_creds.project_name, True)
588 self.interface_router = neutron_utils.add_interface_router(
589 self.neutron, self.router, self.network.subnets[0])
590 validate_interface_router(self.interface_router, self.router,
591 self.network.subnets[0])
593 def test_add_interface_router_null_router(self):
595 Tests the neutron_utils.add_interface_router() function for an
596 Exception when the router value is None
598 self.network = neutron_utils.create_network(
599 self.neutron, self.os_creds, self.net_config.network_settings)
600 self.assertEqual(self.net_config.network_settings.name,
602 self.assertTrue(validate_network(
603 self.neutron, self.keystone,
604 self.net_config.network_settings.name, True,
605 self.os_creds.project_name))
607 subnet_setting = self.net_config.network_settings.subnet_settings[0]
608 self.assertTrue(validate_subnet(
609 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
611 with self.assertRaises(NeutronException):
612 self.interface_router = neutron_utils.add_interface_router(
613 self.neutron, self.router, self.network.subnets[0])
615 def test_add_interface_router_null_subnet(self):
617 Tests the neutron_utils.add_interface_router() function for an
618 Exception when the subnet value is None
620 self.network = neutron_utils.create_network(
621 self.neutron, self.os_creds, self.net_config.network_settings)
622 self.assertEqual(self.net_config.network_settings.name,
624 self.assertTrue(validate_network(
625 self.neutron, self.keystone,
626 self.net_config.network_settings.name, True,
627 self.os_creds.project_name))
629 self.router = neutron_utils.create_router(
630 self.neutron, self.os_creds, self.net_config.router_settings)
632 self.neutron, self.keystone, self.net_config.router_settings.name,
633 self.os_creds.project_name, True)
635 with self.assertRaises(NeutronException):
636 self.interface_router = neutron_utils.add_interface_router(
637 self.neutron, self.router, None)
639 def test_add_interface_router_missing_subnet(self):
641 Tests the neutron_utils.add_interface_router() function for an
642 Exception when the subnet object has been deleted
644 self.network = neutron_utils.create_network(
645 self.neutron, self.os_creds, self.net_config.network_settings)
646 self.assertEqual(self.net_config.network_settings.name,
648 self.assertTrue(validate_network(
649 self.neutron, self.keystone,
650 self.net_config.network_settings.name, True,
651 self.os_creds.project_name))
653 self.router = neutron_utils.create_router(
654 self.neutron, self.os_creds, self.net_config.router_settings)
656 self.neutron, self.keystone, self.net_config.router_settings.name,
657 self.os_creds.project_name, True)
659 for subnet in self.network.subnets:
660 neutron_utils.delete_subnet(self.neutron, subnet)
662 with self.assertRaises(NotFound):
663 self.interface_router = neutron_utils.add_interface_router(
664 self.neutron, self.router, self.network.subnets[0])
666 def test_create_port(self):
668 Tests the neutron_utils.create_port() function
670 self.network = neutron_utils.create_network(
671 self.neutron, self.os_creds, self.net_config.network_settings)
672 self.assertEqual(self.net_config.network_settings.name,
674 self.assertTrue(validate_network(
675 self.neutron, self.keystone,
676 self.net_config.network_settings.name, True,
677 self.os_creds.project_name))
679 subnet_setting = self.net_config.network_settings.subnet_settings[0]
680 self.assertTrue(validate_subnet(
681 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
683 self.port = neutron_utils.create_port(
684 self.neutron, self.os_creds, PortConfig(
687 'subnet_name': subnet_setting.name,
689 network_name=self.net_config.network_settings.name))
690 validate_port(self.neutron, self.port, self.port_name)
692 def test_create_port_empty_name(self):
694 Tests the neutron_utils.create_port() function
696 self.network = neutron_utils.create_network(
697 self.neutron, self.os_creds, self.net_config.network_settings)
698 self.assertEqual(self.net_config.network_settings.name,
700 self.assertTrue(validate_network(
701 self.neutron, self.keystone,
702 self.net_config.network_settings.name, True,
703 self.os_creds.project_name))
705 subnet_setting = self.net_config.network_settings.subnet_settings[0]
706 self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
707 subnet_setting.cidr, True))
709 self.port = neutron_utils.create_port(
710 self.neutron, self.os_creds, PortConfig(
712 network_name=self.net_config.network_settings.name,
714 'subnet_name': subnet_setting.name,
716 validate_port(self.neutron, self.port, self.port_name)
718 def test_create_port_null_name(self):
720 Tests the neutron_utils.create_port() when the port name value is None
722 self.network = neutron_utils.create_network(
723 self.neutron, self.os_creds, self.net_config.network_settings)
724 self.assertEqual(self.net_config.network_settings.name,
726 self.assertTrue(validate_network(
727 self.neutron, self.keystone,
728 self.net_config.network_settings.name, True,
729 self.os_creds.project_name))
731 subnet_setting = self.net_config.network_settings.subnet_settings[0]
732 self.assertTrue(validate_subnet(
733 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
735 self.port = neutron_utils.create_port(
736 self.neutron, self.os_creds,
738 network_name=self.net_config.network_settings.name,
740 'subnet_name': subnet_setting.name,
743 port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
744 self.assertEqual(self.port, port)
746 def test_create_port_null_network_object(self):
748 Tests the neutron_utils.create_port() function for an Exception when
749 the network object is None
751 with self.assertRaises(Exception):
752 self.port = neutron_utils.create_port(
753 self.neutron, self.os_creds,
756 network_name=self.net_config.network_settings.name,
759 self.net_config.network_settings.subnet_settings[
763 def test_create_port_null_ip(self):
765 Tests the neutron_utils.create_port() function for an Exception when
768 self.network = neutron_utils.create_network(
769 self.neutron, self.os_creds, self.net_config.network_settings)
770 self.assertEqual(self.net_config.network_settings.name,
772 self.assertTrue(validate_network(
773 self.neutron, self.keystone,
774 self.net_config.network_settings.name, True,
775 self.os_creds.project_name))
777 subnet_setting = self.net_config.network_settings.subnet_settings[0]
778 self.assertTrue(validate_subnet(
779 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
781 with self.assertRaises(Exception):
782 self.port = neutron_utils.create_port(
783 self.neutron, self.os_creds,
786 network_name=self.net_config.network_settings.name,
788 'subnet_name': subnet_setting.name,
791 def test_create_port_invalid_ip(self):
793 Tests the neutron_utils.create_port() function for an Exception when
796 self.network = neutron_utils.create_network(
797 self.neutron, self.os_creds, self.net_config.network_settings)
798 self.assertEqual(self.net_config.network_settings.name,
800 self.assertTrue(validate_network(
801 self.neutron, self.keystone,
802 self.net_config.network_settings.name, True,
803 self.os_creds.project_name))
805 subnet_setting = self.net_config.network_settings.subnet_settings[0]
806 self.assertTrue(validate_subnet(
807 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
809 with self.assertRaises(Exception):
810 self.port = neutron_utils.create_port(
811 self.neutron, self.os_creds,
814 network_name=self.net_config.network_settings.name,
816 'subnet_name': subnet_setting.name,
819 def test_create_port_invalid_ip_to_subnet(self):
821 Tests the neutron_utils.create_port() function for an Exception when
824 self.network = neutron_utils.create_network(
825 self.neutron, self.os_creds, self.net_config.network_settings)
826 self.assertEqual(self.net_config.network_settings.name,
828 self.assertTrue(validate_network(
829 self.neutron, self.keystone,
830 self.net_config.network_settings.name, True,
831 self.os_creds.project_name))
833 subnet_setting = self.net_config.network_settings.subnet_settings[0]
834 self.assertTrue(validate_subnet(
835 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
837 with self.assertRaises(Exception):
838 self.port = neutron_utils.create_port(
839 self.neutron, self.os_creds,
842 network_name=self.net_config.network_settings.name,
844 'subnet_name': subnet_setting.name,
845 'ip': '10.197.123.100'}]))
848 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
850 Test for creating security groups via neutron_utils.py
854 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
855 self.sec_grp_name = guid + 'name'
857 self.security_groups = list()
858 self.security_group_rules = list()
859 self.neutron = neutron_utils.neutron_client(self.os_creds)
860 self.keystone = keystone_utils.keystone_client(self.os_creds)
864 Cleans the remote OpenStack objects
866 for rule in self.security_group_rules:
867 neutron_utils.delete_security_group_rule(self.neutron, rule)
869 for security_group in self.security_groups:
871 neutron_utils.delete_security_group(self.neutron,
876 def test_create_delete_simple_sec_grp(self):
878 Tests the neutron_utils.create_security_group() function
880 sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
881 security_group = neutron_utils.create_security_group(
882 self.neutron, self.keystone, sec_grp_settings)
884 self.assertTrue(sec_grp_settings.name, security_group.name)
886 sec_grp_get = neutron_utils.get_security_group(
887 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
888 self.assertIsNotNone(sec_grp_get)
889 self.assertTrue(validation_utils.objects_equivalent(
890 security_group, sec_grp_get))
892 neutron_utils.delete_security_group(self.neutron, security_group)
893 sec_grp_get = neutron_utils.get_security_group(
894 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
895 self.assertIsNone(sec_grp_get)
897 def test_create_sec_grp_no_name(self):
899 Tests the SecurityGroupConfig constructor and
900 neutron_utils.create_security_group() function to ensure that
901 attempting to create a security group without a name will raise an
904 with self.assertRaises(Exception):
905 sec_grp_settings = SecurityGroupConfig()
906 self.security_groups.append(
907 neutron_utils.create_security_group(
908 self.neutron, self.keystone, sec_grp_settings))
910 def test_create_sec_grp_no_rules(self):
912 Tests the neutron_utils.create_security_group() function
914 sec_grp_settings = SecurityGroupConfig(
915 name=self.sec_grp_name, description='hello group')
916 self.security_groups.append(
917 neutron_utils.create_security_group(
918 self.neutron, self.keystone, sec_grp_settings))
920 self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
921 self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
923 sec_grp_get = neutron_utils.get_security_group(
924 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
925 self.assertIsNotNone(sec_grp_get)
926 self.assertEqual(self.security_groups[0], sec_grp_get)
928 def test_create_sec_grp_one_rule(self):
930 Tests the neutron_utils.create_security_group() function
933 sec_grp_rule_settings = SecurityGroupRuleConfig(
934 sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
935 sec_grp_settings = SecurityGroupConfig(
936 name=self.sec_grp_name, description='hello group',
937 rule_settings=[sec_grp_rule_settings])
939 self.security_groups.append(
940 neutron_utils.create_security_group(
941 self.neutron, self.keystone, sec_grp_settings))
942 free_rules = neutron_utils.get_rules_by_security_group(
943 self.neutron, self.security_groups[0])
944 for free_rule in free_rules:
945 self.security_group_rules.append(free_rule)
947 keystone = keystone_utils.keystone_client(self.os_creds)
948 self.security_group_rules.append(
949 neutron_utils.create_security_group_rule(
950 self.neutron, keystone, sec_grp_settings.rule_settings[0],
951 self.os_creds.project_name))
953 # Refresh object so it is populated with the newly added rule
954 security_group = neutron_utils.get_security_group(
955 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
957 rules = neutron_utils.get_rules_by_security_group(
958 self.neutron, security_group)
961 validation_utils.objects_equivalent(
962 self.security_group_rules, rules))
964 self.assertTrue(sec_grp_settings.name, security_group.name)
966 sec_grp_get = neutron_utils.get_security_group(
967 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
968 self.assertIsNotNone(sec_grp_get)
969 self.assertEqual(security_group, sec_grp_get)
971 def test_get_sec_grp_by_id(self):
973 Tests the neutron_utils.create_security_group() function
976 self.security_groups.append(neutron_utils.create_security_group(
977 self.neutron, self.keystone,
979 name=self.sec_grp_name + '-1', description='hello group')))
980 self.security_groups.append(neutron_utils.create_security_group(
981 self.neutron, self.keystone,
983 name=self.sec_grp_name + '-2', description='hello group')))
985 sec_grp_1b = neutron_utils.get_security_group_by_id(
986 self.neutron, self.security_groups[0].id)
987 sec_grp_2b = neutron_utils.get_security_group_by_id(
988 self.neutron, self.security_groups[1].id)
990 self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
991 self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
994 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
996 Test basic nova keypair functionality
1001 Instantiates the CreateImage object that is responsible for downloading
1002 and creating an OS image file within OpenStack
1004 self.neutron = neutron_utils.neutron_client(self.os_creds)
1005 self.keystone = keystone_utils.keystone_client(self.os_creds)
1006 self.floating_ip = None
1010 Cleans the image and downloaded image file
1012 if self.floating_ip:
1014 neutron_utils.delete_floating_ip(
1015 self.neutron, self.floating_ip)
1019 def test_floating_ips(self):
1021 Tests the creation of a floating IP
1024 initial_fips = neutron_utils.get_floating_ips(self.neutron)
1026 self.floating_ip = neutron_utils.create_floating_ip(
1027 self.neutron, self.keystone, self.ext_net_name)
1028 all_fips = neutron_utils.get_floating_ips(self.neutron)
1029 self.assertEqual(len(initial_fips) + 1, len(all_fips))
1030 returned = neutron_utils.get_floating_ip(self.neutron,
1032 self.assertEqual(self.floating_ip.id, returned.id)
1033 self.assertEqual(self.floating_ip.ip, returned.ip)
1041 def validate_network(neutron, keystone, name, exists, project_name):
1043 Returns true if a network for a given name DOES NOT exist if the exists
1044 parameter is false conversely true. Returns false if a network for a given
1045 name DOES exist if the exists parameter is true conversely false.
1046 :param neutron: The neutron client
1047 :param keystone: The keystone client
1048 :param name: The expected network name
1049 :param exists: Whether or not the network name should exist or not
1050 :param project_name: the associated project name
1053 network = neutron_utils.get_network(
1054 neutron, keystone, network_name=name, project_name=project_name)
1055 if exists and network:
1057 if not exists and not network:
1062 def validate_subnet(neutron, name, cidr, exists):
1064 Returns true if a subnet for a given name DOES NOT exist if the exists
1065 parameter is false conversely true. Returns false if a subnet for a given
1066 name DOES exist if the exists parameter is true conversely false.
1067 :param neutron: The neutron client
1068 :param name: The expected subnet name
1069 :param cidr: The expected CIDR value
1070 :param exists: Whether or not the network name should exist or not
1073 subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
1074 if exists and subnet and subnet.name == name:
1075 return subnet.cidr == cidr
1076 if not exists and not subnet:
1081 def validate_router(neutron, keystone, name, project_name, exists):
1083 Returns true if a router for a given name DOES NOT exist if the exists
1084 parameter is false conversely true. Returns false if a router for a given
1085 name DOES exist if the exists parameter is true conversely false.
1086 :param neutron: The neutron client
1087 :param keystone: The keystone client
1088 :param name: The expected router name
1089 :param project_name: The name of the project in which the router should
1091 :param exists: Whether or not the network name should exist or not
1094 router = neutron_utils.get_router(
1095 neutron, keystone, router_name=name, project_name=project_name)
1096 if exists and router:
1101 def validate_interface_router(interface_router, router, subnet):
1103 Returns true if the router ID & subnet ID have been properly included into
1104 the interface router object
1105 :param interface_router: the SNAPS-OO InterfaceRouter domain object
1106 :param router: to validate against the interface_router
1107 :param subnet: to validate against the interface_router
1108 :return: True if both IDs match else False
1110 subnet_id = interface_router.subnet_id
1111 router_id = interface_router.port_id
1113 return subnet.id == subnet_id and router.id == router_id
1116 def validate_port(neutron, port_obj, this_port_name):
1118 Returns true if a port for a given name DOES NOT exist if the exists
1119 parameter is false conversely true. Returns false if a port for a given
1120 name DOES exist if the exists parameter is true conversely false.
1121 :param neutron: The neutron client
1122 :param port_obj: The port object to lookup
1123 :param this_port_name: The expected router name
1126 os_ports = neutron.list_ports()
1127 for os_port, os_port_insts in os_ports.items():
1128 for os_inst in os_port_insts:
1129 if os_inst['id'] == port_obj.id:
1130 return os_inst['name'] == this_port_name