1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from snaps.openstack.utils import keystone_utils
18 from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction
19 from snaps.openstack.tests import openstack_tests
20 from snaps.openstack.utils import neutron_utils
21 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings
22 from snaps.openstack import create_router
23 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
24 from snaps.openstack.tests import validation_utils
26 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests verifying that the neutron client can communicate with the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and expects the external network to be among the listed networks.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listed = client.list_networks().get('networks')

        # Every listed network is inspected; at least one must carry the external network's name
        matching = [net for net in listed if net.get('name') == self.ext_net_name]
        self.assertTrue(bool(matching))

    def test_neutron_connect_fail(self):
        """
        Expects an exception when building a client from bogus credentials and listing networks with it.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bogus_creds = OSCreds(username='user', password='pass', auth_url='url', project_name='project')
            client = neutron_utils.neutron_client(bogus_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Calls neutron_utils.get_external_networks() and expects the configured self.ext_net_name to be the name of
        one of the returned networks
        """
        client = neutron_utils.neutron_client(self.os_creds)
        external = neutron_utils.get_external_networks(client)

        # Stops at the first match, like an explicit loop with a break
        located = any(ext_net['network']['name'] == self.ext_net_name for ext_net in external)
        self.assertTrue(located)
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Exercises network creation through neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a network configuration whose names embed a fresh UUID
        """
        unique_id = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = unique_id + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(net_name=unique_id + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if not self.network:
            return

        neutron_utils.delete_network(self.neutron, self.network)
        validate_network(self.neutron, self.network['network']['name'], False)

    def test_create_network(self):
        """
        Creates a network from the configured settings and checks its name and that it can be looked up
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, net_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Expects an exception when the network settings are built with an empty name and passed to create_network()
        """
        with self.assertRaises(Exception):
            empty_named = NetworkSettings(name='')
            self.network = neutron_utils.create_network(self.neutron, self.os_creds, network_settings=empty_named)

    def test_create_network_null_name(self):
        """
        Expects an exception when the network settings are built without a name and passed to create_network()
        """
        with self.assertRaises(Exception):
            unnamed = NetworkSettings()
            self.network = neutron_utils.create_network(self.neutron, self.os_creds, network_settings=unnamed)
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a network/subnet configuration whose names embed a fresh UUID
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'], False)

    def _create_validated_network(self):
        """
        Creates the configured network, stores it on self.network and checks its name and that it can be looked up
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, net_settings.name, True))

    def test_create_subnet(self):
        """
        Creates the configured network and then the configured subnet on it
        """
        self._create_validated_network()

        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds,
                                                  network=self.network)
        # NOTE(review): the validation result is not asserted here; confirm the shape returned by
        # neutron_utils.get_subnet_by_name() before tightening this into an assertion
        validate_subnet(self.neutron, subnet_settings.name, subnet_settings.cidr, True)

    def test_create_subnet_null_name(self):
        """
        Expects an exception when SubnetSettings is constructed with a CIDR but no name
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates the configured subnet and then runs the subnet validation with an empty name
        """
        self._create_validated_network()

        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        # Keep a handle on the subnet so that tearDown() deletes it explicitly
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds,
                                                  network=self.network)
        # NOTE(review): the outcome of looking up an empty name is not asserted
        validate_subnet(self.neutron, '', subnet_settings.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Expects an exception when the subnet CIDR value is None
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Expects an exception when the subnet CIDR value is an empty string
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Test for creating routers, router interfaces and ports via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a network/subnet/router configuration whose names embed a fresh UUID
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        # The router interface is detached first, then router, port, subnet and network are deleted in that order
        if self.interface_router:
            neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet)

        if self.router:
            neutron_utils.delete_router(self.neutron, self.router)
            validate_router(self.neutron, self.router.get('name'), False)

        if self.port:
            neutron_utils.delete_port(self.neutron, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'], False)

    def _create_validated_network(self):
        """
        Creates the configured network, stores it on self.network and checks its name and that it can be looked up
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, net_settings.name, True))

    def _create_subnet(self):
        """
        Creates the first configured subnet on self.network, stores it on self.subnet and runs the subnet validation
        """
        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds, self.network)
        validate_subnet(self.neutron, subnet_settings.name, subnet_settings.cidr, True)

    def _create_router(self):
        """
        Creates the configured router, stores it on self.router and runs the router validation
        """
        self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
        validate_router(self.neutron, self.net_config.router_settings.name, True)

    def _ip_addrs(self, ip):
        """
        Builds the ip_addrs value for PortSettings that pairs the first configured subnet's name with an IP
        :param ip: The IP value to request on that subnet
        :return: A single-element list holding the subnet name/IP dict
        """
        return [{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip}]

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function with the router settings produced in setUp()
        """
        self._create_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function after rebuilding the configuration with
        openstack_tests.OSNetworkConfig from the existing names, CIDR and the external network name
        """
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name,
            self.net_config.network_settings.subnet_settings[0].name,
            self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name,
            self.ext_net_name)
        self._create_router()
        # TODO - Add validation that the router gateway has been set

    def test_create_router_empty_name(self):
        """
        Expects an exception when the router settings are built with an empty name and passed to create_router()
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)

    def test_create_router_null_name(self):
        """
        Expects an exception when the router settings are built without a name and passed to create_router()
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings()
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
            # Only reached when nothing above raised
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_validated_network()
        self._create_subnet()
        self._create_router()

        self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
        validate_interface_router(self.interface_router, self.router, self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None
        """
        self._create_validated_network()
        self._create_subnet()

        # self.router is still None here because no router has been created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None
        """
        self._create_validated_network()
        self._create_router()

        # self.subnet is still None here because no subnet has been created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_validated_network()
        self._create_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name,
                ip_addrs=self._ip_addrs(ip_1),
                network_name=self.net_config.network_settings.name))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function
        NOTE(review): despite the test name, the port settings are given self.port_name rather than an empty name
        """
        self._create_validated_network()
        self._create_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs(ip_1)))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the port name value is None
        """
        self._create_validated_network()
        self._create_subnet()

        # The subnet name is read from subnet_settings[0]; reading '.name' off the list itself would raise
        # AttributeError and satisfy assertRaises before the port settings were ever built
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs(ip_1)))

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when PortSettings is handed the neutron
        client, port name and network name positionally
        """
        self._create_validated_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                self.neutron, self.port_name, self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs(ip_1)))

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is None
        """
        self._create_validated_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs(None)))

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is not an IP address
        """
        self._create_validated_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs('foo')))

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when requesting the IP 10.197.123.100
        NOTE(review): presumably this address lies outside the configured subnet's CIDR - confirm against
        openstack_tests.get_pub_net_config()
        """
        self._create_validated_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs('10.197.123.100')))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron and keystone clients plus the lists used to track created groups and rules
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            # Best-effort: a tracked group may already have been deleted by the test itself
            try:
                neutron_utils.delete_security_group(self.neutron, security_group)
            except Exception:
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Creates a security group with only a name, looks it up, deletes it and checks it can no longer be found
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
        # Tracked so that tearDown() still removes the group if an assertion below fails before the delete
        self.security_groups.append(security_group)

        # assertEqual rather than assertTrue: assertTrue would treat the second argument as the failure message
        self.assertEqual(sec_grp_settings.name, security_group['security_group']['name'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group['security_group'], sec_grp_get['security_group']))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure
        that attempting to create a security group without a name will raise an exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_groups.append(
                neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Creates a security group with a name and description but no rules and compares it with the looked-up group
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
        self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))

        created = self.security_groups[0]['security_group']
        self.assertEqual(sec_grp_settings.name, created['name'])
        self.assertEqual(sec_grp_settings.description, created['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(created, sec_grp_get['security_group']))

    def test_create_sec_grp_one_rule(self):
        """
        Creates a security group, adds one ingress rule to it and compares the tracked rules and group with what
        neutron_utils returns for them
        """

        sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
                                                 rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))
        # Track the rules the group already has right after creation so tearDown() removes them too
        free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_groups[0])
        for free_rule in free_rules:
            self.security_group_rules.append(free_rule)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)

        rules = neutron_utils.get_rules_by_security_group(self.neutron, security_group)

        self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, security_group['security_group']['name'])
        self.assertEqual(sec_grp_settings.description, security_group['security_group']['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group['security_group'], sec_grp_get['security_group']))

    def test_get_sec_grp_by_id(self):
        """
        Creates two security groups and checks that neutron_utils.get_security_group_by_id() returns the group
        with the matching ID for each of them
        """

        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-1', description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-2', description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0]['security_group']['id'])
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1]['security_group']['id'])

        self.assertEqual(self.security_groups[0]['security_group']['id'], sec_grp_1b['security_group']['id'])
        self.assertEqual(self.security_groups[1]['security_group']['id'], sec_grp_2b['security_group']['id'])
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches the caller's expectation.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network is expected to exist
    :return: True when the network is found and expected to exist, or not found and expected to be absent;
             False otherwise
    """
    found = bool(neutron_utils.get_network(neutron, name))
    expected = bool(exists)
    return found == expected
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence (and CIDR) of a subnet with the given name matches the caller's expectation.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet is expected to exist
    :return: When the subnet is found and expected to exist, whether its 'cidr' value equals cidr; True when it
             is not found and expected to be absent; False otherwise
    """
    located = neutron_utils.get_subnet_by_name(neutron, name)
    if not located:
        # Nothing was found: that is only the right answer when absence was expected
        return not exists
    if not exists:
        return False
    return located.get('cidr') == cidr
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router with the given name matches the caller's expectation.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router is expected to exist
    :return: True when the router is found and expected to exist, or not found and expected to be absent;
             False otherwise
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    # Absence counts as success when exists is false, matching validate_network()
    return bool(router) == bool(exists)
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into the interface router object
    :param interface_router: the object to validate
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    subnet_id = interface_router.get('subnet_id')
    # The router's ID is carried under 'id'; 'port_id' identifies the port created for the interface
    # NOTE(review): assumes router and subnet expose their ID through a top-level 'id' key - confirm with callers
    router_id = interface_router.get('id')

    return subnet.get('id') == subnet_id and router.get('id') == router_id
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks up a port by ID among the ports listed by the client and checks its name.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (its ID is read from port_obj['port']['id'])
    :param this_port_name: The expected port name
    :return: True if a listed port has the same ID and its name equals this_port_name; False if the name
             differs or no listed port has that ID
    """
    wanted_id = port_obj['port']['id']
    ports = neutron.list_ports()
    # Only the listed port instances matter; the keys they are grouped under are not used
    for port_insts in ports.values():
        for inst in port_insts:
            if inst['id'] == wanted_id:
                return inst['name'] == this_port_name
    return False