1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21 SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Tests to ensure that the neutron client can communicate with the cloud
    """

    def test_neutron_connect_success(self):
        """
        Tests to ensure that the proper credentials can connect: the
        configured external network (self.ext_net_name) must appear in the
        network listing returned by the client.
        """
        neutron = neutron_utils.neutron_client(self.os_creds)

        networks = neutron.list_networks().get('networks')

        # any() replaces the manual "found" flag loop
        self.assertTrue(any(
            network.get('name') == self.ext_net_name
            for network in networks))

    def test_neutron_connect_fail(self):
        """
        Tests to ensure that the improper credentials cannot connect.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Either building the client or the first API call must raise
        with self.assertRaises(Exception):
            neutron = neutron_utils.neutron_client(
                OSCreds(username='user', password='pass', auth_url='url',
                        project_name='project'))
            neutron.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Tests neutron_utils.get_external_networks() to ensure the configured
        self.ext_net_name is the name of one of the returned networks.
        """
        neutron = neutron_utils.neutron_client(self.os_creds)
        ext_networks = neutron_utils.get_external_networks(neutron)

        self.assertTrue(any(
            network.name == self.ext_net_name for network in ext_networks))
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Test for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        # Populated by the tests; tearDown only deletes what was created
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.net_config.network_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Tests the neutron_utils.create_network() function with an empty
        network name, which is expected to raise an Exception
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkSettings(name=''))

    def test_create_network_null_name(self):
        """
        Tests the neutron_utils.create_network() function when the network
        settings carry no name, which is expected to raise an Exception
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet
        configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        # Populated by the tests; tearDown only deletes what was created
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        # The subnet goes first because it belongs to the network
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

    def _create_network(self):
        """
        Creates self.network from the test configuration and asserts that it
        exists under the configured name
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.net_config.network_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))

    def test_create_subnet_null_name(self):
        """
        Tests the SubnetSettings constructor for an Exception when the subnet
        name is None
        """
        self._create_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates the configured subnet and ensures that looking it up by its
        real name validates while looking it up by an empty name does not
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        # Keep the handle so tearDown can delete the subnet
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        self.assertFalse(validate_subnet(
            self.neutron, '', subnet_setting.cidr, True))

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an
        Exception when the subnet CIDR value is None
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr=None, name=self.net_config.subnet_name)
            neutron_utils.create_subnet(
                self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an
        Exception when the subnet CIDR value is empty
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr='', name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)
# Functional tests for the router, router-interface and port helpers in
# neutron_utils. Each test builds its prerequisites (network, subnet, router)
# through the neutron client created from self.os_creds.
# NOTE(review): several validate_router() / validate_interface_router() /
# validate_port() calls in this class are bare statements, unlike the
# self.assertTrue(validate_network(...)) calls next to them, so their boolean
# result is never asserted - consider wrapping them in self.assertTrue().
248 class NeutronUtilsRouterTests(OSComponentTestCase):
250 Test for creating routers via neutron_utils.py
254 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
255 self.port_name = str(guid) + '-port'
256 self.neutron = neutron_utils.neutron_client(self.os_creds)
261 self.interface_router = None
262 self.net_config = openstack_tests.get_pub_net_config(
263 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
264 router_name=guid + '-pub-router', external_net=self.ext_net_name)
268 Cleans the remote OpenStack objects
# Cleanup order: router interface, router, port, subnet, network
270 if self.interface_router:
271 neutron_utils.remove_interface_router(self.neutron, self.router,
275 neutron_utils.delete_router(self.neutron, self.router)
# NOTE(review): the result of this validate_router() call is discarded
276 validate_router(self.neutron, self.router.name, False)
279 neutron_utils.delete_port(self.neutron, self.port)
282 neutron_utils.delete_subnet(self.neutron, self.subnet)
285 neutron_utils.delete_network(self.neutron, self.network)
287 def test_create_router_simple(self):
289 Tests the neutron_utils.create_neutron_net() function when an external
292 self.router = neutron_utils.create_router(
293 self.neutron, self.os_creds, self.net_config.router_settings)
294 validate_router(self.neutron, self.net_config.router_settings.name,
297 def test_create_router_with_public_interface(self):
299 Tests the neutron_utils.create_neutron_net() function when an external
302 subnet_setting = self.net_config.network_settings.subnet_settings[0]
303 self.net_config = openstack_tests.OSNetworkConfig(
304 self.net_config.network_settings.name,
307 self.net_config.router_settings.name,
309 self.router = neutron_utils.create_router(
310 self.neutron, self.os_creds, self.net_config.router_settings)
311 validate_router(self.neutron, self.net_config.router_settings.name,
# The router's external gateway is compared against the external network
# looked up by self.ext_net_name
314 ext_net = neutron_utils.get_network(
315 self.neutron, network_name=self.ext_net_name)
317 self.router.external_gateway_info['network_id'], ext_net.id)
319 def test_create_router_empty_name(self):
321 Tests the neutron_utils.create_neutron_net() function
323 with self.assertRaises(Exception):
324 this_router_settings = create_router.RouterSettings(name='')
325 self.router = neutron_utils.create_router(self.neutron,
327 this_router_settings)
329 def test_create_router_null_name(self):
331 Tests the neutron_utils.create_neutron_subnet() function when the
332 subnet CIDR value is None
334 with self.assertRaises(Exception):
335 this_router_settings = create_router.RouterSettings()
336 self.router = neutron_utils.create_router(self.neutron,
338 this_router_settings)
339 validate_router(self.neutron, None, True)
341 def test_add_interface_router(self):
343 Tests the neutron_utils.add_interface_router() function
345 self.network = neutron_utils.create_network(
346 self.neutron, self.os_creds, self.net_config.network_settings)
347 self.assertEqual(self.net_config.network_settings.name,
349 self.assertTrue(validate_network(
350 self.neutron, self.net_config.network_settings.name, True))
352 subnet_setting = self.net_config.network_settings.subnet_settings[0]
353 self.subnet = neutron_utils.create_subnet(
354 self.neutron, subnet_setting,
355 self.os_creds, self.network)
356 self.assertTrue(validate_subnet(
357 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
359 self.router = neutron_utils.create_router(
360 self.neutron, self.os_creds, self.net_config.router_settings)
361 validate_router(self.neutron, self.net_config.router_settings.name,
364 self.interface_router = neutron_utils.add_interface_router(
365 self.neutron, self.router, self.subnet)
366 validate_interface_router(self.interface_router, self.router,
369 def test_add_interface_router_null_router(self):
371 Tests the neutron_utils.add_interface_router() function for an
372 Exception when the router value is None
374 self.network = neutron_utils.create_network(
375 self.neutron, self.os_creds, self.net_config.network_settings)
376 self.assertEqual(self.net_config.network_settings.name,
378 self.assertTrue(validate_network(
379 self.neutron, self.net_config.network_settings.name, True))
381 subnet_setting = self.net_config.network_settings.subnet_settings[0]
382 self.subnet = neutron_utils.create_subnet(
383 self.neutron, subnet_setting,
384 self.os_creds, self.network)
385 self.assertTrue(validate_subnet(
386 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
# No router is created in this test before the call below; presumably
# self.router still holds its initial value - verify against setUp
388 with self.assertRaises(NeutronException):
389 self.interface_router = neutron_utils.add_interface_router(
390 self.neutron, self.router, self.subnet)
392 def test_add_interface_router_null_subnet(self):
394 Tests the neutron_utils.add_interface_router() function for an
395 Exception when the subnet value is None
397 self.network = neutron_utils.create_network(
398 self.neutron, self.os_creds, self.net_config.network_settings)
399 self.assertEqual(self.net_config.network_settings.name,
401 self.assertTrue(validate_network(
402 self.neutron, self.net_config.network_settings.name, True))
404 self.router = neutron_utils.create_router(
405 self.neutron, self.os_creds, self.net_config.router_settings)
406 validate_router(self.neutron, self.net_config.router_settings.name,
# No subnet is created in this test before the call below; presumably
# self.subnet still holds its initial value - verify against setUp
409 with self.assertRaises(NeutronException):
410 self.interface_router = neutron_utils.add_interface_router(
411 self.neutron, self.router, self.subnet)
413 def test_create_port(self):
415 Tests the neutron_utils.create_port() function
417 self.network = neutron_utils.create_network(
418 self.neutron, self.os_creds, self.net_config.network_settings)
419 self.assertEqual(self.net_config.network_settings.name,
421 self.assertTrue(validate_network(
422 self.neutron, self.net_config.network_settings.name, True))
424 subnet_setting = self.net_config.network_settings.subnet_settings[0]
425 self.subnet = neutron_utils.create_subnet(
426 self.neutron, subnet_setting, self.os_creds, self.network)
427 self.assertTrue(validate_subnet(
428 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
430 self.port = neutron_utils.create_port(
431 self.neutron, self.os_creds, PortSettings(
434 'subnet_name': subnet_setting.name,
436 network_name=self.net_config.network_settings.name))
# NOTE(review): the result of validate_port() is discarded
437 validate_port(self.neutron, self.port, self.port_name)
439 def test_create_port_empty_name(self):
441 Tests the neutron_utils.create_port() function
443 self.network = neutron_utils.create_network(
444 self.neutron, self.os_creds, self.net_config.network_settings)
445 self.assertEqual(self.net_config.network_settings.name,
447 self.assertTrue(validate_network(
448 self.neutron, self.net_config.network_settings.name, True))
450 subnet_setting = self.net_config.network_settings.subnet_settings[0]
451 self.subnet = neutron_utils.create_subnet(
452 self.neutron, subnet_setting, self.os_creds, self.network)
453 self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
454 subnet_setting.cidr, True))
456 self.port = neutron_utils.create_port(
457 self.neutron, self.os_creds, PortSettings(
459 network_name=self.net_config.network_settings.name,
461 'subnet_name': subnet_setting.name,
# NOTE(review): the result of validate_port() is discarded
463 validate_port(self.neutron, self.port, self.port_name)
465 def test_create_port_null_name(self):
467 Tests the neutron_utils.create_port() function for an Exception when
468 the port name value is None
470 self.network = neutron_utils.create_network(
471 self.neutron, self.os_creds, self.net_config.network_settings)
472 self.assertEqual(self.net_config.network_settings.name,
474 self.assertTrue(validate_network(
475 self.neutron, self.net_config.network_settings.name, True))
477 subnet_setting = self.net_config.network_settings.subnet_settings[0]
478 self.subnet = neutron_utils.create_subnet(
479 self.neutron, subnet_setting,
480 self.os_creds, self.network)
481 self.assertTrue(validate_subnet(
482 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
484 with self.assertRaises(Exception):
485 self.port = neutron_utils.create_port(
486 self.neutron, self.os_creds,
488 network_name=self.net_config.network_settings.name,
490 'subnet_name': subnet_setting.name,
493 def test_create_port_null_network_object(self):
495 Tests the neutron_utils.create_port() function for an Exception when
496 the network object is None
# Unlike the other port tests, no network or subnet is created first
498 with self.assertRaises(Exception):
499 self.port = neutron_utils.create_port(
500 self.neutron, self.os_creds,
503 network_name=self.net_config.network_settings.name,
506 self.net_config.network_settings.subnet_settings[
510 def test_create_port_null_ip(self):
512 Tests the neutron_utils.create_port() function for an Exception when
515 self.network = neutron_utils.create_network(
516 self.neutron, self.os_creds, self.net_config.network_settings)
517 self.assertEqual(self.net_config.network_settings.name,
519 self.assertTrue(validate_network(
520 self.neutron, self.net_config.network_settings.name, True))
522 subnet_setting = self.net_config.network_settings.subnet_settings[0]
523 self.subnet = neutron_utils.create_subnet(
524 self.neutron, subnet_setting,
525 self.os_creds, self.network)
526 self.assertTrue(validate_subnet(
527 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
529 with self.assertRaises(Exception):
530 self.port = neutron_utils.create_port(
531 self.neutron, self.os_creds,
534 network_name=self.net_config.network_settings.name,
536 'subnet_name': subnet_setting.name,
539 def test_create_port_invalid_ip(self):
541 Tests the neutron_utils.create_port() function for an Exception when
544 self.network = neutron_utils.create_network(
545 self.neutron, self.os_creds, self.net_config.network_settings)
546 self.assertEqual(self.net_config.network_settings.name,
548 self.assertTrue(validate_network(
549 self.neutron, self.net_config.network_settings.name, True))
551 subnet_setting = self.net_config.network_settings.subnet_settings[0]
552 self.subnet = neutron_utils.create_subnet(
553 self.neutron, subnet_setting, self.os_creds, self.network)
554 self.assertTrue(validate_subnet(
555 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
557 with self.assertRaises(Exception):
558 self.port = neutron_utils.create_port(
559 self.neutron, self.os_creds,
562 network_name=self.net_config.network_settings.name,
564 'subnet_name': subnet_setting.name,
567 def test_create_port_invalid_ip_to_subnet(self):
569 Tests the neutron_utils.create_port() function for an Exception when
572 self.network = neutron_utils.create_network(
573 self.neutron, self.os_creds, self.net_config.network_settings)
574 self.assertEqual(self.net_config.network_settings.name,
576 self.assertTrue(validate_network(
577 self.neutron, self.net_config.network_settings.name, True))
579 subnet_setting = self.net_config.network_settings.subnet_settings[0]
580 self.subnet = neutron_utils.create_subnet(
581 self.neutron, subnet_setting, self.os_creds, self.network)
582 self.assertTrue(validate_subnet(
583 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
585 with self.assertRaises(Exception):
586 self.port = neutron_utils.create_port(
587 self.neutron, self.os_creds,
590 network_name=self.net_config.network_settings.name,
592 'subnet_name': subnet_setting.name,
# presumably an address outside the test subnet's CIDR, per the test name -
# verify against the configured subnet
593 'ip': '10.197.123.100'}]))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients and the containers tracking
        the security groups and rules that tearDown must delete
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            # NOTE(review): cleanup is treated as best-effort so one failed
            # delete does not hide the test outcome - confirm that swallowing
            # the error here is intended
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings)

        # assertTrue(a, b) would treat b as the failure message and pass for
        # any non-empty name; assertEqual performs the intended comparison
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_groups.append(
                neutron_utils.create_security_group(self.neutron,
                                                    self.keystone,
                                                    sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function with a
        description and no rule settings
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(self.neutron, self.keystone,
                                                sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() function followed by
        neutron_utils.create_security_group_rule() for one ingress rule
        """

        sec_grp_rule_settings = SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(self.neutron, self.keystone,
                                                sec_grp_settings))
        # Rules that already exist on the new group are tracked as well so
        # that they are compared below and deleted in tearDown
        free_rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0])
        for free_rule in free_rules:
            self.security_group_rules.append(free_rule)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings.name)

        rules = neutron_utils.get_rules_by_security_group(self.neutron,
                                                          security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        # assertEqual instead of the vacuous two-argument assertTrue
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(self.neutron,
                                                       sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function against
        two freshly created security groups
        """

        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-1',
                                  description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-2',
                                  description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Test basic floating IP functionality via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client; the floating IP itself is created by the
        test
        """
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.floating_ip = None

    def tearDown(self):
        """
        Cleans the floating IP when one was created
        """
        if self.floating_ip:
            neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP: the number of floating IPs must
        grow by one and the new IP must be retrievable with the same id and ip
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        # NOTE(review): assumes the floating IP is allocated from the
        # configured external network - confirm the second argument
        self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
                                                            self.ext_net_name)
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network matches the expectation.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network name should exist or not
    :return: True when a network is found and exists is truthy, or when no
             network is found and exists is falsy; otherwise False
    """
    network = neutron_utils.get_network(neutron, network_name=name)
    # Found-ness must agree with the expectation
    return bool(network) == bool(exists)
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet matches the expectation.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet name should exist or not
    :return: when exists is truthy, True only if a subnet is found whose name
             and CIDR both match; when exists is falsy, True only if no
             subnet is found
    """
    subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
    if exists:
        return bool(subnet) and subnet.name == name and subnet.cidr == cidr
    return not subnet
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router matches the expectation.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router name should exist or not
    :return: True when a router is found and exists is truthy, or when no
             router is found and exists is falsy; otherwise False
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    # The exists=False case used to fall through to False even when the
    # router was absent; it now mirrors validate_network()
    return bool(router) == bool(exists)
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into
    the interface router object
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    subnet_id = interface_router.subnet_id
    # port_id identifies the port created for the interface and can never
    # equal the router's ID, so the router ID is read instead.
    # NOTE(review): assumes the InterfaceRouter domain object exposes the
    # router ID as `id`, as the Neutron add_router_interface response does -
    # confirm against the domain class
    router_id = interface_router.id

    return subnet.id == subnet_id and router.id == router_id
def validate_port(neutron, port_obj, this_port_name):
    """
    Checks that the port known to neutron under port_obj's ID carries the
    expected name.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (matched by its id attribute)
    :param this_port_name: The expected port name
    :return: True if a listed port has port_obj's ID and the expected name;
             False if the name differs or no listed port has that ID
    """
    # list_ports() returns a mapping whose values are lists of port dicts;
    # the keys are not needed for the lookup
    for os_port_insts in neutron.list_ports().values():
        for os_inst in os_port_insts:
            if os_inst['id'] == port_obj.id:
                return os_inst['name'] == this_port_name
    return False