Expand OpenStackSecurityGroup class tests.
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
35 class NeutronSmokeTests(OSComponentTestCase):
36     """
37     Tests to ensure that the neutron client can communicate with the cloud
38     """
39
40     def test_neutron_connect_success(self):
41         """
42         Tests to ensure that the proper credentials can connect.
43         """
44         neutron = neutron_utils.neutron_client(self.os_creds)
45
46         networks = neutron.list_networks()
47
48         found = False
49         networks = networks.get('networks')
50         for network in networks:
51             if network.get('name') == self.ext_net_name:
52                 found = True
53         self.assertTrue(found)
54
55     def test_neutron_connect_fail(self):
56         """
57         Tests to ensure that the improper credentials cannot connect.
58         """
59         from snaps.openstack.os_credentials import OSCreds
60
61         with self.assertRaises(Exception):
62             neutron = neutron_utils.neutron_client(
63                 OSCreds(username='user', password='pass', auth_url='url',
64                         project_name='project'))
65             neutron.list_networks()
66
67     def test_retrieve_ext_network_name(self):
68         """
69         Tests the neutron_utils.get_external_network_names to ensure the
70         configured self.ext_net_name is contained within the returned list
71         :return:
72         """
73         neutron = neutron_utils.neutron_client(self.os_creds)
74         ext_networks = neutron_utils.get_external_networks(neutron)
75         found = False
76         for network in ext_networks:
77             if network.name == self.ext_net_name:
78                 found = True
79                 break
80         self.assertTrue(found)
81
82
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
84     """
85     Test for creating networks via neutron_utils.py
86     """
87
88     def setUp(self):
89         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90         self.port_name = str(guid) + '-port'
91         self.neutron = neutron_utils.neutron_client(self.os_creds)
92         self.network = None
93         self.net_config = openstack_tests.get_pub_net_config(
94             net_name=guid + '-pub-net')
95
96     def tearDown(self):
97         """
98         Cleans the remote OpenStack objects
99         """
100         if self.network:
101             neutron_utils.delete_network(self.neutron, self.network)
102             validate_network(self.neutron, self.network.name, False)
103
104     def test_create_network(self):
105         """
106         Tests the neutron_utils.create_neutron_net() function
107         """
108         self.network = neutron_utils.create_network(
109             self.neutron, self.os_creds, self.net_config.network_settings)
110         self.assertEqual(self.net_config.network_settings.name,
111                          self.network.name)
112         self.assertTrue(validate_network(
113             self.neutron, self.net_config.network_settings.name, True))
114
115     def test_create_network_empty_name(self):
116         """
117         Tests the neutron_utils.create_neutron_net() function with an empty
118         network name
119         """
120         with self.assertRaises(Exception):
121             self.network = neutron_utils.create_network(
122                 self.neutron, self.os_creds,
123                 network_settings=NetworkSettings(name=''))
124
125     def test_create_network_null_name(self):
126         """
127         Tests the neutron_utils.create_neutron_net() function when the network
128         name is None
129         """
130         with self.assertRaises(Exception):
131             self.network = neutron_utils.create_network(
132                 self.neutron, self.os_creds,
133                 network_settings=NetworkSettings())
134
135
136 class NeutronUtilsSubnetTests(OSComponentTestCase):
137     """
138     Test for creating networks with subnets via neutron_utils.py
139     """
140
141     def setUp(self):
142         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
143         self.port_name = str(guid) + '-port'
144         self.neutron = neutron_utils.neutron_client(self.os_creds)
145         self.network = None
146         self.subnet = None
147         self.net_config = openstack_tests.get_pub_net_config(
148             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
149             external_net=self.ext_net_name)
150
151     def tearDown(self):
152         """
153         Cleans the remote OpenStack objects
154         """
155         if self.subnet:
156             neutron_utils.delete_subnet(self.neutron, self.subnet)
157             validate_subnet(self.neutron, self.subnet.name,
158                             self.net_config.network_settings.subnet_settings[
159                                 0].cidr, False)
160
161         if self.network:
162             neutron_utils.delete_network(self.neutron, self.network)
163             validate_network(self.neutron, self.network.name, False)
164
165     def test_create_subnet(self):
166         """
167         Tests the neutron_utils.create_neutron_net() function
168         """
169         self.network = neutron_utils.create_network(
170             self.neutron, self.os_creds, self.net_config.network_settings)
171         self.assertEqual(self.net_config.network_settings.name,
172                          self.network.name)
173         self.assertTrue(validate_network(
174             self.neutron, self.net_config.network_settings.name, True))
175
176         subnet_setting = self.net_config.network_settings.subnet_settings[0]
177         self.subnet = neutron_utils.create_subnet(
178             self.neutron, subnet_setting, self.os_creds, network=self.network)
179         validate_subnet(
180             self.neutron, subnet_setting.name, subnet_setting.cidr, True)
181
182     def test_create_subnet_null_name(self):
183         """
184         Tests the neutron_utils.create_neutron_subnet() function for an
185         Exception when the subnet name is None
186         """
187         self.network = neutron_utils.create_network(
188             self.neutron, self.os_creds, self.net_config.network_settings)
189         self.assertEqual(self.net_config.network_settings.name,
190                          self.network.name)
191         self.assertTrue(validate_network(
192             self.neutron, self.net_config.network_settings.name, True))
193
194         with self.assertRaises(Exception):
195             SubnetSettings(cidr=self.net_config.subnet_cidr)
196
197     def test_create_subnet_empty_name(self):
198         """
199         Tests the neutron_utils.create_neutron_net() function with an empty
200         name
201         """
202         self.network = neutron_utils.create_network(
203             self.neutron, self.os_creds, self.net_config.network_settings)
204         self.assertEqual(self.net_config.network_settings.name,
205                          self.network.name)
206         self.assertTrue(validate_network(
207             self.neutron, self.net_config.network_settings.name, True))
208
209         subnet_setting = self.net_config.network_settings.subnet_settings[0]
210         neutron_utils.create_subnet(
211             self.neutron, subnet_setting, self.os_creds, network=self.network)
212         validate_subnet(self.neutron, '', subnet_setting.cidr, True)
213
214     def test_create_subnet_null_cidr(self):
215         """
216         Tests the neutron_utils.create_neutron_subnet() function for an
217         Exception when the subnet CIDR value is None
218         """
219         self.network = neutron_utils.create_network(
220             self.neutron, self.os_creds, self.net_config.network_settings)
221         self.assertEqual(self.net_config.network_settings.name,
222                          self.network.name)
223         self.assertTrue(validate_network(
224             self.neutron, self.net_config.network_settings.name, True))
225
226         with self.assertRaises(Exception):
227             sub_sets = SubnetSettings(
228                 cidr=None, name=self.net_config.subnet_name)
229             neutron_utils.create_subnet(
230                 self.neutron, sub_sets, self.os_creds, network=self.network)
231
232     def test_create_subnet_empty_cidr(self):
233         """
234         Tests the neutron_utils.create_neutron_subnet() function for an
235         Exception when the subnet CIDR value is empty
236         """
237         self.network = neutron_utils.create_network(
238             self.neutron, self.os_creds, self.net_config.network_settings)
239         self.assertEqual(self.net_config.network_settings.name,
240                          self.network.name)
241         self.assertTrue(validate_network(
242             self.neutron, self.net_config.network_settings.name, True))
243
244         with self.assertRaises(Exception):
245             sub_sets = SubnetSettings(
246                 cidr='', name=self.net_config.subnet_name)
247             neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
248                                         network=self.network)
249
250
251 class NeutronUtilsRouterTests(OSComponentTestCase):
252     """
253     Test for creating routers via neutron_utils.py
254     """
255
256     def setUp(self):
257         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
258         self.port_name = str(guid) + '-port'
259         self.neutron = neutron_utils.neutron_client(self.os_creds)
260         self.network = None
261         self.subnet = None
262         self.port = None
263         self.router = None
264         self.interface_router = None
265         self.net_config = openstack_tests.get_pub_net_config(
266             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
267             router_name=guid + '-pub-router', external_net=self.ext_net_name)
268
269     def tearDown(self):
270         """
271         Cleans the remote OpenStack objects
272         """
273         if self.interface_router:
274             neutron_utils.remove_interface_router(self.neutron, self.router,
275                                                   self.subnet)
276
277         if self.router:
278             neutron_utils.delete_router(self.neutron, self.router)
279             validate_router(self.neutron, self.router.name, False)
280
281         if self.port:
282             neutron_utils.delete_port(self.neutron, self.port)
283
284         if self.subnet:
285             neutron_utils.delete_subnet(self.neutron, self.subnet)
286             validate_subnet(
287                 self.neutron, self.subnet.name,
288                 self.net_config.network_settings.subnet_settings[0].cidr,
289                 False)
290
291         if self.network:
292             neutron_utils.delete_network(self.neutron, self.network)
293             validate_network(self.neutron, self.network.name, False)
294
295     def test_create_router_simple(self):
296         """
297         Tests the neutron_utils.create_neutron_net() function when an external
298         gateway is requested
299         """
300         self.router = neutron_utils.create_router(
301             self.neutron, self.os_creds, self.net_config.router_settings)
302         validate_router(self.neutron, self.net_config.router_settings.name,
303                         True)
304
305     def test_create_router_with_public_interface(self):
306         """
307         Tests the neutron_utils.create_neutron_net() function when an external
308         gateway is requested
309         """
310         subnet_setting = self.net_config.network_settings.subnet_settings[0]
311         self.net_config = openstack_tests.OSNetworkConfig(
312             self.net_config.network_settings.name,
313             subnet_setting.name,
314             subnet_setting.cidr,
315             self.net_config.router_settings.name,
316             self.ext_net_name)
317         self.router = neutron_utils.create_router(
318             self.neutron, self.os_creds, self.net_config.router_settings)
319         validate_router(self.neutron, self.net_config.router_settings.name,
320                         True)
321
322         ext_net = neutron_utils.get_network(self.neutron, self.ext_net_name)
323         self.assertEqual(
324             self.router.external_gateway_info['network_id'], ext_net.id)
325
326     def test_create_router_empty_name(self):
327         """
328         Tests the neutron_utils.create_neutron_net() function
329         """
330         with self.assertRaises(Exception):
331             this_router_settings = create_router.RouterSettings(name='')
332             self.router = neutron_utils.create_router(self.neutron,
333                                                       self.os_creds,
334                                                       this_router_settings)
335
336     def test_create_router_null_name(self):
337         """
338         Tests the neutron_utils.create_neutron_subnet() function when the
339         subnet CIDR value is None
340         """
341         with self.assertRaises(Exception):
342             this_router_settings = create_router.RouterSettings()
343             self.router = neutron_utils.create_router(self.neutron,
344                                                       self.os_creds,
345                                                       this_router_settings)
346             validate_router(self.neutron, None, True)
347
348     def test_add_interface_router(self):
349         """
350         Tests the neutron_utils.add_interface_router() function
351         """
352         self.network = neutron_utils.create_network(
353             self.neutron, self.os_creds, self.net_config.network_settings)
354         self.assertEqual(self.net_config.network_settings.name,
355                          self.network.name)
356         self.assertTrue(validate_network(
357             self.neutron, self.net_config.network_settings.name, True))
358
359         subnet_setting = self.net_config.network_settings.subnet_settings[0]
360         self.subnet = neutron_utils.create_subnet(
361             self.neutron, subnet_setting,
362             self.os_creds, self.network)
363         validate_subnet(
364             self.neutron,
365             subnet_setting.name,
366             subnet_setting.cidr, True)
367
368         self.router = neutron_utils.create_router(
369             self.neutron, self.os_creds, self.net_config.router_settings)
370         validate_router(self.neutron, self.net_config.router_settings.name,
371                         True)
372
373         self.interface_router = neutron_utils.add_interface_router(
374             self.neutron, self.router, self.subnet)
375         validate_interface_router(self.interface_router, self.router,
376                                   self.subnet)
377
378     def test_add_interface_router_null_router(self):
379         """
380         Tests the neutron_utils.add_interface_router() function for an
381         Exception when the router value is None
382         """
383         self.network = neutron_utils.create_network(
384             self.neutron, self.os_creds, self.net_config.network_settings)
385         self.assertEqual(self.net_config.network_settings.name,
386                          self.network.name)
387         self.assertTrue(validate_network(
388             self.neutron, self.net_config.network_settings.name, True))
389
390         subnet_setting = self.net_config.network_settings.subnet_settings[0]
391         self.subnet = neutron_utils.create_subnet(
392             self.neutron, subnet_setting,
393             self.os_creds, self.network)
394         validate_subnet(
395             self.neutron, subnet_setting.name, subnet_setting.cidr, True)
396
397         with self.assertRaises(NeutronException):
398             self.interface_router = neutron_utils.add_interface_router(
399                 self.neutron, self.router, self.subnet)
400
401     def test_add_interface_router_null_subnet(self):
402         """
403         Tests the neutron_utils.add_interface_router() function for an
404         Exception when the subnet value is None
405         """
406         self.network = neutron_utils.create_network(
407             self.neutron, self.os_creds, self.net_config.network_settings)
408         self.assertEqual(self.net_config.network_settings.name,
409                          self.network.name)
410         self.assertTrue(validate_network(
411             self.neutron, self.net_config.network_settings.name, True))
412
413         self.router = neutron_utils.create_router(
414             self.neutron, self.os_creds, self.net_config.router_settings)
415         validate_router(self.neutron, self.net_config.router_settings.name,
416                         True)
417
418         with self.assertRaises(NeutronException):
419             self.interface_router = neutron_utils.add_interface_router(
420                 self.neutron, self.router, self.subnet)
421
422     def test_create_port(self):
423         """
424         Tests the neutron_utils.create_port() function
425         """
426         self.network = neutron_utils.create_network(
427             self.neutron, self.os_creds, self.net_config.network_settings)
428         self.assertEqual(self.net_config.network_settings.name,
429                          self.network.name)
430         self.assertTrue(validate_network(
431             self.neutron, self.net_config.network_settings.name, True))
432
433         subnet_setting = self.net_config.network_settings.subnet_settings[0]
434         self.subnet = neutron_utils.create_subnet(
435             self.neutron, subnet_setting, self.os_creds, self.network)
436         validate_subnet(self.neutron, subnet_setting.name,
437                         subnet_setting.cidr, True)
438
439         self.port = neutron_utils.create_port(
440             self.neutron, self.os_creds, PortSettings(
441                 name=self.port_name,
442                 ip_addrs=[{
443                     'subnet_name': subnet_setting.name,
444                     'ip': ip_1}],
445                 network_name=self.net_config.network_settings.name))
446         validate_port(self.neutron, self.port, self.port_name)
447
448     def test_create_port_empty_name(self):
449         """
450         Tests the neutron_utils.create_port() function
451         """
452         self.network = neutron_utils.create_network(
453             self.neutron, self.os_creds, self.net_config.network_settings)
454         self.assertEqual(self.net_config.network_settings.name,
455                          self.network.name)
456         self.assertTrue(validate_network(
457             self.neutron, self.net_config.network_settings.name, True))
458
459         subnet_setting = self.net_config.network_settings.subnet_settings[0]
460         self.subnet = neutron_utils.create_subnet(
461             self.neutron, subnet_setting, self.os_creds, self.network)
462         validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
463                         True)
464
465         self.port = neutron_utils.create_port(
466             self.neutron, self.os_creds, PortSettings(
467                 name=self.port_name,
468                 network_name=self.net_config.network_settings.name,
469                 ip_addrs=[{
470                     'subnet_name': subnet_setting.name,
471                     'ip': ip_1}]))
472         validate_port(self.neutron, self.port, self.port_name)
473
474     def test_create_port_null_name(self):
475         """
476         Tests the neutron_utils.create_port() function for an Exception when
477         the port name value is None
478         """
479         self.network = neutron_utils.create_network(
480             self.neutron, self.os_creds, self.net_config.network_settings)
481         self.assertEqual(self.net_config.network_settings.name,
482                          self.network.name)
483         self.assertTrue(validate_network(
484             self.neutron, self.net_config.network_settings.name, True))
485
486         subnet_setting = self.net_config.network_settings.subnet_settings[0]
487         self.subnet = neutron_utils.create_subnet(
488             self.neutron, subnet_setting,
489             self.os_creds, self.network)
490         validate_subnet(
491             self.neutron,
492             subnet_setting.name,
493             subnet_setting.cidr, True)
494
495         with self.assertRaises(Exception):
496             self.port = neutron_utils.create_port(
497                 self.neutron, self.os_creds,
498                 PortSettings(
499                     network_name=self.net_config.network_settings.name,
500                     ip_addrs=[{
501                         'subnet_name': subnet_setting.name,
502                         'ip': ip_1}]))
503
504     def test_create_port_null_network_object(self):
505         """
506         Tests the neutron_utils.create_port() function for an Exception when
507         the network object is None
508         """
509         with self.assertRaises(Exception):
510             self.port = neutron_utils.create_port(
511                 self.neutron, self.os_creds,
512                 PortSettings(
513                     name=self.port_name,
514                     network_name=self.net_config.network_settings.name,
515                     ip_addrs=[{
516                         'subnet_name':
517                             self.net_config.network_settings.subnet_settings[
518                                 0].name,
519                         'ip': ip_1}]))
520
521     def test_create_port_null_ip(self):
522         """
523         Tests the neutron_utils.create_port() function for an Exception when
524         the IP value is None
525         """
526         self.network = neutron_utils.create_network(
527             self.neutron, self.os_creds, self.net_config.network_settings)
528         self.assertEqual(self.net_config.network_settings.name,
529                          self.network.name)
530         self.assertTrue(validate_network(
531             self.neutron, self.net_config.network_settings.name, True))
532
533         subnet_setting = self.net_config.network_settings.subnet_settings[0]
534         self.subnet = neutron_utils.create_subnet(
535             self.neutron, subnet_setting,
536             self.os_creds, self.network)
537         validate_subnet(
538             self.neutron,
539             subnet_setting.name,
540             subnet_setting.cidr, True)
541
542         with self.assertRaises(Exception):
543             self.port = neutron_utils.create_port(
544                 self.neutron, self.os_creds,
545                 PortSettings(
546                     name=self.port_name,
547                     network_name=self.net_config.network_settings.name,
548                     ip_addrs=[{
549                         'subnet_name': subnet_setting.name,
550                         'ip': None}]))
551
552     def test_create_port_invalid_ip(self):
553         """
554         Tests the neutron_utils.create_port() function for an Exception when
555         the IP value is None
556         """
557         self.network = neutron_utils.create_network(
558             self.neutron, self.os_creds, self.net_config.network_settings)
559         self.assertEqual(self.net_config.network_settings.name,
560                          self.network.name)
561         self.assertTrue(validate_network(
562             self.neutron, self.net_config.network_settings.name, True))
563
564         subnet_setting = self.net_config.network_settings.subnet_settings[0]
565         self.subnet = neutron_utils.create_subnet(
566             self.neutron, subnet_setting, self.os_creds, self.network)
567         validate_subnet(self.neutron,
568                         subnet_setting.name,
569                         subnet_setting.cidr, True)
570
571         with self.assertRaises(Exception):
572             self.port = neutron_utils.create_port(
573                 self.neutron, self.os_creds,
574                 PortSettings(
575                     name=self.port_name,
576                     network_name=self.net_config.network_settings.name,
577                     ip_addrs=[{
578                         'subnet_name': subnet_setting.name,
579                         'ip': 'foo'}]))
580
581     def test_create_port_invalid_ip_to_subnet(self):
582         """
583         Tests the neutron_utils.create_port() function for an Exception when
584         the IP value is None
585         """
586         self.network = neutron_utils.create_network(
587             self.neutron, self.os_creds, self.net_config.network_settings)
588         self.assertEqual(self.net_config.network_settings.name,
589                          self.network.name)
590         self.assertTrue(validate_network(
591             self.neutron, self.net_config.network_settings.name, True))
592
593         subnet_setting = self.net_config.network_settings.subnet_settings[0]
594         self.subnet = neutron_utils.create_subnet(
595             self.neutron, subnet_setting, self.os_creds, self.network)
596         validate_subnet(
597             self.neutron, subnet_setting.name, subnet_setting.cidr, True)
598
599         with self.assertRaises(Exception):
600             self.port = neutron_utils.create_port(
601                 self.neutron, self.os_creds,
602                 PortSettings(
603                     name=self.port_name,
604                     network_name=self.net_config.network_settings.name,
605                     ip_addrs=[{
606                         'subnet_name': subnet_setting.name,
607                         'ip': '10.197.123.100'}]))
608
609
610 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
611     """
612     Test for creating security groups via neutron_utils.py
613     """
614
615     def setUp(self):
616         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
617         self.sec_grp_name = guid + 'name'
618
619         self.security_groups = list()
620         self.security_group_rules = list()
621         self.neutron = neutron_utils.neutron_client(self.os_creds)
622         self.keystone = keystone_utils.keystone_client(self.os_creds)
623
624     def tearDown(self):
625         """
626         Cleans the remote OpenStack objects
627         """
628         for rule in self.security_group_rules:
629             neutron_utils.delete_security_group_rule(self.neutron, rule)
630
631         for security_group in self.security_groups:
632             try:
633                 neutron_utils.delete_security_group(self.neutron,
634                                                     security_group)
635             except:
636                 pass
637
638     def test_create_delete_simple_sec_grp(self):
639         """
640         Tests the neutron_utils.create_security_group() function
641         """
642         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
643         security_group = neutron_utils.create_security_group(self.neutron,
644                                                              self.keystone,
645                                                              sec_grp_settings)
646
647         self.assertTrue(sec_grp_settings.name, security_group.name)
648
649         sec_grp_get = neutron_utils.get_security_group(self.neutron,
650                                                        sec_grp_settings.name)
651         self.assertIsNotNone(sec_grp_get)
652         self.assertTrue(validation_utils.objects_equivalent(
653             security_group, sec_grp_get))
654
655         neutron_utils.delete_security_group(self.neutron, security_group)
656         sec_grp_get = neutron_utils.get_security_group(self.neutron,
657                                                        sec_grp_settings.name)
658         self.assertIsNone(sec_grp_get)
659
660     def test_create_sec_grp_no_name(self):
661         """
662         Tests the SecurityGroupSettings constructor and
663         neutron_utils.create_security_group() function to ensure that
664         attempting to create a security group without a name will raise an
665         exception
666         """
667         with self.assertRaises(Exception):
668             sec_grp_settings = SecurityGroupSettings()
669             self.security_groups.append(
670                 neutron_utils.create_security_group(self.neutron,
671                                                     self.keystone,
672                                                     sec_grp_settings))
673
674     def test_create_sec_grp_no_rules(self):
675         """
676         Tests the neutron_utils.create_security_group() function
677         """
678         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
679                                                  description='hello group')
680         self.security_groups.append(
681             neutron_utils.create_security_group(self.neutron, self.keystone,
682                                                 sec_grp_settings))
683
684         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
685         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
686
687         sec_grp_get = neutron_utils.get_security_group(self.neutron,
688                                                        sec_grp_settings.name)
689         self.assertIsNotNone(sec_grp_get)
690         self.assertEqual(self.security_groups[0], sec_grp_get)
691
692     def test_create_sec_grp_one_rule(self):
693         """
694         Tests the neutron_utils.create_security_group() function
695         """
696
697         sec_grp_rule_settings = SecurityGroupRuleSettings(
698             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
699         sec_grp_settings = SecurityGroupSettings(
700             name=self.sec_grp_name, description='hello group',
701             rule_settings=[sec_grp_rule_settings])
702
703         self.security_groups.append(
704             neutron_utils.create_security_group(self.neutron, self.keystone,
705                                                 sec_grp_settings))
706         free_rules = neutron_utils.get_rules_by_security_group(
707             self.neutron, self.security_groups[0])
708         for free_rule in free_rules:
709             self.security_group_rules.append(free_rule)
710
711         self.security_group_rules.append(
712             neutron_utils.create_security_group_rule(
713                 self.neutron, sec_grp_settings.rule_settings[0]))
714
715         # Refresh object so it is populated with the newly added rule
716         security_group = neutron_utils.get_security_group(
717             self.neutron, sec_grp_settings.name)
718
719         rules = neutron_utils.get_rules_by_security_group(self.neutron,
720                                                           security_group)
721
722         self.assertTrue(
723             validation_utils.objects_equivalent(
724                  self.security_group_rules, rules))
725
726         self.assertTrue(sec_grp_settings.name, security_group.name)
727
728         sec_grp_get = neutron_utils.get_security_group(self.neutron,
729                                                        sec_grp_settings.name)
730         self.assertIsNotNone(sec_grp_get)
731         self.assertEqual(security_group, sec_grp_get)
732
733     def test_get_sec_grp_by_id(self):
734         """
735         Tests the neutron_utils.create_security_group() function
736         """
737
738         self.security_groups.append(neutron_utils.create_security_group(
739             self.neutron, self.keystone,
740             SecurityGroupSettings(name=self.sec_grp_name + '-1',
741                                   description='hello group')))
742         self.security_groups.append(neutron_utils.create_security_group(
743             self.neutron, self.keystone,
744             SecurityGroupSettings(name=self.sec_grp_name + '-2',
745                                   description='hello group')))
746
747         sec_grp_1b = neutron_utils.get_security_group_by_id(
748             self.neutron, self.security_groups[0].id)
749         sec_grp_2b = neutron_utils.get_security_group_by_id(
750             self.neutron, self.security_groups[1].id)
751
752         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
753         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
754
755
756 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
757     """
758     Test basic nova keypair functionality
759     """
760
761     def setUp(self):
762         """
763         Instantiates the CreateImage object that is responsible for downloading
764         and creating an OS image file within OpenStack
765         """
766         self.neutron = neutron_utils.neutron_client(self.os_creds)
767         self.floating_ip = None
768
769     def tearDown(self):
770         """
771         Cleans the image and downloaded image file
772         """
773         if self.floating_ip:
774             neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
775
776     def test_floating_ips(self):
777         """
778         Tests the creation of a floating IP
779         :return:
780         """
781         initial_fips = neutron_utils.get_floating_ips(self.neutron)
782
783         self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
784                                                             self.ext_net_name)
785         all_fips = neutron_utils.get_floating_ips(self.neutron)
786         self.assertEqual(len(initial_fips) + 1, len(all_fips))
787         returned = neutron_utils.get_floating_ip(self.neutron,
788                                                  self.floating_ip)
789         self.assertEqual(self.floating_ip.id, returned.id)
790         self.assertEqual(self.floating_ip.ip, returned.ip)
791
792
793 """
794 Validation routines
795 """
796
797
798 def validate_network(neutron, name, exists):
799     """
800     Returns true if a network for a given name DOES NOT exist if the exists
801     parameter is false conversely true. Returns false if a network for a given
802     name DOES exist if the exists parameter is true conversely false.
803     :param neutron: The neutron client
804     :param name: The expected network name
805     :param exists: Whether or not the network name should exist or not
806     :return: True/False
807     """
808     network = neutron_utils.get_network(neutron, name)
809     if exists and network:
810         return True
811     if not exists and not network:
812         return True
813     return False
814
815
816 def validate_subnet(neutron, name, cidr, exists):
817     """
818     Returns true if a subnet for a given name DOES NOT exist if the exists
819     parameter is false conversely true. Returns false if a subnet for a given
820     name DOES exist if the exists parameter is true conversely false.
821     :param neutron: The neutron client
822     :param name: The expected subnet name
823     :param cidr: The expected CIDR value
824     :param exists: Whether or not the network name should exist or not
825     :return: True/False
826     """
827     subnet = neutron_utils.get_subnet_by_name(neutron, name)
828     if exists and subnet:
829         return subnet.cidr == cidr
830     if not exists and not subnet:
831         return True
832     return False
833
834
835 def validate_router(neutron, name, exists):
836     """
837     Returns true if a router for a given name DOES NOT exist if the exists
838     parameter is false conversely true. Returns false if a router for a given
839     name DOES exist if the exists parameter is true conversely false.
840     :param neutron: The neutron client
841     :param name: The expected router name
842     :param exists: Whether or not the network name should exist or not
843     :return: True/False
844     """
845     router = neutron_utils.get_router_by_name(neutron, name)
846     if exists and router:
847         return True
848     return False
849
850
851 def validate_interface_router(interface_router, router, subnet):
852     """
853     Returns true if the router ID & subnet ID have been properly included into
854     the interface router object
855     :param interface_router: the SNAPS-OO InterfaceRouter domain object
856     :param router: to validate against the interface_router
857     :param subnet: to validate against the interface_router
858     :return: True if both IDs match else False
859     """
860     subnet_id = interface_router.subnet_id
861     router_id = interface_router.port_id
862
863     return subnet.id == subnet_id and router.id == router_id
864
865
866 def validate_port(neutron, port_obj, this_port_name):
867     """
868     Returns true if a port for a given name DOES NOT exist if the exists
869     parameter is false conversely true. Returns false if a port for a given
870     name DOES exist if the exists parameter is true conversely false.
871     :param neutron: The neutron client
872     :param port_obj: The port object to lookup
873     :param this_port_name: The expected router name
874     :return: True/False
875     """
876     os_ports = neutron.list_ports()
877     for os_port, os_port_insts in os_ports.items():
878         for os_inst in os_port_insts:
879             if os_inst['id'] == port_obj.id:
880                 return os_inst['name'] == this_port_name
881     return False