f6fc2bbd4c09f8a6544da239ef6dd0f1ea9fc7c3
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
__author__ = 'spisarski'

# Fixed IP addresses used by the tests in this module; ip_1 is the address
# requested for the ports created by the port tests.
# NOTE(review): presumably both addresses fall inside the subnet CIDR
# produced by openstack_tests.get_pub_net_config() - confirm.
ip_1 = '10.55.1.100'
ip_2 = '10.55.1.200'
33
34
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client built from the test
    credentials is able to communicate with the cloud
    """

    def test_neutron_connect_success(self):
        """
        Ensures a client created with the proper credentials can list the
        networks and that the configured external network is one of them.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listing = client.list_networks()

        self.assertTrue(any(
            net.get('name') == self.ext_net_name
            for net in listing.get('networks')))

    def test_neutron_connect_fail(self):
        """
        Ensures that improper credentials cannot be used to query neutron.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            # Credential construction, client creation and the API call all
            # stay inside the context manager so a failure in any of them
            # satisfies the test
            bad_creds = OSCreds(username='user', password='pass',
                                auth_url='url', project_name='project')
            client = neutron_utils.neutron_client(bad_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Ensures the configured self.ext_net_name belongs to one of the
        networks returned by neutron_utils.get_external_networks()
        :return:
        """
        client = neutron_utils.neutron_client(self.os_creds)
        external = neutron_utils.get_external_networks(client)
        self.assertTrue(any(
            ext_net.name == self.ext_net_name for ext_net in external))
81
82
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Tests network creation performed through neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network configuration
        """
        guid = '{}-{}'.format(self.__class__.__name__, uuid.uuid4())
        self.port_name = guid + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Removes the network created by the test, if any
        """
        if not self.network:
            return
        neutron_utils.delete_network(self.neutron, self.network)

    def test_create_network(self):
        """
        Tests neutron_utils.create_network() with the configured settings and
        checks the resulting network can be found by name
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network.name)
        network_found = validate_network(
            self.neutron, net_settings.name, True)
        self.assertTrue(network_found)

    def test_create_network_empty_name(self):
        """
        Expects an Exception when the network settings carry an empty name
        """
        with self.assertRaises(Exception):
            empty_name_settings = NetworkSettings(name='')
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=empty_name_settings)

    def test_create_network_null_name(self):
        """
        Expects an Exception when the network settings are built without a
        name
        """
        with self.assertRaises(Exception):
            nameless_settings = NetworkSettings()
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=nameless_settings)
133
134
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet
        configuration attached to the configured external network name
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects (subnet first, then network)
        """
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

    def _create_network(self):
        """
        Creates the configured network, stores it in self.network so that
        tearDown removes it, and checks it exists under the expected name
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, net_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))

    def test_create_subnet_null_name(self):
        """
        Tests that constructing SubnetSettings without a name raises an
        Exception
        """
        self._create_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Tests that a subnet created with the configured name is found under
        that name but not when it is looked up with an empty name
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        # Keep a reference so that tearDown deletes the subnet explicitly,
        # consistent with test_create_subnet
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        self.assertFalse(validate_subnet(
            self.neutron, '', subnet_setting.cidr, True))

    def test_create_subnet_null_cidr(self):
        """
        Tests for an Exception when the subnet CIDR value is None, raised
        either by the SubnetSettings constructor or by
        neutron_utils.create_subnet()
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr=None, name=self.net_config.subnet_name)
            neutron_utils.create_subnet(
                self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests for an Exception when the subnet CIDR value is empty, raised
        either by the SubnetSettings constructor or by
        neutron_utils.create_subnet()
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr='', name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)
246
247
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Test for creating routers, router interfaces and ports via
    neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet/router
        configuration attached to the configured external network name
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects in the order: router interface,
        router, port, subnet, network
        """
        if self.interface_router:
            neutron_utils.remove_interface_router(self.neutron, self.router,
                                                  self.subnet)

        if self.router:
            neutron_utils.delete_router(self.neutron, self.router)
            # NOTE(review): the return value of validate_router() is not
            # checked here - confirm whether the helper asserts internally
            validate_router(self.neutron, self.router.name, False)

        if self.port:
            neutron_utils.delete_port(self.neutron, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

    def _create_network(self):
        """
        Creates the configured network, stores it in self.network so that
        tearDown removes it, and checks it exists under the expected name
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, net_settings.name, True))

    def _create_network_and_subnet(self):
        """
        Creates the configured network and its first configured subnet,
        storing them in self.network and self.subnet for tearDown
        :return: the SubnetSettings object used to create the subnet
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, self.network)
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        return subnet_setting

    def _create_router(self):
        """
        Creates the router described by self.net_config.router_settings and
        stores it in self.router for tearDown
        """
        self.router = neutron_utils.create_router(
            self.neutron, self.os_creds, self.net_config.router_settings)
        # NOTE(review): the return value of validate_router() is not
        # checked - confirm whether the helper asserts internally
        validate_router(self.neutron, self.net_config.router_settings.name,
                        True)

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function with the configured
        router settings
        """
        self._create_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function when an external
        gateway is requested and checks that the router's gateway points at
        the external network
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name,
            subnet_setting.name,
            subnet_setting.cidr,
            self.net_config.router_settings.name,
            self.ext_net_name)
        self._create_router()

        ext_net = neutron_utils.get_network(
            self.neutron, network_name=self.ext_net_name)
        self.assertEqual(
            self.router.external_gateway_info['network_id'], ext_net.id)

    def test_create_router_empty_name(self):
        """
        Tests for an Exception when the router name is empty, raised either
        by the RouterSettings constructor or by neutron_utils.create_router()
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(self.neutron,
                                                      self.os_creds,
                                                      this_router_settings)

    def test_create_router_null_name(self):
        """
        Tests for an Exception when the router name is None, raised by the
        RouterSettings constructor, neutron_utils.create_router() or the
        final validation call
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings()
            self.router = neutron_utils.create_router(self.neutron,
                                                      self.os_creds,
                                                      this_router_settings)
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_network_and_subnet()
        self._create_router()

        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, self.subnet)
        # NOTE(review): the return value of validate_interface_router() is
        # not checked - confirm whether the helper asserts internally
        validate_interface_router(self.interface_router, self.router,
                                  self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for a
        NeutronException when the router value is None
        """
        self._create_network_and_subnet()

        # self.router is still None at this point
        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for a
        NeutronException when the subnet value is None
        """
        self._create_network()
        self._create_router()

        # self.subnet is still None at this point
        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        subnet_setting = self._create_network_and_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': ip_1}],
                network_name=self.net_config.network_settings.name))
        # NOTE(review): the return value of validate_port() is not checked -
        # confirm whether the helper asserts internally
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function

        NOTE(review): despite its name this test passes self.port_name, not
        an empty string, so it currently exercises the same path as
        test_create_port - confirm the intended behavior for an empty name
        before changing it
        """
        subnet_setting = self._create_network_and_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name,
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': ip_1}]))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the port name value is None
        """
        subnet_setting = self._create_network_and_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortSettings(
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name': subnet_setting.name,
                        'ip': ip_1}]))

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the named network and subnet have not been created
        """
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortSettings(
                    name=self.port_name,
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name':
                            self.net_config.network_settings.subnet_settings[
                                0].name,
                        'ip': ip_1}]))

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is None
        """
        subnet_setting = self._create_network_and_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortSettings(
                    name=self.port_name,
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name': subnet_setting.name,
                        'ip': None}]))

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is not an IP address ('foo')
        """
        subnet_setting = self._create_network_and_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortSettings(
                    name=self.port_name,
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name': subnet_setting.name,
                        'ip': 'foo'}]))

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the requested IP ('10.197.123.100') is rejected for the subnet
        NOTE(review): presumably this address lies outside the configured
        subnet CIDR - confirm against openstack_tests.get_pub_net_config()
        """
        subnet_setting = self._create_network_and_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds,
                PortSettings(
                    name=self.port_name,
                    network_name=self.net_config.network_settings.name,
                    ip_addrs=[{
                        'subnet_name': subnet_setting.name,
                        'ip': '10.197.123.100'}]))
594
595
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients, a unique security group
        name and the lists of objects that tearDown has to remove
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = []
        self.security_group_rules = []
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: a failure to delete one group must
                # not prevent the remaining groups from being removed
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group(),
        get_security_group() and delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(self.neutron,
                                                             self.keystone,
                                                             sec_grp_settings)
        # Registered so that tearDown still removes the group when one of
        # the assertions below fails before the explicit delete
        self.security_groups.append(security_group)

        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        # Already deleted, nothing left for tearDown to do
        self.security_groups.remove(security_group)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_groups.append(
                neutron_utils.create_security_group(self.neutron,
                                                    self.keystone,
                                                    sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function for a group
        with a description and no rule settings
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(self.neutron, self.keystone,
                                                sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and
        create_security_group_rule() functions for a group with a single
        ingress rule
        """

        sec_grp_rule_settings = SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(self.neutron, self.keystone,
                                                sec_grp_settings))
        # Rules that already exist on the freshly created group are tracked
        # too, so they are part of the comparison below and of the cleanup
        free_rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0])
        self.security_group_rules.extend(free_rules)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)

        rules = neutron_utils.get_rules_by_security_group(self.neutron,
                                                          security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function with two
        security groups
        """

        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-1',
                                  description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-2',
                                  description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
740
741
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions of neutron_utils
    """

    def setUp(self):
        """
        Creates the neutron client used by the test and initializes the
        floating IP reference that tearDown() cleans up
        """
        self.floating_ip = None
        self.neutron = neutron_utils.neutron_client(self.os_creds)

    def tearDown(self):
        """
        Deletes the floating IP when the test created one
        """
        if not self.floating_ip:
            return
        neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)

    def test_floating_ips(self):
        """
        Tests that creating a floating IP adds exactly one entry to the
        floating IP listing and that the new floating IP can be retrieved
        :return:
        """
        fips_before = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.ext_net_name)

        fips_after = neutron_utils.get_floating_ips(self.neutron)
        expected_count = len(fips_before) + 1
        self.assertEqual(expected_count, len(fips_after))

        # The retrieved floating IP must match the one that was just created
        fetched_fip = neutron_utils.get_floating_ip(
            self.neutron, self.floating_ip)
        self.assertEqual(self.floating_ip.id, fetched_fip.id)
        self.assertEqual(self.floating_ip.ip, fetched_fip.ip)
777
778
779 """
780 Validation routines
781 """
782
783
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches the
    expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network is expected to exist
    :return: True when the network is found and exists is true, or when it is
             not found and exists is false; otherwise False
    """
    found = neutron_utils.get_network(neutron, network_name=name)
    # The check passes exactly when "was found" agrees with "should exist"
    return bool(found) == bool(exists)
800
801
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name matches the
    expectation expressed by the exists parameter and, when the subnet is
    expected to exist, that its CIDR matches as well.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet is expected to exist
    :return: When exists is true, whether a subnet with that name was found
             and its CIDR equals the expected value; when exists is false,
             True only if no subnet was found
    """
    found = neutron_utils.get_subnet(neutron, subnet_name=name)

    if not exists:
        # Absence is expected, so any subnet that was found is a failure
        return not found

    if found and found.name == name:
        return found.cidr == cidr
    return False
819
820
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router with the given name matches the
    expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router is expected to exist
    :return: True when the router is found and exists is true, or when it is
             not found and exists is false; otherwise False
    """
    router = neutron_utils.get_router(neutron, router_name=name)
    if exists:
        return bool(router)
    # Absence was expected: succeed only when no router was found. Previously
    # this case always returned False, contradicting the documented contract.
    return not router
835
836
def validate_interface_router(interface_router, router, subnet):
    """
    Checks that the interface router object references the given subnet and
    router.
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    expected_subnet_id = interface_router.subnet_id
    # NOTE(review): the router's ID is compared against the interface
    # router's port_id attribute - confirm that this attribute is really
    # meant to hold the router ID
    expected_router_id = interface_router.port_id

    if subnet.id != expected_subnet_id:
        return False
    return router.id == expected_router_id
850
851
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks up a port by ID in the neutron client's port listing and checks
    that it carries the expected name.
    :param neutron: The neutron client
    :param port_obj: The port object whose id attribute is searched for
    :param this_port_name: The expected port name
    :return: True when the first listed port with the same ID has the
             expected name; False when the name differs or when no listed
             port has that ID
    """
    listing = neutron.list_ports()
    # Flatten every list held by the response dict into one stream of ports
    listed_ports = (
        port for port_group in listing.values() for port in port_group)
    matching_port = next(
        (port for port in listed_ports if port['id'] == port_obj.id), None)
    if matching_port is None:
        return False
    return matching_port['name'] == this_port_name