1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21 SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
28 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Tests to ensure that the neutron client can communicate with the cloud
    """

    def test_neutron_connect_success(self):
        """
        Tests to ensure that the proper credentials can connect by listing
        the networks and locating the configured external network.
        """
        neutron = neutron_utils.neutron_client(self.os_creds)

        networks = neutron.list_networks().get('networks')
        self.assertTrue(any(
            network.get('name') == self.ext_net_name
            for network in networks))

    def test_neutron_connect_fail(self):
        """
        Tests to ensure that the improper credentials cannot connect.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Either building the client or the first API call may be the step
        # that raises, so both stay inside the assertRaises context.
        with self.assertRaises(Exception):
            neutron = neutron_utils.neutron_client(
                OSCreds(username='user', password='pass', auth_url='url',
                        project_name='project'))
            neutron.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Tests neutron_utils.get_external_networks() to ensure the
        configured self.ext_net_name is contained within the returned list
        """
        neutron = neutron_utils.neutron_client(self.os_creds)
        ext_networks = neutron_utils.get_external_networks(neutron)
        self.assertTrue(any(
            network['network']['name'] == self.ext_net_name
            for network in ext_networks))
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Test for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network
        configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = guid + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            # The lookup result is not asserted, so a lingering network does
            # not fail the teardown.
            validate_network(self.neutron, self.network['network']['name'],
                             False)

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network['network']['name'])
        self.assertTrue(validate_network(
            self.neutron, self.net_config.network_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Tests the neutron_utils.create_network() function for an Exception
        when the network name is an empty string
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkSettings(name=''))

    def test_create_network_null_name(self):
        """
        Tests the neutron_utils.create_network() function for an Exception
        when no network name is given
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet
        configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = guid + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        # The subnet is removed before the network that contains it.
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(
                self.neutron, self.subnet.get('name'),
                self.net_config.network_settings.subnet_settings[0].cidr,
                False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'],
                             False)

    def _create_validated_network(self):
        """
        Creates the configured network, stores it in self.network and
        asserts that it carries the configured name and can be looked up
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network['network']['name'])
        self.assertTrue(validate_network(
            self.neutron, self.net_config.network_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting,
            self.os_creds, network=self.network)
        validate_subnet(self.neutron, subnet_setting.name,
                        subnet_setting.cidr, True)

    def test_create_subnet_null_name(self):
        """
        Tests the SubnetSettings constructor for an Exception when the
        subnet name is None
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates a subnet from the configured settings and then looks a
        subnet up by an empty name.
        NOTE(review): the lookup result is not asserted and no subnet with
        an empty name is ever requested, so the "empty name" case named by
        this test is not actually exercised - confirm the intended check.
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        # Stored so that tearDown removes the subnet explicitly.
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting,
            self.os_creds, network=self.network)
        validate_subnet(self.neutron, '',
                        subnet_setting.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an
        Exception when the subnet CIDR value is None
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr=None,
                                      name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an
        Exception when the subnet CIDR value is empty
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr='',
                                      name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)
263 class NeutronUtilsRouterTests(OSComponentTestCase):
265 Test for creating routers via neutron_utils.py
269 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
270 self.port_name = str(guid) + '-port'
271 self.neutron = neutron_utils.neutron_client(self.os_creds)
276 self.interface_router = None
277 self.net_config = openstack_tests.get_pub_net_config(
278 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
279 router_name=guid + '-pub-router', external_net=self.ext_net_name)
283 Cleans the remote OpenStack objects
285 if self.interface_router:
286 neutron_utils.remove_interface_router(self.neutron, self.router,
290 neutron_utils.delete_router(self.neutron, self.router)
291 validate_router(self.neutron, self.router.get('name'), False)
294 neutron_utils.delete_port(self.neutron, self.port)
297 neutron_utils.delete_subnet(self.neutron, self.subnet)
298 validate_subnet(self.neutron, self.subnet.get('name'),
299 self.net_config.network_settings.subnet_settings[
303 neutron_utils.delete_network(self.neutron, self.network)
304 validate_network(self.neutron, self.network['network']['name'],
307 def test_create_router_simple(self):
309 Tests the neutron_utils.create_neutron_net() function when an external
312 self.router = neutron_utils.create_router(
313 self.neutron, self.os_creds, self.net_config.router_settings)
314 validate_router(self.neutron, self.net_config.router_settings.name,
317 def test_create_router_with_public_interface(self):
319 Tests the neutron_utils.create_neutron_net() function when an external
322 subnet_setting = self.net_config.network_settings.subnet_settings[0]
323 self.net_config = openstack_tests.OSNetworkConfig(
324 self.net_config.network_settings.name,
327 self.net_config.router_settings.name,
329 self.router = neutron_utils.create_router(
330 self.neutron, self.os_creds, self.net_config.router_settings)
331 validate_router(self.neutron, self.net_config.router_settings.name,
        # TODO - Add validation that the router gateway has been set
335 def test_create_router_empty_name(self):
        Tests the neutron_utils.create_router() function for an Exception
        when the router name is an empty string
339 with self.assertRaises(Exception):
340 this_router_settings = create_router.RouterSettings(name='')
341 self.router = neutron_utils.create_router(self.neutron,
343 this_router_settings)
345 def test_create_router_null_name(self):
        Tests the neutron_utils.create_router() function for an Exception
        when the router name is None
350 with self.assertRaises(Exception):
351 this_router_settings = create_router.RouterSettings()
352 self.router = neutron_utils.create_router(self.neutron,
354 this_router_settings)
355 validate_router(self.neutron, None, True)
357 def test_add_interface_router(self):
359 Tests the neutron_utils.add_interface_router() function
361 self.network = neutron_utils.create_network(
362 self.neutron, self.os_creds, self.net_config.network_settings)
363 self.assertEqual(self.net_config.network_settings.name,
364 self.network['network']['name'])
365 self.assertTrue(validate_network(self.neutron,
366 self.net_config.network_settings.name,
369 subnet_setting = self.net_config.network_settings.subnet_settings[0]
370 self.subnet = neutron_utils.create_subnet(
371 self.neutron, subnet_setting,
372 self.os_creds, self.network)
376 subnet_setting.cidr, True)
378 self.router = neutron_utils.create_router(
379 self.neutron, self.os_creds, self.net_config.router_settings)
380 validate_router(self.neutron, self.net_config.router_settings.name,
383 self.interface_router = neutron_utils.add_interface_router(
384 self.neutron, self.router, self.subnet)
385 validate_interface_router(self.interface_router, self.router,
388 def test_add_interface_router_null_router(self):
390 Tests the neutron_utils.add_interface_router() function for an
391 Exception when the router value is None
393 self.network = neutron_utils.create_network(
394 self.neutron, self.os_creds, self.net_config.network_settings)
395 self.assertEqual(self.net_config.network_settings.name,
396 self.network['network']['name'])
397 self.assertTrue(validate_network(self.neutron,
398 self.net_config.network_settings.name,
401 subnet_setting = self.net_config.network_settings.subnet_settings[0]
402 self.subnet = neutron_utils.create_subnet(
403 self.neutron, subnet_setting,
404 self.os_creds, self.network)
408 subnet_setting.cidr, True)
410 with self.assertRaises(Exception):
411 self.interface_router = neutron_utils.add_interface_router(
412 self.neutron, self.router, self.subnet)
414 def test_add_interface_router_null_subnet(self):
416 Tests the neutron_utils.add_interface_router() function for an
417 Exception when the subnet value is None
419 self.network = neutron_utils.create_network(
420 self.neutron, self.os_creds, self.net_config.network_settings)
421 self.assertEqual(self.net_config.network_settings.name,
422 self.network['network']['name'])
423 self.assertTrue(validate_network(self.neutron,
424 self.net_config.network_settings.name,
427 self.router = neutron_utils.create_router(
428 self.neutron, self.os_creds, self.net_config.router_settings)
429 validate_router(self.neutron, self.net_config.router_settings.name,
432 with self.assertRaises(Exception):
433 self.interface_router = neutron_utils.add_interface_router(
434 self.neutron, self.router, self.subnet)
436 def test_create_port(self):
438 Tests the neutron_utils.create_port() function
440 self.network = neutron_utils.create_network(
441 self.neutron, self.os_creds, self.net_config.network_settings)
442 self.assertEqual(self.net_config.network_settings.name,
443 self.network['network']['name'])
444 self.assertTrue(validate_network(self.neutron,
445 self.net_config.network_settings.name,
448 subnet_setting = self.net_config.network_settings.subnet_settings[0]
449 self.subnet = neutron_utils.create_subnet(
450 self.neutron, subnet_setting, self.os_creds, self.network)
451 validate_subnet(self.neutron, subnet_setting.name,
452 subnet_setting.cidr, True)
454 self.port = neutron_utils.create_port(
455 self.neutron, self.os_creds, PortSettings(
458 'subnet_name': subnet_setting.name,
460 network_name=self.net_config.network_settings.name))
461 validate_port(self.neutron, self.port, self.port_name)
463 def test_create_port_empty_name(self):
465 Tests the neutron_utils.create_port() function
467 self.network = neutron_utils.create_network(
468 self.neutron, self.os_creds, self.net_config.network_settings)
469 self.assertEqual(self.net_config.network_settings.name,
470 self.network['network']['name'])
471 self.assertTrue(validate_network(self.neutron,
472 self.net_config.network_settings.name,
475 subnet_setting = self.net_config.network_settings.subnet_settings[0]
476 self.subnet = neutron_utils.create_subnet(
477 self.neutron, subnet_setting, self.os_creds, self.network)
478 validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
481 self.port = neutron_utils.create_port(
482 self.neutron, self.os_creds, PortSettings(
484 network_name=self.net_config.network_settings.name,
486 'subnet_name': subnet_setting.name,
488 validate_port(self.neutron, self.port, self.port_name)
490 def test_create_port_null_name(self):
492 Tests the neutron_utils.create_port() function for an Exception when
493 the port name value is None
495 self.network = neutron_utils.create_network(
496 self.neutron, self.os_creds, self.net_config.network_settings)
497 self.assertEqual(self.net_config.network_settings.name,
498 self.network['network']['name'])
499 self.assertTrue(validate_network(self.neutron,
500 self.net_config.network_settings.name,
503 subnet_setting = self.net_config.network_settings.subnet_settings[0]
504 self.subnet = neutron_utils.create_subnet(
505 self.neutron, subnet_setting,
506 self.os_creds, self.network)
510 subnet_setting.cidr, True)
512 with self.assertRaises(Exception):
513 self.port = neutron_utils.create_port(
514 self.neutron, self.os_creds,
516 network_name=self.net_config.network_settings.name,
518 'subnet_name': subnet_setting.name,
521 def test_create_port_null_network_object(self):
523 Tests the neutron_utils.create_port() function for an Exception when
524 the network object is None
526 with self.assertRaises(Exception):
527 self.port = neutron_utils.create_port(
528 self.neutron, self.os_creds,
531 network_name=self.net_config.network_settings.name,
534 self.net_config.network_settings.subnet_settings[
538 def test_create_port_null_ip(self):
540 Tests the neutron_utils.create_port() function for an Exception when
543 self.network = neutron_utils.create_network(
544 self.neutron, self.os_creds, self.net_config.network_settings)
545 self.assertEqual(self.net_config.network_settings.name,
546 self.network['network']['name'])
547 self.assertTrue(validate_network(self.neutron,
548 self.net_config.network_settings.name,
551 subnet_setting = self.net_config.network_settings.subnet_settings[0]
552 self.subnet = neutron_utils.create_subnet(
553 self.neutron, subnet_setting,
554 self.os_creds, self.network)
558 subnet_setting.cidr, True)
560 with self.assertRaises(Exception):
561 self.port = neutron_utils.create_port(
562 self.neutron, self.os_creds,
565 network_name=self.net_config.network_settings.name,
567 'subnet_name': subnet_setting.name,
570 def test_create_port_invalid_ip(self):
572 Tests the neutron_utils.create_port() function for an Exception when
575 self.network = neutron_utils.create_network(
576 self.neutron, self.os_creds, self.net_config.network_settings)
577 self.assertEqual(self.net_config.network_settings.name,
578 self.network['network']['name'])
579 self.assertTrue(validate_network(self.neutron,
580 self.net_config.network_settings.name,
583 subnet_setting = self.net_config.network_settings.subnet_settings[0]
584 self.subnet = neutron_utils.create_subnet(
587 self.os_creds, self.network)
588 validate_subnet(self.neutron,
590 subnet_setting.cidr, True)
592 with self.assertRaises(Exception):
593 self.port = neutron_utils.create_port(
594 self.neutron, self.os_creds,
597 network_name=self.net_config.network_settings.name,
599 'subnet_name': subnet_setting.name,
602 def test_create_port_invalid_ip_to_subnet(self):
604 Tests the neutron_utils.create_port() function for an Exception when
607 self.network = neutron_utils.create_network(
608 self.neutron, self.os_creds, self.net_config.network_settings)
609 self.assertEqual(self.net_config.network_settings.name,
610 self.network['network']['name'])
611 self.assertTrue(validate_network(self.neutron,
612 self.net_config.network_settings.name,
615 subnet_setting = self.net_config.network_settings.subnet_settings[0]
616 self.subnet = neutron_utils.create_subnet(
619 self.os_creds, self.network)
620 validate_subnet(self.neutron,
622 subnet_setting.cidr, True)
624 with self.assertRaises(Exception):
625 self.port = neutron_utils.create_port(
626 self.neutron, self.os_creds,
629 network_name=self.net_config.network_settings.name,
631 'subnet_name': subnet_setting.name,
632 'ip': '10.197.123.100'}]))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients plus the lists of objects
        that tearDown must remove
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: a failure to delete one group must
                # not prevent the remaining groups from being removed.
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings)

        # assertEqual, not assertTrue: with assertTrue the second argument
        # is only the failure message, so the names were never compared.
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_groups.append(
                neutron_utils.create_security_group(
                    self.neutron, self.keystone, sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function for a
        group without any rule settings
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.create_security_group_rule() functions for a group
        with a single ingress rule
        """
        sec_grp_rule_settings = SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        # Rules already attached to the new group are tracked as well so
        # that the comparison below and tearDown both cover them.
        free_rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0])
        for free_rule in free_rules:
            self.security_group_rules.append(free_rule)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings.name)

        rules = neutron_utils.get_rules_by_security_group(self.neutron,
                                                          security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function with
        two security groups
        """
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-1',
                                  description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-2',
                                  description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Test basic floating IP functionality via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client; no floating IP exists yet
        """
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP when the test created one
        """
        if self.floating_ip:
            neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.ext_net_name)
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        # Exactly one more floating IP must be listed after the creation.
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
def validate_network(neutron, name, exists):
    """
    Checks that the presence of a network matches the expectation.
    Returns true when the network is found and exists is true, or when it
    is not found and exists is false. Returns false in every other case.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network name should exist or not
    :return: True/False
    """
    found = neutron_utils.get_network(neutron, name)
    return bool(found) == bool(exists)
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks that the presence of a subnet matches the expectation.
    Returns true when the subnet is not found and exists is false, or when
    it is found, exists is true and its CIDR equals the expected one.
    Returns false in every other case.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet name should exist or not
    :return: True/False
    """
    found = neutron_utils.get_subnet_by_name(neutron, name)
    if not found:
        # Nothing by that name: correct only when absence was expected.
        return not exists
    if not exists:
        return False
    return found.get('cidr') == cidr
def validate_router(neutron, name, exists):
    """
    Checks that the presence of a router matches the expectation.
    Returns true when the router is found and exists is true, or when it is
    not found and exists is false. Returns false in every other case.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router name should exist or not
    :return: True/False
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    # Covers the "expected absent" case as well, matching validate_network.
    return bool(router) == bool(exists)
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into
    the interface router object
    :param interface_router: the object to validate
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    subnet_id = interface_router.get('subnet_id')
    # The router ID is carried under 'id'; 'port_id' identifies the port
    # created for the interface and never equals a router ID.
    # NOTE(review): assumes interface_router has the shape of the Neutron
    # add-router-interface response - confirm against neutron_utils.
    router_id = interface_router.get('id')

    return subnet.get('id') == subnet_id and router.get('id') == router_id
def validate_port(neutron, port_obj, this_port_name):
    """
    Returns true if the port listed by neutron under the ID of port_obj has
    the expected name. Returns false if the name differs or if no listed
    port has that ID.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup
    :param this_port_name: The expected port name
    :return: True/False
    """
    # list_ports() returns a dict of port lists; only the lists are needed.
    for port_insts in neutron.list_ports().values():
        for inst in port_insts:
            if inst['id'] == port_obj['port']['id']:
                return inst['name'] == this_port_name
    return False