Check a rule existence for a specific security group 27/32527/1
authortomsou <soth@intracom-telecom.com>
Mon, 27 Mar 2017 15:56:02 +0000 (15:56 +0000)
committerJose Lausuch <jose.lausuch@ericsson.com>
Thu, 30 Mar 2017 14:51:05 +0000 (14:51 +0000)
    Implement a functionality to check if a rule concerning
    a specific security group exists
    - function get_security_group_rules(neutron_client, sg_id)
      returns the list of the security rules for a specific security group
    - function check_security_group_rules(neutron_client,
                                          sg_id, direction,
                                          protocol,
                                          port_min=None,
                                          port_max=None)
      checks if a specific rule for a specific security group exists
      and returns True or False
    - implement unit tests for the two new functions

     This new functionality is needed for sdnvpn project

Change-Id: Ib930bc9a76141932f4164d88e2640b49f3df4d77
Signed-off-by: tomsou <soth@intracom-telecom.com>
(cherry picked from commit 41b103d9a6804a97ca85e2b09b628cea06219faf)

functest/tests/unit/utils/test_openstack_utils.py
functest/utils/openstack_utils.py

index 673ad5e..7f3995d 100644 (file)
@@ -229,6 +229,12 @@ class OSUtilsTesting(unittest.TestCase):
         self.sec_group = {'id': 'sec_group_id',
                           'name': 'test_sec_group'}
 
+        self.sec_group_rule = {'id': 'sec_group_rule_id',
+                               'direction': 'direction',
+                               'protocol': 'protocol',
+                               'port_range_max': 'port_max',
+                               'security_group_id': self.sec_group['id'],
+                               'port_range_min': 'port_min'}
         self.neutron_floatingip = {'id': 'fip_id',
                                    'floating_ip_address': 'test_ip'}
         self.neutron_client = mock.Mock()
@@ -260,6 +266,9 @@ class OSUtilsTesting(unittest.TestCase):
                  'show_bgpvpn.return_value': self.mock_return,
                  'list_security_groups.return_value': {'security_groups':
                                                        [self.sec_group]},
+                 'list_security_group_rules.'
+                 'return_value': {'security_group_rules':
+                                  [self.sec_group_rule]},
                  'create_security_group_rule.return_value': mock.Mock(),
                  'create_security_group.return_value': {'security_group':
                                                         self.sec_group},
@@ -1247,6 +1256,52 @@ class OSUtilsTesting(unittest.TestCase):
                                                    'test_sec_group'),
                              'sec_group_id')
 
+    def test_get_security_group_rules_default(self):
+        self.assertEqual(openstack_utils.
+                         get_security_group_rules(self.neutron_client,
+                                                  self.sec_group['id']),
+                         [self.sec_group_rule])
+
+    @mock.patch('functest.utils.openstack_utils.logger.error')
+    def test_get_security_group_rules_exception(self, mock_logger_error):
+        self.assertEqual(openstack_utils.
+                         get_security_group_rules(Exception,
+                                                  'sec_group_id'),
+                         None)
+        self.assertTrue(mock_logger_error.called)
+
+    def test_check_security_group_rules_not_exists(self):
+        self.assertEqual(openstack_utils.
+                         check_security_group_rules(self.neutron_client,
+                                                    'sec_group_id_2',
+                                                    'direction',
+                                                    'protocol',
+                                                    'port_min',
+                                                    'port_max'),
+                         True)
+
+    def test_check_security_group_rules_exists(self):
+        self.assertEqual(openstack_utils.
+                         check_security_group_rules(self.neutron_client,
+                                                    self.sec_group['id'],
+                                                    'direction',
+                                                    'protocol',
+                                                    'port_min',
+                                                    'port_max'),
+                         False)
+
+    @mock.patch('functest.utils.openstack_utils.logger.error')
+    def test_check_security_group_rules_exception(self, mock_logger_error):
+        self.assertEqual(openstack_utils.
+                         check_security_group_rules(Exception,
+                                                    'sec_group_id',
+                                                    'direction',
+                                                    'protocol',
+                                                    'port_max',
+                                                    'port_min'),
+                         None)
+        self.assertTrue(mock_logger_error.called)
+
     def test_create_security_group_default(self):
         self.assertEqual(openstack_utils.
                          create_security_group(self.neutron_client,
index ffc870f..4663f7b 100755 (executable)
@@ -1054,6 +1054,40 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
         return False
 
 
+def get_security_group_rules(neutron_client, sg_id):
+    try:
+        security_rules = neutron_client.list_security_group_rules()[
+            'security_group_rules']
+        security_rules = [rule for rule in security_rules
+                          if rule["security_group_id"] == sg_id]
+        return security_rules
+    except Exception, e:
+        logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
+                     " %s" % e)
+        return None
+
+
+def check_security_group_rules(neutron_client, sg_id, direction, protocol,
+                               port_min=None, port_max=None):
+    try:
+        security_rules = get_security_group_rules(neutron_client, sg_id)
+        security_rules = [rule for rule in security_rules
+                          if (rule["direction"].lower() == direction
+                              and rule["protocol"].lower() == protocol
+                              and rule["port_range_min"] == port_min
+                              and rule["port_range_max"] == port_max)]
+        if len(security_rules) == 0:
+            return True
+        else:
+            return False
+    except Exception, e:
+        logger.error("Error [check_security_group_rules("
+                     " neutron_client, sg_id, direction,"
+                     " protocol, port_min=None, port_max=None)]: "
+                     "%s" % e)
+        return None
+
+
 def create_security_group_full(neutron_client,
                                sg_name, sg_description):
     sg_id = get_security_group_id(neutron_client, sg_name)