93f09411b36feca62b3f40d97b9a1254d52d7b47
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27
__author__ = 'spisarski'

# Fixed IP address requested for the ports created by the port tests below
ip_1 = '10.55.1.100'
# Second fixed IP address (presumably for tests needing another address -
# TODO confirm where it is used)
ip_2 = '10.55.1.200'
32
33
class NeutronSmokeTests(OSComponentTestCase):
    """
    Tests to ensure that the neutron client can communicate with the cloud
    """

    def test_neutron_connect_success(self):
        """
        Tests to ensure that the proper credentials can connect by listing
        the networks and checking that the configured external network
        (self.ext_net_name) is among them.
        """
        neutron = neutron_utils.neutron_client(self.os_creds)

        networks = neutron.list_networks().get('networks')
        self.assertTrue(any(
            network.get('name') == self.ext_net_name
            for network in networks))

    def test_neutron_connect_fail(self):
        """
        Tests to ensure that the improper credentials cannot connect.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Either building the client or issuing the first request must fail
        with self.assertRaises(Exception):
            neutron = neutron_utils.neutron_client(
                OSCreds(username='user', password='pass', auth_url='url',
                        project_name='project'))
            neutron.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Tests the neutron_utils.get_external_networks() function to ensure
        the configured self.ext_net_name is the name of one of the returned
        networks
        :return:
        """
        neutron = neutron_utils.neutron_client(self.os_creds)
        ext_networks = neutron_utils.get_external_networks(neutron)
        self.assertTrue(any(
            network.name == self.ext_net_name for network in ext_networks))
80
81
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Exercises network creation and removal through neutron_utils.py
    """

    def setUp(self):
        """
        Prepares a neutron client and a uniquely named network configuration
        """
        guid = '-'.join((self.__class__.__name__, str(uuid.uuid4())))
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Removes the network created by the test, when there is one
        """
        if not self.network:
            return

        neutron_utils.delete_network(self.neutron, self.network)
        validate_network(self.neutron, self.network.name, False)

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)

        network_found = validate_network(
            self.neutron, net_settings.name, True)
        self.assertTrue(network_found)

    def test_create_network_empty_name(self):
        """
        Tests that an Exception is raised when a network is requested with
        an empty name
        """
        with self.assertRaises(Exception):
            empty_name_settings = NetworkSettings(name='')
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=empty_name_settings)

    def test_create_network_null_name(self):
        """
        Tests that an Exception is raised when a network is requested
        without any name
        """
        with self.assertRaises(Exception):
            nameless_settings = NetworkSettings()
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=nameless_settings)
133
134
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Prepares a neutron client and a uniquely named network/subnet
        configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects (subnet first, then network)
        """
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.name,
                            self.net_config.network_settings.subnet_settings[
                                0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network.name, False)

    def _create_network(self):
        """
        Creates the network described by self.net_config, stores it on
        self.network so tearDown can remove it, and asserts that it was
        created with the expected name and can be found
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.net_config.network_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True)

    def test_create_subnet_null_name(self):
        """
        Tests that constructing SubnetSettings raises an Exception when the
        subnet name is None
        """
        self._create_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates a subnet from the configured subnet settings and then looks
        it up using an empty name.

        NOTE(review): despite the test name, the subnet itself is created
        with the configured (non-empty) name; only the lookup uses ''.
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        # Keep a handle on the subnet so tearDown removes it explicitly, as
        # the other tests in this class do
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        validate_subnet(self.neutron, '', subnet_setting.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an Exception
        when the subnet CIDR value is None
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr=None, name=self.net_config.subnet_name)
            neutron_utils.create_subnet(
                self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an Exception
        when the subnet CIDR value is empty
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr='', name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)
248
249
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Test for creating routers, router interfaces and ports via
    neutron_utils.py
    """

    def setUp(self):
        """
        Prepares a neutron client and a uniquely named network/subnet/router
        configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects in the order router interface,
        router, port, subnet, network
        """
        if self.interface_router:
            neutron_utils.remove_interface_router(self.neutron, self.router,
                                                  self.subnet)

        if self.router:
            neutron_utils.delete_router(self.neutron, self.router)
            validate_router(self.neutron, self.router.name, False)

        if self.port:
            neutron_utils.delete_port(self.neutron, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(
                self.neutron, self.subnet.name,
                self.net_config.network_settings.subnet_settings[0].cidr,
                False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network.name, False)

    def _create_network(self):
        """
        Creates the network described by self.net_config, stores it on
        self.network so tearDown can remove it, and asserts that it was
        created with the expected name and can be found
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.net_config.network_settings.name, True))

    def _create_subnet(self):
        """
        Creates the first configured subnet on self.network, stores it on
        self.subnet so tearDown can remove it, and runs validate_subnet
        :return: the subnet settings object that was used
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, self.network)
        validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True)
        return subnet_setting

    def _create_router(self):
        """
        Creates the router described by self.net_config, stores it on
        self.router so tearDown can remove it, and runs validate_router
        """
        self.router = neutron_utils.create_router(
            self.neutron, self.os_creds, self.net_config.router_settings)
        validate_router(self.neutron, self.net_config.router_settings.name,
                        True)

    def _create_port(self, subnet_name, ip, **port_kwargs):
        """
        Creates a port on the configured network with a single fixed IP and
        stores it on self.port so tearDown can remove it
        :param subnet_name: the name of the subnet the IP belongs to
        :param ip: the IP address to request for the port
        :param port_kwargs: extra PortSettings keyword arguments (e.g. name)
        """
        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds,
            PortSettings(
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{'subnet_name': subnet_name, 'ip': ip}],
                **port_kwargs))

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function using the router
        settings of the default test configuration
        """
        self._create_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function after rebuilding
        the network configuration with the external network name
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name,
            subnet_setting.name,
            subnet_setting.cidr,
            self.net_config.router_settings.name,
            self.ext_net_name)
        self._create_router()
        # TODO - Add validation that the router gateway has been set

    def test_create_router_empty_name(self):
        """
        Tests the neutron_utils.create_router() function for an Exception
        when the router name is empty
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(self.neutron,
                                                      self.os_creds,
                                                      this_router_settings)

    def test_create_router_null_name(self):
        """
        Tests the neutron_utils.create_router() function for an Exception
        when the router name is None
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings()
            self.router = neutron_utils.create_router(self.neutron,
                                                      self.os_creds,
                                                      this_router_settings)
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_network()
        self._create_subnet()
        self._create_router()

        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, self.subnet)
        validate_interface_router(self.interface_router, self.router,
                                  self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for an
        Exception when the router value is None
        """
        self._create_network()
        self._create_subnet()

        # self.router is still None here because no router was created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an
        Exception when the subnet value is None
        """
        self._create_network()
        self._create_router()

        # self.subnet is still None here because no subnet was created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        self._create_port(subnet_setting.name, ip_1, name=self.port_name)
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function

        NOTE(review): despite the test name, the port is created with
        self.port_name, so this currently duplicates test_create_port.
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        self._create_port(subnet_setting.name, ip_1, name=self.port_name)
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the port name value is None
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(subnet_setting.name, ip_1)

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the network has not been created
        """
        with self.assertRaises(Exception):
            self._create_port(
                self.net_config.network_settings.subnet_settings[0].name,
                ip_1, name=self.port_name)

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is None
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(subnet_setting.name, None, name=self.port_name)

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is not a valid IP address
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(subnet_setting.name, 'foo', name=self.port_name)

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is a well-formed address other than the ip_1/ip_2
        addresses used elsewhere in this module (presumably outside the
        subnet's CIDR - confirm against the test network configuration)
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(subnet_setting.name, '10.197.123.100',
                              name=self.port_name)
604
605
606 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
607     """
608     Test for creating security groups via neutron_utils.py
609     """
610
611     def setUp(self):
612         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
613         self.sec_grp_name = guid + 'name'
614
615         self.security_groups = list()
616         self.security_group_rules = list()
617         self.neutron = neutron_utils.neutron_client(self.os_creds)
618         self.keystone = keystone_utils.keystone_client(self.os_creds)
619
620     def tearDown(self):
621         """
622         Cleans the remote OpenStack objects
623         """
624         for rule in self.security_group_rules:
625             neutron_utils.delete_security_group_rule(self.neutron, rule)
626
627         for security_group in self.security_groups:
628             try:
629                 neutron_utils.delete_security_group(self.neutron,
630                                                     security_group)
631             except:
632                 pass
633
634     def test_create_delete_simple_sec_grp(self):
635         """
636         Tests the neutron_utils.create_security_group() function
637         """
638         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
639         security_group = neutron_utils.create_security_group(self.neutron,
640                                                              self.keystone,
641                                                              sec_grp_settings)
642
643         self.assertTrue(sec_grp_settings.name, security_group.name)
644
645         sec_grp_get = neutron_utils.get_security_group(self.neutron,
646                                                        sec_grp_settings.name)
647         self.assertIsNotNone(sec_grp_get)
648         self.assertTrue(validation_utils.objects_equivalent(
649             security_group, sec_grp_get))
650
651         neutron_utils.delete_security_group(self.neutron, security_group)
652         sec_grp_get = neutron_utils.get_security_group(self.neutron,
653                                                        sec_grp_settings.name)
654         self.assertIsNone(sec_grp_get)
655
656     def test_create_sec_grp_no_name(self):
657         """
658         Tests the SecurityGroupSettings constructor and
659         neutron_utils.create_security_group() function to ensure that
660         attempting to create a security group without a name will raise an
661         exception
662         """
663         with self.assertRaises(Exception):
664             sec_grp_settings = SecurityGroupSettings()
665             self.security_groups.append(
666                 neutron_utils.create_security_group(self.neutron,
667                                                     self.keystone,
668                                                     sec_grp_settings))
669
670     def test_create_sec_grp_no_rules(self):
671         """
672         Tests the neutron_utils.create_security_group() function
673         """
674         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
675                                                  description='hello group')
676         self.security_groups.append(
677             neutron_utils.create_security_group(self.neutron, self.keystone,
678                                                 sec_grp_settings))
679
680         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
681         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
682
683         sec_grp_get = neutron_utils.get_security_group(self.neutron,
684                                                        sec_grp_settings.name)
685         self.assertIsNotNone(sec_grp_get)
686         self.assertEqual(self.security_groups[0], sec_grp_get)
687
688     def test_create_sec_grp_one_rule(self):
689         """
690         Tests the neutron_utils.create_security_group() function
691         """
692
693         sec_grp_rule_settings = SecurityGroupRuleSettings(
694             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
695         sec_grp_settings = SecurityGroupSettings(
696             name=self.sec_grp_name, description='hello group',
697             rule_settings=[sec_grp_rule_settings])
698
699         self.security_groups.append(
700             neutron_utils.create_security_group(self.neutron, self.keystone,
701                                                 sec_grp_settings))
702         free_rules = neutron_utils.get_rules_by_security_group(
703             self.neutron, self.security_groups[0])
704         for free_rule in free_rules:
705             self.security_group_rules.append(free_rule)
706
707         self.security_group_rules.append(
708             neutron_utils.create_security_group_rule(
709                 self.neutron, sec_grp_settings.rule_settings[0]))
710
711         # Refresh object so it is populated with the newly added rule
712         security_group = neutron_utils.get_security_group(
713             self.neutron, sec_grp_settings.name)
714
715         rules = neutron_utils.get_rules_by_security_group(self.neutron,
716                                                           security_group)
717
718         self.assertTrue(
719             validation_utils.objects_equivalent(
720                  self.security_group_rules, rules))
721
722         self.assertTrue(sec_grp_settings.name, security_group.name)
723
724         sec_grp_get = neutron_utils.get_security_group(self.neutron,
725                                                        sec_grp_settings.name)
726         self.assertIsNotNone(sec_grp_get)
727         self.assertEqual(security_group, sec_grp_get)
728
729     def test_get_sec_grp_by_id(self):
730         """
731         Tests the neutron_utils.create_security_group() function
732         """
733
734         self.security_groups.append(neutron_utils.create_security_group(
735             self.neutron, self.keystone,
736             SecurityGroupSettings(name=self.sec_grp_name + '-1',
737                                   description='hello group')))
738         self.security_groups.append(neutron_utils.create_security_group(
739             self.neutron, self.keystone,
740             SecurityGroupSettings(name=self.sec_grp_name + '-2',
741                                   description='hello group')))
742
743         sec_grp_1b = neutron_utils.get_security_group_by_id(
744             self.neutron, self.security_groups[0].id)
745         sec_grp_2b = neutron_utils.get_security_group_by_id(
746             self.neutron, self.security_groups[1].id)
747
748         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
749         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
750
751
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Test basic neutron floating IP functionality
    """

    def setUp(self):
        """
        Creates the neutron client used by the test and initializes the
        holder for the floating IP that gets created
        """
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP if one was created by the test
        """
        if not self.floating_ip:
            return
        neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP: the number of floating IPs must
        grow by one and the new floating IP must be retrievable with the same
        id and ip values
        :return:
        """
        fips_before = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.ext_net_name)

        fips_after = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(fips_before) + 1, len(fips_after))

        fetched_fip = neutron_utils.get_floating_ip(
            self.neutron, self.floating_ip)
        self.assertEqual(self.floating_ip.id, fetched_fip.id)
        self.assertEqual(self.floating_ip.ip, fetched_fip.ip)
787
788
789 """
790 Validation routines
791 """
792
793
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches the
    expectation expressed by the exists parameter.
    Returns True when the network is expected and found, or when it is not
    expected and not found; returns False otherwise.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network is expected to exist
    :return: True/False
    """
    found_network = neutron_utils.get_network(neutron, name)
    if exists:
        # Expected to be there: valid only when the lookup returned something
        return bool(found_network)
    # Expected to be absent: valid only when the lookup returned nothing
    return not found_network
810
811
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name matches the
    expectation expressed by the exists parameter.
    When the subnet is expected and found, the result is whether its CIDR
    equals the expected CIDR. When the subnet is not expected, the result is
    True only if no subnet was found. Returns False otherwise.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet is expected to exist
    :return: True/False
    """
    found_subnet = neutron_utils.get_subnet_by_name(neutron, name)
    if not exists:
        # Expected to be absent: valid only when the lookup returned nothing
        return not found_subnet
    if not found_subnet:
        return False
    return found_subnet.cidr == cidr
829
830
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router with the given name matches the
    expectation expressed by the exists parameter.
    Returns True when the router is expected and found, or when it is not
    expected and not found; returns False otherwise.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router is expected to exist
    :return: True/False
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    # Both flags must agree; this also covers the "should not exist and does
    # not exist" case, which must be reported as valid
    return bool(router) == bool(exists)
845
846
def validate_interface_router(interface_router, router, subnet):
    """
    Compares the IDs held by the interface router object against the given
    router and subnet
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    attached_subnet_id = interface_router.subnet_id
    # NOTE(review): the router's ID is compared with the value of
    # interface_router.port_id - confirm that this attribute really carries
    # the router ID and not the ID of the interface port
    attached_router_id = interface_router.port_id

    if subnet.id == attached_subnet_id:
        return router.id == attached_router_id
    return False
860
861
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks for the port with the same ID as port_obj among the ports listed by
    the neutron client and checks its name.
    Returns True when the first port found with that ID has the expected
    name. Returns False when the name differs or when no listed port has
    that ID.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup
    :param this_port_name: The expected port name
    :return: True/False
    """
    # list_ports() returns a dict; only its values (the port lists) matter
    matching_ports = (
        listed_port
        for port_list in neutron.list_ports().values()
        for listed_port in port_list
        if listed_port['id'] == port_obj.id
    )
    first_match = next(matching_ports, None)
    if first_match is None:
        return False
    return first_match['name'] == this_port_name