1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21 SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
29 __author__ = 'spisarski'
35 class NeutronSmokeTests(OSComponentTestCase):
37 Tests to ensure that the neutron client can communicate with the cloud
40 def test_neutron_connect_success(self):
42 Tests to ensure that the proper credentials can connect.
44 neutron = neutron_utils.neutron_client(self.os_creds)
46 networks = neutron.list_networks()
49 networks = networks.get('networks')
50 for network in networks:
51 if network.get('name') == self.ext_net_name:
53 self.assertTrue(found)
def test_neutron_connect_fail(self):
    """
    Verifies that a neutron client built from bogus credentials is unable
    to list networks without an Exception being raised.
    """
    from snaps.openstack.os_credentials import OSCreds

    bogus_kwargs = {
        'username': 'user', 'password': 'pass', 'auth_url': 'url',
        'project_name': 'project'}

    # Credential construction, client creation and the API call all stay
    # inside the context manager so a failure at any of those steps
    # satisfies the assertion
    with self.assertRaises(Exception):
        bad_creds = OSCreds(**bogus_kwargs)
        bad_client = neutron_utils.neutron_client(bad_creds)
        bad_client.list_networks()
67 def test_retrieve_ext_network_name(self):
69 Tests the neutron_utils.get_external_network_names to ensure the
70 configured self.ext_net_name is contained within the returned list
73 neutron = neutron_utils.neutron_client(self.os_creds)
74 ext_networks = neutron_utils.get_external_networks(neutron)
76 for network in ext_networks:
77 if network.name == self.ext_net_name:
80 self.assertTrue(found)
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
85 Test for creating networks via neutron_utils.py
89 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90 self.port_name = str(guid) + '-port'
91 self.neutron = neutron_utils.neutron_client(self.os_creds)
93 self.net_config = openstack_tests.get_pub_net_config(
94 net_name=guid + '-pub-net')
98 Cleans the remote OpenStack objects
101 neutron_utils.delete_network(self.neutron, self.network)
103 def test_create_network(self):
105 Tests the neutron_utils.create_neutron_net() function
107 self.network = neutron_utils.create_network(
108 self.neutron, self.os_creds, self.net_config.network_settings)
109 self.assertEqual(self.net_config.network_settings.name,
111 self.assertTrue(validate_network(
112 self.neutron, self.net_config.network_settings.name, True))
def test_create_network_empty_name(self):
    """
    Ensures an Exception is raised when neutron_utils.create_network() is
    asked to build a network from settings whose name is an empty string.
    """
    with self.assertRaises(Exception):
        # Built inside the context manager so that an error raised by
        # NetworkSettings itself also satisfies the assertion
        empty_name_settings = NetworkSettings(name='')
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds,
            network_settings=empty_name_settings)
def test_create_network_null_name(self):
    """
    Ensures an Exception is raised when neutron_utils.create_network() is
    asked to build a network from a NetworkSettings object constructed
    without a name.
    """
    with self.assertRaises(Exception):
        # Built inside the context manager so that an error raised by
        # NetworkSettings itself also satisfies the assertion
        unnamed_settings = NetworkSettings()
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, network_settings=unnamed_settings)
135 class NeutronUtilsSubnetTests(OSComponentTestCase):
137 Test for creating networks with subnets via neutron_utils.py
141 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
142 self.port_name = str(guid) + '-port'
143 self.neutron = neutron_utils.neutron_client(self.os_creds)
146 self.net_config = openstack_tests.get_pub_net_config(
147 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
148 external_net=self.ext_net_name)
152 Cleans the remote OpenStack objects
156 neutron_utils.delete_subnet(self.neutron, self.subnet)
161 neutron_utils.delete_network(self.neutron, self.network)
165 def test_create_subnet(self):
167 Tests the neutron_utils.create_neutron_net() function
169 self.network = neutron_utils.create_network(
170 self.neutron, self.os_creds, self.net_config.network_settings)
171 self.assertEqual(self.net_config.network_settings.name,
173 self.assertTrue(validate_network(
174 self.neutron, self.net_config.network_settings.name, True))
176 subnet_setting = self.net_config.network_settings.subnet_settings[0]
177 self.subnet = neutron_utils.create_subnet(
178 self.neutron, subnet_setting, self.os_creds, network=self.network)
179 self.assertTrue(validate_subnet(
180 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
182 subnet_query1 = neutron_utils.get_subnet(
183 self.neutron, subnet_name=subnet_setting.name)
184 self.assertEqual(self.subnet, subnet_query1)
186 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
188 self.assertIsNotNone(subnet_query2)
189 self.assertEqual(1, len(subnet_query2))
190 self.assertEqual(self.subnet, subnet_query2[0])
192 def test_create_subnet_null_name(self):
194 Tests the neutron_utils.create_neutron_subnet() function for an
195 Exception when the subnet name is None
197 self.network = neutron_utils.create_network(
198 self.neutron, self.os_creds, self.net_config.network_settings)
199 self.assertEqual(self.net_config.network_settings.name,
201 self.assertTrue(validate_network(
202 self.neutron, self.net_config.network_settings.name, True))
204 with self.assertRaises(Exception):
205 SubnetSettings(cidr=self.net_config.subnet_cidr)
207 def test_create_subnet_empty_name(self):
209 Tests the neutron_utils.create_neutron_net() function with an empty
212 self.network = neutron_utils.create_network(
213 self.neutron, self.os_creds, self.net_config.network_settings)
214 self.assertEqual(self.net_config.network_settings.name,
216 self.assertTrue(validate_network(
217 self.neutron, self.net_config.network_settings.name, True))
219 subnet_setting = self.net_config.network_settings.subnet_settings[0]
220 self.subnet = neutron_utils.create_subnet(
221 self.neutron, subnet_setting, self.os_creds, network=self.network)
222 self.assertTrue(validate_subnet(
223 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
224 self.assertFalse(validate_subnet(
225 self.neutron, '', subnet_setting.cidr, True))
227 subnet_query1 = neutron_utils.get_subnet(
228 self.neutron, subnet_name=subnet_setting.name)
229 self.assertEqual(self.subnet, subnet_query1)
231 subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
233 self.assertIsNotNone(subnet_query2)
234 self.assertEqual(1, len(subnet_query2))
235 self.assertEqual(self.subnet, subnet_query2[0])
237 def test_create_subnet_null_cidr(self):
239 Tests the neutron_utils.create_neutron_subnet() function for an
240 Exception when the subnet CIDR value is None
242 self.network = neutron_utils.create_network(
243 self.neutron, self.os_creds, self.net_config.network_settings)
244 self.assertEqual(self.net_config.network_settings.name,
246 self.assertTrue(validate_network(
247 self.neutron, self.net_config.network_settings.name, True))
249 with self.assertRaises(Exception):
250 sub_sets = SubnetSettings(
251 cidr=None, name=self.net_config.subnet_name)
252 neutron_utils.create_subnet(
253 self.neutron, sub_sets, self.os_creds, network=self.network)
255 def test_create_subnet_empty_cidr(self):
257 Tests the neutron_utils.create_neutron_subnet() function for an
258 Exception when the subnet CIDR value is empty
260 self.network = neutron_utils.create_network(
261 self.neutron, self.os_creds, self.net_config.network_settings)
262 self.assertEqual(self.net_config.network_settings.name,
264 self.assertTrue(validate_network(
265 self.neutron, self.net_config.network_settings.name, True))
267 with self.assertRaises(Exception):
268 sub_sets = SubnetSettings(
269 cidr='', name=self.net_config.subnet_name)
270 neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
271 network=self.network)
274 class NeutronUtilsRouterTests(OSComponentTestCase):
276 Test for creating routers via neutron_utils.py
280 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
281 self.port_name = str(guid) + '-port'
282 self.neutron = neutron_utils.neutron_client(self.os_creds)
287 self.interface_router = None
288 self.net_config = openstack_tests.get_pub_net_config(
289 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
290 router_name=guid + '-pub-router', external_net=self.ext_net_name)
294 Cleans the remote OpenStack objects
296 if self.interface_router:
297 neutron_utils.remove_interface_router(self.neutron, self.router,
302 neutron_utils.delete_router(self.neutron, self.router)
303 validate_router(self.neutron, self.router.name, False)
309 neutron_utils.delete_port(self.neutron, self.port)
315 neutron_utils.delete_subnet(self.neutron, self.subnet)
321 neutron_utils.delete_network(self.neutron, self.network)
325 def test_create_router_simple(self):
327 Tests the neutron_utils.create_neutron_net() function when an external
330 self.router = neutron_utils.create_router(
331 self.neutron, self.os_creds, self.net_config.router_settings)
332 validate_router(self.neutron, self.net_config.router_settings.name,
335 def test_create_router_with_public_interface(self):
337 Tests the neutron_utils.create_neutron_net() function when an external
340 subnet_setting = self.net_config.network_settings.subnet_settings[0]
341 self.net_config = openstack_tests.OSNetworkConfig(
342 self.net_config.network_settings.name,
345 self.net_config.router_settings.name,
347 self.router = neutron_utils.create_router(
348 self.neutron, self.os_creds, self.net_config.router_settings)
349 validate_router(self.neutron, self.net_config.router_settings.name,
352 ext_net = neutron_utils.get_network(
353 self.neutron, network_name=self.ext_net_name)
355 self.router.external_gateway_info['network_id'], ext_net.id)
357 def test_create_router_empty_name(self):
359 Tests the neutron_utils.create_neutron_net() function
361 with self.assertRaises(Exception):
362 this_router_settings = create_router.RouterSettings(name='')
363 self.router = neutron_utils.create_router(self.neutron,
365 this_router_settings)
367 def test_create_router_null_name(self):
369 Tests the neutron_utils.create_neutron_subnet() function when the
370 subnet CIDR value is None
372 with self.assertRaises(Exception):
373 this_router_settings = create_router.RouterSettings()
374 self.router = neutron_utils.create_router(self.neutron,
376 this_router_settings)
377 validate_router(self.neutron, None, True)
379 def test_add_interface_router(self):
381 Tests the neutron_utils.add_interface_router() function
383 self.network = neutron_utils.create_network(
384 self.neutron, self.os_creds, self.net_config.network_settings)
385 self.assertEqual(self.net_config.network_settings.name,
387 self.assertTrue(validate_network(
388 self.neutron, self.net_config.network_settings.name, True))
390 subnet_setting = self.net_config.network_settings.subnet_settings[0]
391 self.subnet = neutron_utils.create_subnet(
392 self.neutron, subnet_setting,
393 self.os_creds, self.network)
394 self.assertTrue(validate_subnet(
395 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
397 self.router = neutron_utils.create_router(
398 self.neutron, self.os_creds, self.net_config.router_settings)
399 validate_router(self.neutron, self.net_config.router_settings.name,
402 self.interface_router = neutron_utils.add_interface_router(
403 self.neutron, self.router, self.subnet)
404 validate_interface_router(self.interface_router, self.router,
407 def test_add_interface_router_null_router(self):
409 Tests the neutron_utils.add_interface_router() function for an
410 Exception when the router value is None
412 self.network = neutron_utils.create_network(
413 self.neutron, self.os_creds, self.net_config.network_settings)
414 self.assertEqual(self.net_config.network_settings.name,
416 self.assertTrue(validate_network(
417 self.neutron, self.net_config.network_settings.name, True))
419 subnet_setting = self.net_config.network_settings.subnet_settings[0]
420 self.subnet = neutron_utils.create_subnet(
421 self.neutron, subnet_setting,
422 self.os_creds, self.network)
423 self.assertTrue(validate_subnet(
424 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
426 with self.assertRaises(NeutronException):
427 self.interface_router = neutron_utils.add_interface_router(
428 self.neutron, self.router, self.subnet)
430 def test_add_interface_router_null_subnet(self):
432 Tests the neutron_utils.add_interface_router() function for an
433 Exception when the subnet value is None
435 self.network = neutron_utils.create_network(
436 self.neutron, self.os_creds, self.net_config.network_settings)
437 self.assertEqual(self.net_config.network_settings.name,
439 self.assertTrue(validate_network(
440 self.neutron, self.net_config.network_settings.name, True))
442 self.router = neutron_utils.create_router(
443 self.neutron, self.os_creds, self.net_config.router_settings)
444 validate_router(self.neutron, self.net_config.router_settings.name,
447 with self.assertRaises(NeutronException):
448 self.interface_router = neutron_utils.add_interface_router(
449 self.neutron, self.router, self.subnet)
451 def test_create_port(self):
453 Tests the neutron_utils.create_port() function
455 self.network = neutron_utils.create_network(
456 self.neutron, self.os_creds, self.net_config.network_settings)
457 self.assertEqual(self.net_config.network_settings.name,
459 self.assertTrue(validate_network(
460 self.neutron, self.net_config.network_settings.name, True))
462 subnet_setting = self.net_config.network_settings.subnet_settings[0]
463 self.subnet = neutron_utils.create_subnet(
464 self.neutron, subnet_setting, self.os_creds, self.network)
465 self.assertTrue(validate_subnet(
466 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
468 self.port = neutron_utils.create_port(
469 self.neutron, self.os_creds, PortSettings(
472 'subnet_name': subnet_setting.name,
474 network_name=self.net_config.network_settings.name))
475 validate_port(self.neutron, self.port, self.port_name)
477 def test_create_port_empty_name(self):
479 Tests the neutron_utils.create_port() function
481 self.network = neutron_utils.create_network(
482 self.neutron, self.os_creds, self.net_config.network_settings)
483 self.assertEqual(self.net_config.network_settings.name,
485 self.assertTrue(validate_network(
486 self.neutron, self.net_config.network_settings.name, True))
488 subnet_setting = self.net_config.network_settings.subnet_settings[0]
489 self.subnet = neutron_utils.create_subnet(
490 self.neutron, subnet_setting, self.os_creds, self.network)
491 self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
492 subnet_setting.cidr, True))
494 self.port = neutron_utils.create_port(
495 self.neutron, self.os_creds, PortSettings(
497 network_name=self.net_config.network_settings.name,
499 'subnet_name': subnet_setting.name,
501 validate_port(self.neutron, self.port, self.port_name)
503 def test_create_port_null_name(self):
505 Tests the neutron_utils.create_port() when the port name value is None
507 self.network = neutron_utils.create_network(
508 self.neutron, self.os_creds, self.net_config.network_settings)
509 self.assertEqual(self.net_config.network_settings.name,
511 self.assertTrue(validate_network(
512 self.neutron, self.net_config.network_settings.name, True))
514 subnet_setting = self.net_config.network_settings.subnet_settings[0]
515 self.subnet = neutron_utils.create_subnet(
516 self.neutron, subnet_setting,
517 self.os_creds, self.network)
518 self.assertTrue(validate_subnet(
519 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
521 self.port = neutron_utils.create_port(
522 self.neutron, self.os_creds,
524 network_name=self.net_config.network_settings.name,
526 'subnet_name': subnet_setting.name,
529 port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
530 self.assertEqual(self.port, port)
532 def test_create_port_null_network_object(self):
534 Tests the neutron_utils.create_port() function for an Exception when
535 the network object is None
537 with self.assertRaises(Exception):
538 self.port = neutron_utils.create_port(
539 self.neutron, self.os_creds,
542 network_name=self.net_config.network_settings.name,
545 self.net_config.network_settings.subnet_settings[
549 def test_create_port_null_ip(self):
551 Tests the neutron_utils.create_port() function for an Exception when
554 self.network = neutron_utils.create_network(
555 self.neutron, self.os_creds, self.net_config.network_settings)
556 self.assertEqual(self.net_config.network_settings.name,
558 self.assertTrue(validate_network(
559 self.neutron, self.net_config.network_settings.name, True))
561 subnet_setting = self.net_config.network_settings.subnet_settings[0]
562 self.subnet = neutron_utils.create_subnet(
563 self.neutron, subnet_setting,
564 self.os_creds, self.network)
565 self.assertTrue(validate_subnet(
566 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
568 with self.assertRaises(Exception):
569 self.port = neutron_utils.create_port(
570 self.neutron, self.os_creds,
573 network_name=self.net_config.network_settings.name,
575 'subnet_name': subnet_setting.name,
578 def test_create_port_invalid_ip(self):
580 Tests the neutron_utils.create_port() function for an Exception when
583 self.network = neutron_utils.create_network(
584 self.neutron, self.os_creds, self.net_config.network_settings)
585 self.assertEqual(self.net_config.network_settings.name,
587 self.assertTrue(validate_network(
588 self.neutron, self.net_config.network_settings.name, True))
590 subnet_setting = self.net_config.network_settings.subnet_settings[0]
591 self.subnet = neutron_utils.create_subnet(
592 self.neutron, subnet_setting, self.os_creds, self.network)
593 self.assertTrue(validate_subnet(
594 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
596 with self.assertRaises(Exception):
597 self.port = neutron_utils.create_port(
598 self.neutron, self.os_creds,
601 network_name=self.net_config.network_settings.name,
603 'subnet_name': subnet_setting.name,
606 def test_create_port_invalid_ip_to_subnet(self):
608 Tests the neutron_utils.create_port() function for an Exception when
611 self.network = neutron_utils.create_network(
612 self.neutron, self.os_creds, self.net_config.network_settings)
613 self.assertEqual(self.net_config.network_settings.name,
615 self.assertTrue(validate_network(
616 self.neutron, self.net_config.network_settings.name, True))
618 subnet_setting = self.net_config.network_settings.subnet_settings[0]
619 self.subnet = neutron_utils.create_subnet(
620 self.neutron, subnet_setting, self.os_creds, self.network)
621 self.assertTrue(validate_subnet(
622 self.neutron, subnet_setting.name, subnet_setting.cidr, True))
624 with self.assertRaises(Exception):
625 self.port = neutron_utils.create_port(
626 self.neutron, self.os_creds,
629 network_name=self.net_config.network_settings.name,
631 'subnet_name': subnet_setting.name,
632 'ip': '10.197.123.100'}]))
635 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
637 Test for creating security groups via neutron_utils.py
641 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
642 self.sec_grp_name = guid + 'name'
644 self.security_groups = list()
645 self.security_group_rules = list()
646 self.neutron = neutron_utils.neutron_client(self.os_creds)
647 self.keystone = keystone_utils.keystone_client(self.os_creds)
651 Cleans the remote OpenStack objects
653 for rule in self.security_group_rules:
654 neutron_utils.delete_security_group_rule(self.neutron, rule)
656 for security_group in self.security_groups:
658 neutron_utils.delete_security_group(self.neutron,
663 def test_create_delete_simple_sec_grp(self):
665 Tests the neutron_utils.create_security_group() function
667 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
668 security_group = neutron_utils.create_security_group(self.neutron,
672 self.assertTrue(sec_grp_settings.name, security_group.name)
674 sec_grp_get = neutron_utils.get_security_group(
675 self.neutron, sec_grp_settings=sec_grp_settings)
676 self.assertIsNotNone(sec_grp_get)
677 self.assertTrue(validation_utils.objects_equivalent(
678 security_group, sec_grp_get))
680 neutron_utils.delete_security_group(self.neutron, security_group)
681 sec_grp_get = neutron_utils.get_security_group(
682 self.neutron, sec_grp_settings=sec_grp_settings)
683 self.assertIsNone(sec_grp_get)
685 def test_create_sec_grp_no_name(self):
687 Tests the SecurityGroupSettings constructor and
688 neutron_utils.create_security_group() function to ensure that
689 attempting to create a security group without a name will raise an
692 with self.assertRaises(Exception):
693 sec_grp_settings = SecurityGroupSettings()
694 self.security_groups.append(
695 neutron_utils.create_security_group(self.neutron,
699 def test_create_sec_grp_no_rules(self):
701 Tests the neutron_utils.create_security_group() function
703 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
704 description='hello group')
705 self.security_groups.append(
706 neutron_utils.create_security_group(self.neutron, self.keystone,
709 self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
710 self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
712 sec_grp_get = neutron_utils.get_security_group(
713 self.neutron, sec_grp_settings=sec_grp_settings)
714 self.assertIsNotNone(sec_grp_get)
715 self.assertEqual(self.security_groups[0], sec_grp_get)
717 def test_create_sec_grp_one_rule(self):
719 Tests the neutron_utils.create_security_group() function
722 sec_grp_rule_settings = SecurityGroupRuleSettings(
723 sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
724 sec_grp_settings = SecurityGroupSettings(
725 name=self.sec_grp_name, description='hello group',
726 rule_settings=[sec_grp_rule_settings])
728 self.security_groups.append(
729 neutron_utils.create_security_group(self.neutron, self.keystone,
731 free_rules = neutron_utils.get_rules_by_security_group(
732 self.neutron, self.security_groups[0])
733 for free_rule in free_rules:
734 self.security_group_rules.append(free_rule)
736 self.security_group_rules.append(
737 neutron_utils.create_security_group_rule(
738 self.neutron, sec_grp_settings.rule_settings[0]))
740 # Refresh object so it is populated with the newly added rule
741 security_group = neutron_utils.get_security_group(
742 self.neutron, sec_grp_settings=sec_grp_settings)
744 rules = neutron_utils.get_rules_by_security_group(self.neutron,
748 validation_utils.objects_equivalent(
749 self.security_group_rules, rules))
751 self.assertTrue(sec_grp_settings.name, security_group.name)
753 sec_grp_get = neutron_utils.get_security_group(
754 self.neutron, sec_grp_settings=sec_grp_settings)
755 self.assertIsNotNone(sec_grp_get)
756 self.assertEqual(security_group, sec_grp_get)
def test_get_sec_grp_by_id(self):
    """
    Creates two security groups and checks that each of them can be
    retrieved by ID with neutron_utils.get_security_group_by_id()
    """
    # Create the groups one after the other, recording each of them in
    # self.security_groups as soon as it exists
    for suffix in ('-1', '-2'):
        grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name + suffix, description='hello group')
        created = neutron_utils.create_security_group(
            self.neutron, self.keystone, grp_settings)
        self.security_groups.append(created)

    expected_groups = (self.security_groups[0], self.security_groups[1])

    # Perform both lookups before asserting on either result
    fetched_groups = [
        neutron_utils.get_security_group_by_id(self.neutron, grp.id)
        for grp in expected_groups]

    for expected, fetched in zip(expected_groups, fetched_groups):
        self.assertEqual(expected.id, fetched.id)
781 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
783 Test basic nova keypair functionality
788 Instantiates the CreateImage object that is responsible for downloading
789 and creating an OS image file within OpenStack
791 self.neutron = neutron_utils.neutron_client(self.os_creds)
792 self.floating_ip = None
796 Cleans the image and downloaded image file
800 neutron_utils.delete_floating_ip(
801 self.neutron, self.floating_ip)
805 def test_floating_ips(self):
807 Tests the creation of a floating IP
810 initial_fips = neutron_utils.get_floating_ips(self.neutron)
812 self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
814 all_fips = neutron_utils.get_floating_ips(self.neutron)
815 self.assertEqual(len(initial_fips) + 1, len(all_fips))
816 returned = neutron_utils.get_floating_ip(self.neutron,
818 self.assertEqual(self.floating_ip.id, returned.id)
819 self.assertEqual(self.floating_ip.ip, returned.ip)
827 def validate_network(neutron, name, exists):
829 Returns true if a network for a given name DOES NOT exist if the exists
830 parameter is false conversely true. Returns false if a network for a given
831 name DOES exist if the exists parameter is true conversely false.
832 :param neutron: The neutron client
833 :param name: The expected network name
834 :param exists: Whether or not the network name should exist or not
837 network = neutron_utils.get_network(neutron, network_name=name)
838 if exists and network:
840 if not exists and not network:
845 def validate_subnet(neutron, name, cidr, exists):
847 Returns true if a subnet for a given name DOES NOT exist if the exists
848 parameter is false conversely true. Returns false if a subnet for a given
849 name DOES exist if the exists parameter is true conversely false.
850 :param neutron: The neutron client
851 :param name: The expected subnet name
852 :param cidr: The expected CIDR value
853 :param exists: Whether or not the network name should exist or not
856 subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
857 if exists and subnet and subnet.name == name:
858 return subnet.cidr == cidr
859 if not exists and not subnet:
864 def validate_router(neutron, name, exists):
866 Returns true if a router for a given name DOES NOT exist if the exists
867 parameter is false conversely true. Returns false if a router for a given
868 name DOES exist if the exists parameter is true conversely false.
869 :param neutron: The neutron client
870 :param name: The expected router name
871 :param exists: Whether or not the network name should exist or not
874 router = neutron_utils.get_router(neutron, router_name=name)
875 if exists and router:
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into
    the interface router object
    :param interface_router: the SNAPS-OO InterfaceRouter domain object;
                             its 'id' and 'subnet_id' attributes are read
    :param router: the router to validate against the interface_router
                   (its 'id' attribute is read)
    :param subnet: the subnet to validate against the interface_router
                   (its 'id' attribute is read)
    :return: True if both IDs match else False
    """
    # The router is identified by the interface's own 'id'. The previous
    # implementation compared the router ID against 'port_id', which names
    # the port created for the attachment, so the check could never pass.
    # NOTE(review): relies on the InterfaceRouter domain object exposing the
    # router ID as 'id' (mirrors the neutron add-interface response) - confirm
    return (subnet.id == interface_router.subnet_id
            and router.id == interface_router.id)
895 def validate_port(neutron, port_obj, this_port_name):
897 Returns true if a port for a given name DOES NOT exist if the exists
898 parameter is false conversely true. Returns false if a port for a given
899 name DOES exist if the exists parameter is true conversely false.
900 :param neutron: The neutron client
901 :param port_obj: The port object to lookup
902 :param this_port_name: The expected router name
905 os_ports = neutron.list_ports()
906 for os_port, os_port_insts in os_ports.items():
907 for os_inst in os_port_insts:
908 if os_inst['id'] == port_obj.id:
909 return os_inst['name'] == this_port_name