1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21 SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
28 __author__ = 'spisarski'
34 class NeutronSmokeTests(OSComponentTestCase):
36 Tests to ensure that the neutron client can communicate with the cloud
39 def test_neutron_connect_success(self):
41 Tests to ensure that the proper credentials can connect.
43 neutron = neutron_utils.neutron_client(self.os_creds)
45 networks = neutron.list_networks()
48 networks = networks.get('networks')
49 for network in networks:
50 if network.get('name') == self.ext_net_name:
52 self.assertTrue(found)
54 def test_neutron_connect_fail(self):
56 Tests to ensure that the improper credentials cannot connect.
58 from snaps.openstack.os_credentials import OSCreds
60 with self.assertRaises(Exception):
61 neutron = neutron_utils.neutron_client(
62 OSCreds(username='user', password='pass', auth_url='url',
63 project_name='project'))
64 neutron.list_networks()
66 def test_retrieve_ext_network_name(self):
68 Tests the neutron_utils.get_external_network_names to ensure the
69 configured self.ext_net_name is contained within the returned list
72 neutron = neutron_utils.neutron_client(self.os_creds)
73 ext_networks = neutron_utils.get_external_networks(neutron)
75 for network in ext_networks:
76 if network.name == self.ext_net_name:
79 self.assertTrue(found)
82 class NeutronUtilsNetworkTests(OSComponentTestCase):
84 Test for creating networks via neutron_utils.py
88 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
89 self.port_name = str(guid) + '-port'
90 self.neutron = neutron_utils.neutron_client(self.os_creds)
92 self.net_config = openstack_tests.get_pub_net_config(
93 net_name=guid + '-pub-net')
97 Cleans the remote OpenStack objects
100 neutron_utils.delete_network(self.neutron, self.network)
101 validate_network(self.neutron, self.network.name, False)
103 def test_create_network(self):
105 Tests the neutron_utils.create_neutron_net() function
107 self.network = neutron_utils.create_network(
108 self.neutron, self.os_creds, self.net_config.network_settings)
109 self.assertEqual(self.net_config.network_settings.name,
111 self.assertTrue(validate_network(
112 self.neutron, self.net_config.network_settings.name, True))
114 def test_create_network_empty_name(self):
116 Tests the neutron_utils.create_neutron_net() function with an empty
119 with self.assertRaises(Exception):
120 self.network = neutron_utils.create_network(
121 self.neutron, self.os_creds,
122 network_settings=NetworkSettings(name=''))
124 def test_create_network_null_name(self):
126 Tests the neutron_utils.create_neutron_net() function when the network
129 with self.assertRaises(Exception):
130 self.network = neutron_utils.create_network(
131 self.neutron, self.os_creds,
132 network_settings=NetworkSettings())
135 class NeutronUtilsSubnetTests(OSComponentTestCase):
137 Test for creating networks with subnets via neutron_utils.py
141 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
142 self.port_name = str(guid) + '-port'
143 self.neutron = neutron_utils.neutron_client(self.os_creds)
146 self.net_config = openstack_tests.get_pub_net_config(
147 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
148 external_net=self.ext_net_name)
152 Cleans the remote OpenStack objects
155 neutron_utils.delete_subnet(self.neutron, self.subnet)
156 validate_subnet(self.neutron, self.subnet.name,
157 self.net_config.network_settings.subnet_settings[
161 neutron_utils.delete_network(self.neutron, self.network)
162 validate_network(self.neutron, self.network.name, False)
164 def test_create_subnet(self):
166 Tests the neutron_utils.create_neutron_net() function
168 self.network = neutron_utils.create_network(
169 self.neutron, self.os_creds, self.net_config.network_settings)
170 self.assertEqual(self.net_config.network_settings.name,
172 self.assertTrue(validate_network(
173 self.neutron, self.net_config.network_settings.name, True))
175 subnet_setting = self.net_config.network_settings.subnet_settings[0]
176 self.subnet = neutron_utils.create_subnet(
177 self.neutron, subnet_setting, self.os_creds, network=self.network)
179 self.neutron, subnet_setting.name, subnet_setting.cidr, True)
181 def test_create_subnet_null_name(self):
183 Tests the neutron_utils.create_neutron_subnet() function for an
184 Exception when the subnet name is None
186 self.network = neutron_utils.create_network(
187 self.neutron, self.os_creds, self.net_config.network_settings)
188 self.assertEqual(self.net_config.network_settings.name,
190 self.assertTrue(validate_network(
191 self.neutron, self.net_config.network_settings.name, True))
193 with self.assertRaises(Exception):
194 SubnetSettings(cidr=self.net_config.subnet_cidr)
196 def test_create_subnet_empty_name(self):
198 Tests the neutron_utils.create_neutron_net() function with an empty
201 self.network = neutron_utils.create_network(
202 self.neutron, self.os_creds, self.net_config.network_settings)
203 self.assertEqual(self.net_config.network_settings.name,
205 self.assertTrue(validate_network(
206 self.neutron, self.net_config.network_settings.name, True))
208 subnet_setting = self.net_config.network_settings.subnet_settings[0]
209 neutron_utils.create_subnet(
210 self.neutron, subnet_setting, self.os_creds, network=self.network)
211 validate_subnet(self.neutron, '', subnet_setting.cidr, True)
213 def test_create_subnet_null_cidr(self):
215 Tests the neutron_utils.create_neutron_subnet() function for an
216 Exception when the subnet CIDR value is None
218 self.network = neutron_utils.create_network(
219 self.neutron, self.os_creds, self.net_config.network_settings)
220 self.assertEqual(self.net_config.network_settings.name,
222 self.assertTrue(validate_network(
223 self.neutron, self.net_config.network_settings.name, True))
225 with self.assertRaises(Exception):
226 sub_sets = SubnetSettings(
227 cidr=None, name=self.net_config.subnet_name)
228 neutron_utils.create_subnet(
229 self.neutron, sub_sets, self.os_creds, network=self.network)
231 def test_create_subnet_empty_cidr(self):
233 Tests the neutron_utils.create_neutron_subnet() function for an
234 Exception when the subnet CIDR value is empty
236 self.network = neutron_utils.create_network(
237 self.neutron, self.os_creds, self.net_config.network_settings)
238 self.assertEqual(self.net_config.network_settings.name,
240 self.assertTrue(validate_network(
241 self.neutron, self.net_config.network_settings.name, True))
243 with self.assertRaises(Exception):
244 sub_sets = SubnetSettings(
245 cidr='', name=self.net_config.subnet_name)
246 neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
247 network=self.network)
250 class NeutronUtilsRouterTests(OSComponentTestCase):
252 Test for creating routers via neutron_utils.py
256 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
257 self.port_name = str(guid) + '-port'
258 self.neutron = neutron_utils.neutron_client(self.os_creds)
263 self.interface_router = None
264 self.net_config = openstack_tests.get_pub_net_config(
265 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
266 router_name=guid + '-pub-router', external_net=self.ext_net_name)
270 Cleans the remote OpenStack objects
272 if self.interface_router:
273 neutron_utils.remove_interface_router(self.neutron, self.router,
277 neutron_utils.delete_router(self.neutron, self.router)
278 validate_router(self.neutron, self.router.name, False)
281 neutron_utils.delete_port(self.neutron, self.port)
284 neutron_utils.delete_subnet(self.neutron, self.subnet)
286 self.neutron, self.subnet.name,
287 self.net_config.network_settings.subnet_settings[0].cidr,
291 neutron_utils.delete_network(self.neutron, self.network)
292 validate_network(self.neutron, self.network.name, False)
294 def test_create_router_simple(self):
296 Tests the neutron_utils.create_neutron_net() function when an external
299 self.router = neutron_utils.create_router(
300 self.neutron, self.os_creds, self.net_config.router_settings)
301 validate_router(self.neutron, self.net_config.router_settings.name,
304 def test_create_router_with_public_interface(self):
306 Tests the neutron_utils.create_neutron_net() function when an external
309 subnet_setting = self.net_config.network_settings.subnet_settings[0]
310 self.net_config = openstack_tests.OSNetworkConfig(
311 self.net_config.network_settings.name,
314 self.net_config.router_settings.name,
316 self.router = neutron_utils.create_router(
317 self.neutron, self.os_creds, self.net_config.router_settings)
318 validate_router(self.neutron, self.net_config.router_settings.name,
320 # TODO - Add validation that the router gatway has been set
322 def test_create_router_empty_name(self):
324 Tests the neutron_utils.create_neutron_net() function
326 with self.assertRaises(Exception):
327 this_router_settings = create_router.RouterSettings(name='')
328 self.router = neutron_utils.create_router(self.neutron,
330 this_router_settings)
332 def test_create_router_null_name(self):
334 Tests the neutron_utils.create_neutron_subnet() function when the
335 subnet CIDR value is None
337 with self.assertRaises(Exception):
338 this_router_settings = create_router.RouterSettings()
339 self.router = neutron_utils.create_router(self.neutron,
341 this_router_settings)
342 validate_router(self.neutron, None, True)
344 def test_add_interface_router(self):
346 Tests the neutron_utils.add_interface_router() function
348 self.network = neutron_utils.create_network(
349 self.neutron, self.os_creds, self.net_config.network_settings)
350 self.assertEqual(self.net_config.network_settings.name,
352 self.assertTrue(validate_network(
353 self.neutron, self.net_config.network_settings.name, True))
355 subnet_setting = self.net_config.network_settings.subnet_settings[0]
356 self.subnet = neutron_utils.create_subnet(
357 self.neutron, subnet_setting,
358 self.os_creds, self.network)
362 subnet_setting.cidr, True)
364 self.router = neutron_utils.create_router(
365 self.neutron, self.os_creds, self.net_config.router_settings)
366 validate_router(self.neutron, self.net_config.router_settings.name,
369 self.interface_router = neutron_utils.add_interface_router(
370 self.neutron, self.router, self.subnet)
371 validate_interface_router(self.interface_router, self.router,
374 def test_add_interface_router_null_router(self):
376 Tests the neutron_utils.add_interface_router() function for an
377 Exception when the router value is None
379 self.network = neutron_utils.create_network(
380 self.neutron, self.os_creds, self.net_config.network_settings)
381 self.assertEqual(self.net_config.network_settings.name,
383 self.assertTrue(validate_network(
384 self.neutron, self.net_config.network_settings.name, True))
386 subnet_setting = self.net_config.network_settings.subnet_settings[0]
387 self.subnet = neutron_utils.create_subnet(
388 self.neutron, subnet_setting,
389 self.os_creds, self.network)
391 self.neutron, subnet_setting.name, subnet_setting.cidr, True)
393 with self.assertRaises(Exception):
394 self.interface_router = neutron_utils.add_interface_router(
395 self.neutron, self.router, self.subnet)
397 def test_add_interface_router_null_subnet(self):
399 Tests the neutron_utils.add_interface_router() function for an
400 Exception when the subnet value is None
402 self.network = neutron_utils.create_network(
403 self.neutron, self.os_creds, self.net_config.network_settings)
404 self.assertEqual(self.net_config.network_settings.name,
406 self.assertTrue(validate_network(
407 self.neutron, self.net_config.network_settings.name, True))
409 self.router = neutron_utils.create_router(
410 self.neutron, self.os_creds, self.net_config.router_settings)
411 validate_router(self.neutron, self.net_config.router_settings.name,
414 with self.assertRaises(Exception):
415 self.interface_router = neutron_utils.add_interface_router(
416 self.neutron, self.router, self.subnet)
418 def test_create_port(self):
420 Tests the neutron_utils.create_port() function
422 self.network = neutron_utils.create_network(
423 self.neutron, self.os_creds, self.net_config.network_settings)
424 self.assertEqual(self.net_config.network_settings.name,
426 self.assertTrue(validate_network(
427 self.neutron, self.net_config.network_settings.name, True))
429 subnet_setting = self.net_config.network_settings.subnet_settings[0]
430 self.subnet = neutron_utils.create_subnet(
431 self.neutron, subnet_setting, self.os_creds, self.network)
432 validate_subnet(self.neutron, subnet_setting.name,
433 subnet_setting.cidr, True)
435 self.port = neutron_utils.create_port(
436 self.neutron, self.os_creds, PortSettings(
439 'subnet_name': subnet_setting.name,
441 network_name=self.net_config.network_settings.name))
442 validate_port(self.neutron, self.port, self.port_name)
444 def test_create_port_empty_name(self):
446 Tests the neutron_utils.create_port() function
448 self.network = neutron_utils.create_network(
449 self.neutron, self.os_creds, self.net_config.network_settings)
450 self.assertEqual(self.net_config.network_settings.name,
452 self.assertTrue(validate_network(
453 self.neutron, self.net_config.network_settings.name, True))
455 subnet_setting = self.net_config.network_settings.subnet_settings[0]
456 self.subnet = neutron_utils.create_subnet(
457 self.neutron, subnet_setting, self.os_creds, self.network)
458 validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
461 self.port = neutron_utils.create_port(
462 self.neutron, self.os_creds, PortSettings(
464 network_name=self.net_config.network_settings.name,
466 'subnet_name': subnet_setting.name,
468 validate_port(self.neutron, self.port, self.port_name)
470 def test_create_port_null_name(self):
472 Tests the neutron_utils.create_port() function for an Exception when
473 the port name value is None
475 self.network = neutron_utils.create_network(
476 self.neutron, self.os_creds, self.net_config.network_settings)
477 self.assertEqual(self.net_config.network_settings.name,
479 self.assertTrue(validate_network(
480 self.neutron, self.net_config.network_settings.name, True))
482 subnet_setting = self.net_config.network_settings.subnet_settings[0]
483 self.subnet = neutron_utils.create_subnet(
484 self.neutron, subnet_setting,
485 self.os_creds, self.network)
489 subnet_setting.cidr, True)
491 with self.assertRaises(Exception):
492 self.port = neutron_utils.create_port(
493 self.neutron, self.os_creds,
495 network_name=self.net_config.network_settings.name,
497 'subnet_name': subnet_setting.name,
500 def test_create_port_null_network_object(self):
502 Tests the neutron_utils.create_port() function for an Exception when
503 the network object is None
505 with self.assertRaises(Exception):
506 self.port = neutron_utils.create_port(
507 self.neutron, self.os_creds,
510 network_name=self.net_config.network_settings.name,
513 self.net_config.network_settings.subnet_settings[
517 def test_create_port_null_ip(self):
519 Tests the neutron_utils.create_port() function for an Exception when
522 self.network = neutron_utils.create_network(
523 self.neutron, self.os_creds, self.net_config.network_settings)
524 self.assertEqual(self.net_config.network_settings.name,
526 self.assertTrue(validate_network(
527 self.neutron, self.net_config.network_settings.name, True))
529 subnet_setting = self.net_config.network_settings.subnet_settings[0]
530 self.subnet = neutron_utils.create_subnet(
531 self.neutron, subnet_setting,
532 self.os_creds, self.network)
536 subnet_setting.cidr, True)
538 with self.assertRaises(Exception):
539 self.port = neutron_utils.create_port(
540 self.neutron, self.os_creds,
543 network_name=self.net_config.network_settings.name,
545 'subnet_name': subnet_setting.name,
548 def test_create_port_invalid_ip(self):
550 Tests the neutron_utils.create_port() function for an Exception when
553 self.network = neutron_utils.create_network(
554 self.neutron, self.os_creds, self.net_config.network_settings)
555 self.assertEqual(self.net_config.network_settings.name,
557 self.assertTrue(validate_network(
558 self.neutron, self.net_config.network_settings.name, True))
560 subnet_setting = self.net_config.network_settings.subnet_settings[0]
561 self.subnet = neutron_utils.create_subnet(
562 self.neutron, subnet_setting, self.os_creds, self.network)
563 validate_subnet(self.neutron,
565 subnet_setting.cidr, True)
567 with self.assertRaises(Exception):
568 self.port = neutron_utils.create_port(
569 self.neutron, self.os_creds,
572 network_name=self.net_config.network_settings.name,
574 'subnet_name': subnet_setting.name,
577 def test_create_port_invalid_ip_to_subnet(self):
579 Tests the neutron_utils.create_port() function for an Exception when
582 self.network = neutron_utils.create_network(
583 self.neutron, self.os_creds, self.net_config.network_settings)
584 self.assertEqual(self.net_config.network_settings.name,
586 self.assertTrue(validate_network(
587 self.neutron, self.net_config.network_settings.name, True))
589 subnet_setting = self.net_config.network_settings.subnet_settings[0]
590 self.subnet = neutron_utils.create_subnet(
591 self.neutron, subnet_setting, self.os_creds, self.network)
593 self.neutron, subnet_setting.name, subnet_setting.cidr, True)
595 with self.assertRaises(Exception):
596 self.port = neutron_utils.create_port(
597 self.neutron, self.os_creds,
600 network_name=self.net_config.network_settings.name,
602 'subnet_name': subnet_setting.name,
603 'ip': '10.197.123.100'}]))
606 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
608 Test for creating security groups via neutron_utils.py
612 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
613 self.sec_grp_name = guid + 'name'
615 self.security_groups = list()
616 self.security_group_rules = list()
617 self.neutron = neutron_utils.neutron_client(self.os_creds)
618 self.keystone = keystone_utils.keystone_client(self.os_creds)
622 Cleans the remote OpenStack objects
624 for rule in self.security_group_rules:
625 neutron_utils.delete_security_group_rule(self.neutron, rule)
627 for security_group in self.security_groups:
629 neutron_utils.delete_security_group(self.neutron,
634 def test_create_delete_simple_sec_grp(self):
636 Tests the neutron_utils.create_security_group() function
638 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
639 security_group = neutron_utils.create_security_group(self.neutron,
643 self.assertTrue(sec_grp_settings.name, security_group.name)
645 sec_grp_get = neutron_utils.get_security_group(self.neutron,
646 sec_grp_settings.name)
647 self.assertIsNotNone(sec_grp_get)
648 self.assertTrue(validation_utils.objects_equivalent(
649 security_group, sec_grp_get))
651 neutron_utils.delete_security_group(self.neutron, security_group)
652 sec_grp_get = neutron_utils.get_security_group(self.neutron,
653 sec_grp_settings.name)
654 self.assertIsNone(sec_grp_get)
656 def test_create_sec_grp_no_name(self):
658 Tests the SecurityGroupSettings constructor and
659 neutron_utils.create_security_group() function to ensure that
660 attempting to create a security group without a name will raise an
663 with self.assertRaises(Exception):
664 sec_grp_settings = SecurityGroupSettings()
665 self.security_groups.append(
666 neutron_utils.create_security_group(self.neutron,
670 def test_create_sec_grp_no_rules(self):
672 Tests the neutron_utils.create_security_group() function
674 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
675 description='hello group')
676 self.security_groups.append(
677 neutron_utils.create_security_group(self.neutron, self.keystone,
680 self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
681 self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
683 sec_grp_get = neutron_utils.get_security_group(self.neutron,
684 sec_grp_settings.name)
685 self.assertIsNotNone(sec_grp_get)
686 self.assertEqual(self.security_groups[0], sec_grp_get)
688 def test_create_sec_grp_one_rule(self):
690 Tests the neutron_utils.create_security_group() function
693 sec_grp_rule_settings = SecurityGroupRuleSettings(
694 sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
695 sec_grp_settings = SecurityGroupSettings(
696 name=self.sec_grp_name, description='hello group',
697 rule_settings=[sec_grp_rule_settings])
699 self.security_groups.append(
700 neutron_utils.create_security_group(self.neutron, self.keystone,
702 free_rules = neutron_utils.get_rules_by_security_group(
703 self.neutron, self.security_groups[0])
704 for free_rule in free_rules:
705 self.security_group_rules.append(free_rule)
707 self.security_group_rules.append(
708 neutron_utils.create_security_group_rule(
709 self.neutron, sec_grp_settings.rule_settings[0]))
711 # Refresh object so it is populated with the newly added rule
712 security_group = neutron_utils.get_security_group(
713 self.neutron, sec_grp_settings.name)
715 rules = neutron_utils.get_rules_by_security_group(self.neutron,
719 validation_utils.objects_equivalent(
720 self.security_group_rules, rules))
722 self.assertTrue(sec_grp_settings.name, security_group.name)
724 sec_grp_get = neutron_utils.get_security_group(self.neutron,
725 sec_grp_settings.name)
726 self.assertIsNotNone(sec_grp_get)
727 self.assertEqual(security_group, sec_grp_get)
729 def test_get_sec_grp_by_id(self):
731 Tests the neutron_utils.create_security_group() function
734 self.security_groups.append(neutron_utils.create_security_group(
735 self.neutron, self.keystone,
736 SecurityGroupSettings(name=self.sec_grp_name + '-1',
737 description='hello group')))
738 self.security_groups.append(neutron_utils.create_security_group(
739 self.neutron, self.keystone,
740 SecurityGroupSettings(name=self.sec_grp_name + '-2',
741 description='hello group')))
743 sec_grp_1b = neutron_utils.get_security_group_by_id(
744 self.neutron, self.security_groups[0].id)
745 sec_grp_2b = neutron_utils.get_security_group_by_id(
746 self.neutron, self.security_groups[1].id)
748 self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
749 self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
752 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
754 Test basic nova keypair functionality
759 Instantiates the CreateImage object that is responsible for downloading
760 and creating an OS image file within OpenStack
762 self.neutron = neutron_utils.neutron_client(self.os_creds)
763 self.floating_ip = None
767 Cleans the image and downloaded image file
770 neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
772 def test_floating_ips(self):
774 Tests the creation of a floating IP
777 initial_fips = neutron_utils.get_floating_ips(self.neutron)
779 self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
781 all_fips = neutron_utils.get_floating_ips(self.neutron)
782 self.assertEqual(len(initial_fips) + 1, len(all_fips))
783 returned = neutron_utils.get_floating_ip(self.neutron,
785 self.assertEqual(self.floating_ip.id, returned.id)
786 self.assertEqual(self.floating_ip.ip, returned.ip)
794 def validate_network(neutron, name, exists):
796 Returns true if a network for a given name DOES NOT exist if the exists
797 parameter is false conversely true. Returns false if a network for a given
798 name DOES exist if the exists parameter is true conversely false.
799 :param neutron: The neutron client
800 :param name: The expected network name
801 :param exists: Whether or not the network name should exist or not
804 network = neutron_utils.get_network(neutron, name)
805 if exists and network:
807 if not exists and not network:
812 def validate_subnet(neutron, name, cidr, exists):
814 Returns true if a subnet for a given name DOES NOT exist if the exists
815 parameter is false conversely true. Returns false if a subnet for a given
816 name DOES exist if the exists parameter is true conversely false.
817 :param neutron: The neutron client
818 :param name: The expected subnet name
819 :param cidr: The expected CIDR value
820 :param exists: Whether or not the network name should exist or not
823 subnet = neutron_utils.get_subnet_by_name(neutron, name)
824 if exists and subnet:
825 return subnet.cidr == cidr
826 if not exists and not subnet:
831 def validate_router(neutron, name, exists):
833 Returns true if a router for a given name DOES NOT exist if the exists
834 parameter is false conversely true. Returns false if a router for a given
835 name DOES exist if the exists parameter is true conversely false.
836 :param neutron: The neutron client
837 :param name: The expected router name
838 :param exists: Whether or not the network name should exist or not
841 router = neutron_utils.get_router_by_name(neutron, name)
842 if exists and router:
847 def validate_interface_router(interface_router, router, subnet):
849 Returns true if the router ID & subnet ID have been properly included into
850 the interface router object
851 :param interface_router: the SNAPS-OO InterfaceRouter domain object
852 :param router: to validate against the interface_router
853 :param subnet: to validate against the interface_router
854 :return: True if both IDs match else False
856 subnet_id = interface_router.subnet_id
857 router_id = interface_router.port_id
859 return subnet.id == subnet_id and router.id == router_id
862 def validate_port(neutron, port_obj, this_port_name):
864 Returns true if a port for a given name DOES NOT exist if the exists
865 parameter is false conversely true. Returns false if a port for a given
866 name DOES exist if the exists parameter is true conversely false.
867 :param neutron: The neutron client
868 :param port_obj: The port object to lookup
869 :param this_port_name: The expected router name
872 os_ports = neutron.list_ports()
873 for os_port, os_port_insts in os_ports.items():
874 for os_inst in os_port_insts:
875 if os_inst['id'] == port_obj.id:
876 return os_inst['name'] == this_port_name