1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21 SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
35 class NeutronSmokeTests(OSComponentTestCase):
37 Tests to ensure that the neutron client can communicate with the cloud
40 def test_neutron_connect_success(self):
42 Tests to ensure that the proper credentials can connect.
44 neutron = neutron_utils.neutron_client(self.os_creds)
46 networks = neutron.list_networks()
49 networks = networks.get('networks')
50 for network in networks:
51 if network.get('name') == self.ext_net_name:
53 self.assertTrue(found)
55 def test_neutron_connect_fail(self):
57 Tests to ensure that the improper credentials cannot connect.
59 from snaps.openstack.os_credentials import OSCreds
61 with self.assertRaises(Exception):
62 neutron = neutron_utils.neutron_client(
63 OSCreds(username='user', password='pass', auth_url='url',
64 project_name='project'))
65 neutron.list_networks()
67 def test_retrieve_ext_network_name(self):
69 Tests the neutron_utils.get_external_network_names to ensure the
70 configured self.ext_net_name is contained within the returned list
73 neutron = neutron_utils.neutron_client(self.os_creds)
74 ext_networks = neutron_utils.get_external_networks(neutron)
76 for network in ext_networks:
77 if network.name == self.ext_net_name:
80 self.assertTrue(found)
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
85 Test for creating networks via neutron_utils.py
89 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90 self.port_name = str(guid) + '-port'
91 self.neutron = neutron_utils.neutron_client(self.os_creds)
93 self.net_config = openstack_tests.get_pub_net_config(
94 net_name=guid + '-pub-net')
98 Cleans the remote OpenStack objects
101 neutron_utils.delete_network(self.neutron, self.network)
102 validate_network(self.neutron, self.network.name, False)
104 def test_create_network(self):
106 Tests the neutron_utils.create_neutron_net() function
108 self.network = neutron_utils.create_network(
109 self.neutron, self.os_creds, self.net_config.network_settings)
110 self.assertEqual(self.net_config.network_settings.name,
112 self.assertTrue(validate_network(
113 self.neutron, self.net_config.network_settings.name, True))
115 def test_create_network_empty_name(self):
117 Tests the neutron_utils.create_neutron_net() function with an empty
120 with self.assertRaises(Exception):
121 self.network = neutron_utils.create_network(
122 self.neutron, self.os_creds,
123 network_settings=NetworkSettings(name=''))
125 def test_create_network_null_name(self):
127 Tests the neutron_utils.create_neutron_net() function when the network
130 with self.assertRaises(Exception):
131 self.network = neutron_utils.create_network(
132 self.neutron, self.os_creds,
133 network_settings=NetworkSettings())
136 class NeutronUtilsSubnetTests(OSComponentTestCase):
138 Test for creating networks with subnets via neutron_utils.py
142 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
143 self.port_name = str(guid) + '-port'
144 self.neutron = neutron_utils.neutron_client(self.os_creds)
147 self.net_config = openstack_tests.get_pub_net_config(
148 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
149 external_net=self.ext_net_name)
153 Cleans the remote OpenStack objects
156 neutron_utils.delete_subnet(self.neutron, self.subnet)
157 validate_subnet(self.neutron, self.subnet.name,
158 self.net_config.network_settings.subnet_settings[
162 neutron_utils.delete_network(self.neutron, self.network)
163 validate_network(self.neutron, self.network.name, False)
165 def test_create_subnet(self):
167 Tests the neutron_utils.create_neutron_net() function
169 self.network = neutron_utils.create_network(
170 self.neutron, self.os_creds, self.net_config.network_settings)
171 self.assertEqual(self.net_config.network_settings.name,
173 self.assertTrue(validate_network(
174 self.neutron, self.net_config.network_settings.name, True))
176 subnet_setting = self.net_config.network_settings.subnet_settings[0]
177 self.subnet = neutron_utils.create_subnet(
178 self.neutron, subnet_setting, self.os_creds, network=self.network)
180 self.neutron, subnet_setting.name, subnet_setting.cidr, True)
182 def test_create_subnet_null_name(self):
184 Tests the neutron_utils.create_neutron_subnet() function for an
185 Exception when the subnet name is None
187 self.network = neutron_utils.create_network(
188 self.neutron, self.os_creds, self.net_config.network_settings)
189 self.assertEqual(self.net_config.network_settings.name,
191 self.assertTrue(validate_network(
192 self.neutron, self.net_config.network_settings.name, True))
194 with self.assertRaises(Exception):
195 SubnetSettings(cidr=self.net_config.subnet_cidr)
197 def test_create_subnet_empty_name(self):
199 Tests the neutron_utils.create_neutron_net() function with an empty
202 self.network = neutron_utils.create_network(
203 self.neutron, self.os_creds, self.net_config.network_settings)
204 self.assertEqual(self.net_config.network_settings.name,
206 self.assertTrue(validate_network(
207 self.neutron, self.net_config.network_settings.name, True))
209 subnet_setting = self.net_config.network_settings.subnet_settings[0]
210 neutron_utils.create_subnet(
211 self.neutron, subnet_setting, self.os_creds, network=self.network)
212 validate_subnet(self.neutron, '', subnet_setting.cidr, True)
214 def test_create_subnet_null_cidr(self):
216 Tests the neutron_utils.create_neutron_subnet() function for an
217 Exception when the subnet CIDR value is None
219 self.network = neutron_utils.create_network(
220 self.neutron, self.os_creds, self.net_config.network_settings)
221 self.assertEqual(self.net_config.network_settings.name,
223 self.assertTrue(validate_network(
224 self.neutron, self.net_config.network_settings.name, True))
226 with self.assertRaises(Exception):
227 sub_sets = SubnetSettings(
228 cidr=None, name=self.net_config.subnet_name)
229 neutron_utils.create_subnet(
230 self.neutron, sub_sets, self.os_creds, network=self.network)
232 def test_create_subnet_empty_cidr(self):
234 Tests the neutron_utils.create_neutron_subnet() function for an
235 Exception when the subnet CIDR value is empty
237 self.network = neutron_utils.create_network(
238 self.neutron, self.os_creds, self.net_config.network_settings)
239 self.assertEqual(self.net_config.network_settings.name,
241 self.assertTrue(validate_network(
242 self.neutron, self.net_config.network_settings.name, True))
244 with self.assertRaises(Exception):
245 sub_sets = SubnetSettings(
246 cidr='', name=self.net_config.subnet_name)
247 neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
248 network=self.network)
251 class NeutronUtilsRouterTests(OSComponentTestCase):
253 Test for creating routers via neutron_utils.py
257 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
258 self.port_name = str(guid) + '-port'
259 self.neutron = neutron_utils.neutron_client(self.os_creds)
264 self.interface_router = None
265 self.net_config = openstack_tests.get_pub_net_config(
266 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
267 router_name=guid + '-pub-router', external_net=self.ext_net_name)
271 Cleans the remote OpenStack objects
273 if self.interface_router:
274 neutron_utils.remove_interface_router(self.neutron, self.router,
278 neutron_utils.delete_router(self.neutron, self.router)
279 validate_router(self.neutron, self.router.name, False)
282 neutron_utils.delete_port(self.neutron, self.port)
285 neutron_utils.delete_subnet(self.neutron, self.subnet)
287 self.neutron, self.subnet.name,
288 self.net_config.network_settings.subnet_settings[0].cidr,
292 neutron_utils.delete_network(self.neutron, self.network)
293 validate_network(self.neutron, self.network.name, False)
295 def test_create_router_simple(self):
297 Tests the neutron_utils.create_neutron_net() function when an external
300 self.router = neutron_utils.create_router(
301 self.neutron, self.os_creds, self.net_config.router_settings)
302 validate_router(self.neutron, self.net_config.router_settings.name,
305 def test_create_router_with_public_interface(self):
307 Tests the neutron_utils.create_neutron_net() function when an external
310 subnet_setting = self.net_config.network_settings.subnet_settings[0]
311 self.net_config = openstack_tests.OSNetworkConfig(
312 self.net_config.network_settings.name,
315 self.net_config.router_settings.name,
317 self.router = neutron_utils.create_router(
318 self.neutron, self.os_creds, self.net_config.router_settings)
319 validate_router(self.neutron, self.net_config.router_settings.name,
322 ext_net = neutron_utils.get_network(
323 self.neutron, network_name=self.ext_net_name)
325 self.router.external_gateway_info['network_id'], ext_net.id)
327 def test_create_router_empty_name(self):
329 Tests the neutron_utils.create_neutron_net() function
331 with self.assertRaises(Exception):
332 this_router_settings = create_router.RouterSettings(name='')
333 self.router = neutron_utils.create_router(self.neutron,
335 this_router_settings)
337 def test_create_router_null_name(self):
339 Tests the neutron_utils.create_neutron_subnet() function when the
340 subnet CIDR value is None
342 with self.assertRaises(Exception):
343 this_router_settings = create_router.RouterSettings()
344 self.router = neutron_utils.create_router(self.neutron,
346 this_router_settings)
347 validate_router(self.neutron, None, True)
349 def test_add_interface_router(self):
351 Tests the neutron_utils.add_interface_router() function
353 self.network = neutron_utils.create_network(
354 self.neutron, self.os_creds, self.net_config.network_settings)
355 self.assertEqual(self.net_config.network_settings.name,
357 self.assertTrue(validate_network(
358 self.neutron, self.net_config.network_settings.name, True))
360 subnet_setting = self.net_config.network_settings.subnet_settings[0]
361 self.subnet = neutron_utils.create_subnet(
362 self.neutron, subnet_setting,
363 self.os_creds, self.network)
367 subnet_setting.cidr, True)
369 self.router = neutron_utils.create_router(
370 self.neutron, self.os_creds, self.net_config.router_settings)
371 validate_router(self.neutron, self.net_config.router_settings.name,
374 self.interface_router = neutron_utils.add_interface_router(
375 self.neutron, self.router, self.subnet)
376 validate_interface_router(self.interface_router, self.router,
379 def test_add_interface_router_null_router(self):
381 Tests the neutron_utils.add_interface_router() function for an
382 Exception when the router value is None
384 self.network = neutron_utils.create_network(
385 self.neutron, self.os_creds, self.net_config.network_settings)
386 self.assertEqual(self.net_config.network_settings.name,
388 self.assertTrue(validate_network(
389 self.neutron, self.net_config.network_settings.name, True))
391 subnet_setting = self.net_config.network_settings.subnet_settings[0]
392 self.subnet = neutron_utils.create_subnet(
393 self.neutron, subnet_setting,
394 self.os_creds, self.network)
396 self.neutron, subnet_setting.name, subnet_setting.cidr, True)
398 with self.assertRaises(NeutronException):
399 self.interface_router = neutron_utils.add_interface_router(
400 self.neutron, self.router, self.subnet)
402 def test_add_interface_router_null_subnet(self):
404 Tests the neutron_utils.add_interface_router() function for an
405 Exception when the subnet value is None
407 self.network = neutron_utils.create_network(
408 self.neutron, self.os_creds, self.net_config.network_settings)
409 self.assertEqual(self.net_config.network_settings.name,
411 self.assertTrue(validate_network(
412 self.neutron, self.net_config.network_settings.name, True))
414 self.router = neutron_utils.create_router(
415 self.neutron, self.os_creds, self.net_config.router_settings)
416 validate_router(self.neutron, self.net_config.router_settings.name,
419 with self.assertRaises(NeutronException):
420 self.interface_router = neutron_utils.add_interface_router(
421 self.neutron, self.router, self.subnet)
423 def test_create_port(self):
425 Tests the neutron_utils.create_port() function
427 self.network = neutron_utils.create_network(
428 self.neutron, self.os_creds, self.net_config.network_settings)
429 self.assertEqual(self.net_config.network_settings.name,
431 self.assertTrue(validate_network(
432 self.neutron, self.net_config.network_settings.name, True))
434 subnet_setting = self.net_config.network_settings.subnet_settings[0]
435 self.subnet = neutron_utils.create_subnet(
436 self.neutron, subnet_setting, self.os_creds, self.network)
437 validate_subnet(self.neutron, subnet_setting.name,
438 subnet_setting.cidr, True)
440 self.port = neutron_utils.create_port(
441 self.neutron, self.os_creds, PortSettings(
444 'subnet_name': subnet_setting.name,
446 network_name=self.net_config.network_settings.name))
447 validate_port(self.neutron, self.port, self.port_name)
449 def test_create_port_empty_name(self):
451 Tests the neutron_utils.create_port() function
453 self.network = neutron_utils.create_network(
454 self.neutron, self.os_creds, self.net_config.network_settings)
455 self.assertEqual(self.net_config.network_settings.name,
457 self.assertTrue(validate_network(
458 self.neutron, self.net_config.network_settings.name, True))
460 subnet_setting = self.net_config.network_settings.subnet_settings[0]
461 self.subnet = neutron_utils.create_subnet(
462 self.neutron, subnet_setting, self.os_creds, self.network)
463 validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
466 self.port = neutron_utils.create_port(
467 self.neutron, self.os_creds, PortSettings(
469 network_name=self.net_config.network_settings.name,
471 'subnet_name': subnet_setting.name,
473 validate_port(self.neutron, self.port, self.port_name)
475 def test_create_port_null_name(self):
477 Tests the neutron_utils.create_port() function for an Exception when
478 the port name value is None
480 self.network = neutron_utils.create_network(
481 self.neutron, self.os_creds, self.net_config.network_settings)
482 self.assertEqual(self.net_config.network_settings.name,
484 self.assertTrue(validate_network(
485 self.neutron, self.net_config.network_settings.name, True))
487 subnet_setting = self.net_config.network_settings.subnet_settings[0]
488 self.subnet = neutron_utils.create_subnet(
489 self.neutron, subnet_setting,
490 self.os_creds, self.network)
494 subnet_setting.cidr, True)
496 with self.assertRaises(Exception):
497 self.port = neutron_utils.create_port(
498 self.neutron, self.os_creds,
500 network_name=self.net_config.network_settings.name,
502 'subnet_name': subnet_setting.name,
505 def test_create_port_null_network_object(self):
507 Tests the neutron_utils.create_port() function for an Exception when
508 the network object is None
510 with self.assertRaises(Exception):
511 self.port = neutron_utils.create_port(
512 self.neutron, self.os_creds,
515 network_name=self.net_config.network_settings.name,
518 self.net_config.network_settings.subnet_settings[
522 def test_create_port_null_ip(self):
524 Tests the neutron_utils.create_port() function for an Exception when
527 self.network = neutron_utils.create_network(
528 self.neutron, self.os_creds, self.net_config.network_settings)
529 self.assertEqual(self.net_config.network_settings.name,
531 self.assertTrue(validate_network(
532 self.neutron, self.net_config.network_settings.name, True))
534 subnet_setting = self.net_config.network_settings.subnet_settings[0]
535 self.subnet = neutron_utils.create_subnet(
536 self.neutron, subnet_setting,
537 self.os_creds, self.network)
541 subnet_setting.cidr, True)
543 with self.assertRaises(Exception):
544 self.port = neutron_utils.create_port(
545 self.neutron, self.os_creds,
548 network_name=self.net_config.network_settings.name,
550 'subnet_name': subnet_setting.name,
553 def test_create_port_invalid_ip(self):
555 Tests the neutron_utils.create_port() function for an Exception when
558 self.network = neutron_utils.create_network(
559 self.neutron, self.os_creds, self.net_config.network_settings)
560 self.assertEqual(self.net_config.network_settings.name,
562 self.assertTrue(validate_network(
563 self.neutron, self.net_config.network_settings.name, True))
565 subnet_setting = self.net_config.network_settings.subnet_settings[0]
566 self.subnet = neutron_utils.create_subnet(
567 self.neutron, subnet_setting, self.os_creds, self.network)
568 validate_subnet(self.neutron,
570 subnet_setting.cidr, True)
572 with self.assertRaises(Exception):
573 self.port = neutron_utils.create_port(
574 self.neutron, self.os_creds,
577 network_name=self.net_config.network_settings.name,
579 'subnet_name': subnet_setting.name,
582 def test_create_port_invalid_ip_to_subnet(self):
584 Tests the neutron_utils.create_port() function for an Exception when
587 self.network = neutron_utils.create_network(
588 self.neutron, self.os_creds, self.net_config.network_settings)
589 self.assertEqual(self.net_config.network_settings.name,
591 self.assertTrue(validate_network(
592 self.neutron, self.net_config.network_settings.name, True))
594 subnet_setting = self.net_config.network_settings.subnet_settings[0]
595 self.subnet = neutron_utils.create_subnet(
596 self.neutron, subnet_setting, self.os_creds, self.network)
598 self.neutron, subnet_setting.name, subnet_setting.cidr, True)
600 with self.assertRaises(Exception):
601 self.port = neutron_utils.create_port(
602 self.neutron, self.os_creds,
605 network_name=self.net_config.network_settings.name,
607 'subnet_name': subnet_setting.name,
608 'ip': '10.197.123.100'}]))
611 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
613 Test for creating security groups via neutron_utils.py
617 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
618 self.sec_grp_name = guid + 'name'
620 self.security_groups = list()
621 self.security_group_rules = list()
622 self.neutron = neutron_utils.neutron_client(self.os_creds)
623 self.keystone = keystone_utils.keystone_client(self.os_creds)
627 Cleans the remote OpenStack objects
629 for rule in self.security_group_rules:
630 neutron_utils.delete_security_group_rule(self.neutron, rule)
632 for security_group in self.security_groups:
634 neutron_utils.delete_security_group(self.neutron,
639 def test_create_delete_simple_sec_grp(self):
641 Tests the neutron_utils.create_security_group() function
643 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
644 security_group = neutron_utils.create_security_group(self.neutron,
648 self.assertTrue(sec_grp_settings.name, security_group.name)
650 sec_grp_get = neutron_utils.get_security_group(self.neutron,
651 sec_grp_settings.name)
652 self.assertIsNotNone(sec_grp_get)
653 self.assertTrue(validation_utils.objects_equivalent(
654 security_group, sec_grp_get))
656 neutron_utils.delete_security_group(self.neutron, security_group)
657 sec_grp_get = neutron_utils.get_security_group(self.neutron,
658 sec_grp_settings.name)
659 self.assertIsNone(sec_grp_get)
661 def test_create_sec_grp_no_name(self):
663 Tests the SecurityGroupSettings constructor and
664 neutron_utils.create_security_group() function to ensure that
665 attempting to create a security group without a name will raise an
668 with self.assertRaises(Exception):
669 sec_grp_settings = SecurityGroupSettings()
670 self.security_groups.append(
671 neutron_utils.create_security_group(self.neutron,
675 def test_create_sec_grp_no_rules(self):
677 Tests the neutron_utils.create_security_group() function
679 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
680 description='hello group')
681 self.security_groups.append(
682 neutron_utils.create_security_group(self.neutron, self.keystone,
685 self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
686 self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
688 sec_grp_get = neutron_utils.get_security_group(self.neutron,
689 sec_grp_settings.name)
690 self.assertIsNotNone(sec_grp_get)
691 self.assertEqual(self.security_groups[0], sec_grp_get)
693 def test_create_sec_grp_one_rule(self):
695 Tests the neutron_utils.create_security_group() function
698 sec_grp_rule_settings = SecurityGroupRuleSettings(
699 sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
700 sec_grp_settings = SecurityGroupSettings(
701 name=self.sec_grp_name, description='hello group',
702 rule_settings=[sec_grp_rule_settings])
704 self.security_groups.append(
705 neutron_utils.create_security_group(self.neutron, self.keystone,
707 free_rules = neutron_utils.get_rules_by_security_group(
708 self.neutron, self.security_groups[0])
709 for free_rule in free_rules:
710 self.security_group_rules.append(free_rule)
712 self.security_group_rules.append(
713 neutron_utils.create_security_group_rule(
714 self.neutron, sec_grp_settings.rule_settings[0]))
716 # Refresh object so it is populated with the newly added rule
717 security_group = neutron_utils.get_security_group(
718 self.neutron, sec_grp_settings.name)
720 rules = neutron_utils.get_rules_by_security_group(self.neutron,
724 validation_utils.objects_equivalent(
725 self.security_group_rules, rules))
727 self.assertTrue(sec_grp_settings.name, security_group.name)
729 sec_grp_get = neutron_utils.get_security_group(self.neutron,
730 sec_grp_settings.name)
731 self.assertIsNotNone(sec_grp_get)
732 self.assertEqual(security_group, sec_grp_get)
734 def test_get_sec_grp_by_id(self):
736 Tests the neutron_utils.create_security_group() function
739 self.security_groups.append(neutron_utils.create_security_group(
740 self.neutron, self.keystone,
741 SecurityGroupSettings(name=self.sec_grp_name + '-1',
742 description='hello group')))
743 self.security_groups.append(neutron_utils.create_security_group(
744 self.neutron, self.keystone,
745 SecurityGroupSettings(name=self.sec_grp_name + '-2',
746 description='hello group')))
748 sec_grp_1b = neutron_utils.get_security_group_by_id(
749 self.neutron, self.security_groups[0].id)
750 sec_grp_2b = neutron_utils.get_security_group_by_id(
751 self.neutron, self.security_groups[1].id)
753 self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
754 self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
757 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
759 Test basic nova keypair functionality
764 Instantiates the CreateImage object that is responsible for downloading
765 and creating an OS image file within OpenStack
767 self.neutron = neutron_utils.neutron_client(self.os_creds)
768 self.floating_ip = None
772 Cleans the image and downloaded image file
775 neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
777 def test_floating_ips(self):
779 Tests the creation of a floating IP
782 initial_fips = neutron_utils.get_floating_ips(self.neutron)
784 self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
786 all_fips = neutron_utils.get_floating_ips(self.neutron)
787 self.assertEqual(len(initial_fips) + 1, len(all_fips))
788 returned = neutron_utils.get_floating_ip(self.neutron,
790 self.assertEqual(self.floating_ip.id, returned.id)
791 self.assertEqual(self.floating_ip.ip, returned.ip)
799 def validate_network(neutron, name, exists):
801 Returns true if a network for a given name DOES NOT exist if the exists
802 parameter is false conversely true. Returns false if a network for a given
803 name DOES exist if the exists parameter is true conversely false.
804 :param neutron: The neutron client
805 :param name: The expected network name
806 :param exists: Whether or not the network name should exist or not
809 network = neutron_utils.get_network(neutron, network_name=name)
810 if exists and network:
812 if not exists and not network:
817 def validate_subnet(neutron, name, cidr, exists):
819 Returns true if a subnet for a given name DOES NOT exist if the exists
820 parameter is false conversely true. Returns false if a subnet for a given
821 name DOES exist if the exists parameter is true conversely false.
822 :param neutron: The neutron client
823 :param name: The expected subnet name
824 :param cidr: The expected CIDR value
825 :param exists: Whether or not the network name should exist or not
828 subnet = neutron_utils.get_subnet_by_name(neutron, name)
829 if exists and subnet:
830 return subnet.cidr == cidr
831 if not exists and not subnet:
836 def validate_router(neutron, name, exists):
838 Returns true if a router for a given name DOES NOT exist if the exists
839 parameter is false conversely true. Returns false if a router for a given
840 name DOES exist if the exists parameter is true conversely false.
841 :param neutron: The neutron client
842 :param name: The expected router name
843 :param exists: Whether or not the network name should exist or not
846 router = neutron_utils.get_router_by_name(neutron, name)
847 if exists and router:
852 def validate_interface_router(interface_router, router, subnet):
854 Returns true if the router ID & subnet ID have been properly included into
855 the interface router object
856 :param interface_router: the SNAPS-OO InterfaceRouter domain object
857 :param router: to validate against the interface_router
858 :param subnet: to validate against the interface_router
859 :return: True if both IDs match else False
861 subnet_id = interface_router.subnet_id
862 router_id = interface_router.port_id
864 return subnet.id == subnet_id and router.id == router_id
867 def validate_port(neutron, port_obj, this_port_name):
869 Returns true if a port for a given name DOES NOT exist if the exists
870 parameter is false conversely true. Returns false if a port for a given
871 name DOES exist if the exists parameter is true conversely false.
872 :param neutron: The neutron client
873 :param port_obj: The port object to lookup
874 :param this_port_name: The expected router name
877 os_ports = neutron.list_ports()
878 for os_port, os_port_insts in os_ports.items():
879 for os_inst in os_port_insts:
880 if os_inst['id'] == port_obj.id:
881 return os_inst['name'] == this_port_name