Add unit tests for LogManager and fix some bugs. 73/973/1
authorasteroide <thomas.duval@orange.com>
Tue, 7 Jul 2015 12:11:51 +0000 (14:11 +0200)
committerasteroide <thomas.duval@orange.com>
Tue, 7 Jul 2015 12:11:51 +0000 (14:11 +0200)
Change-Id: I42dfbdf8f83928e6437dba7026d1a2905d46ab44

keystone-moon/keystone/contrib/moon/backends/flat.py
keystone-moon/keystone/contrib/moon/core.py
keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py

index 6d18d3e..83ed2a0 100644 (file)
@@ -20,84 +20,75 @@ CONF = config.CONF
 class LogConnector(LogDriver):
 
     AUTHZ_FILE = '/var/log/moon/authz.log'
+    SYS_FILE = '/var/log/moon/system.log'
     TIME_FORMAT = '%Y-%m-%d-%H:%M:%S'
 
     def __init__(self):
         # Fixme (dthom): when logging from an other class, the %appname% in the event
         # is always keystone.contrib.moon.backends.flat
         super(LogConnector, self).__init__()
-        # Configure Log to add new files in /var/log/moon/authz.log and /var/log/moon/system.log
-        self.LOG = log.getLogger(__name__)
+
+        self.SYS_LOG = logging.getLogger(__name__)
+        if not len(self.SYS_LOG.handlers):
+            fh = logging.FileHandler(self.SYS_FILE)
+            fh.setLevel(logging.DEBUG)
+            formatter = logging.Formatter('%(asctime)s ------ %(message)s', self.TIME_FORMAT)
+            fh.setFormatter(formatter)
+            self.SYS_LOG.addHandler(fh)
+
         self.AUTHZ_LOG = logging.getLogger("authz")
-        self.AUTHZ_LOG.setLevel(logging.WARNING)
-        fh = logging.FileHandler(self.AUTHZ_FILE)
-        fh.setLevel(logging.WARNING)
-        formatter = logging.Formatter('%(asctime)s ------ %(message)s', self.TIME_FORMAT)
-        fh.setFormatter(formatter)
-        self.AUTHZ_LOG.addHandler(fh)
+        if not len(self.AUTHZ_LOG.handlers):
+            fh = logging.FileHandler(self.AUTHZ_FILE)
+            fh.setLevel(logging.WARNING)
+            formatter = logging.Formatter('%(asctime)s ------ %(message)s', self.TIME_FORMAT)
+            fh.setFormatter(formatter)
+            self.AUTHZ_LOG.addHandler(fh)
 
     def authz(self, message):
         self.AUTHZ_LOG.warn(message)
 
     def debug(self, message):
-        self.LOG.debug(message)
+        self.SYS_LOG.debug(message)
 
     def info(self, message):
-        self.LOG.info(message)
+        self.SYS_LOG.info(message)
 
     def warning(self, message):
-        self.LOG.warning(message)
+        self.SYS_LOG.warning(message)
 
     def error(self, message):
-        self.LOG.error(message)
+        self.SYS_LOG.error(message)
 
     def critical(self, message):
-        self.LOG.critical(message)
-
-    def get_logs(self, options):
-        options = options.split(",")
-        self.info("Options of logs check : {}".format(options))
-        event_number = None
-        time_from = None
-        time_to = None
-        filter_str = None
-        for opt in options:
-            if "event_number" in opt:
-                event_number = "".join(re.findall("\d*", opt.split("=")[-1]))
-                try:
-                    event_number = int(event_number)
-                except ValueError:
-                    event_number = None
-            elif "from" in opt:
-                time_from = "".join(re.findall("[\w\-:]*", opt.split("=")[-1]))
-                try:
-                    time_from = time.strptime(time_from, self.TIME_FORMAT)
-                except ValueError:
-                    time_from = None
-            elif "to" in opt:
-                time_to = "".join(re.findall("[\w\-:] *", opt.split("=")[-1]))
-                try:
-                    time_to = time.strptime(time_to, self.TIME_FORMAT)
-                except ValueError:
-                    time_to = None
-            elif "filter" in opt:
-                filter_str = "".join(re.findall("\w*", opt.split("=")[-1]))
-        _logs = open(self.AUTHZ_FILE).readlines()
+        self.SYS_LOG.critical(message)
+
+    def get_logs(self, logger="authz", event_number=None, time_from=None, time_to=None, filter_str=None):
+        if logger == "authz":
+            _logs = open(self.AUTHZ_FILE).readlines()
+        else:
+            _logs = open(self.SYS_FILE).readlines()
         if filter_str:
             _logs = filter(lambda x: filter_str in x, _logs)
-        self.info("Options of logs check : {} {} {} {}".format(event_number, time_from, time_to, filter_str))
         if time_from:
+            time_from = time.strptime(time_from.split(" ")[0], self.TIME_FORMAT)
             try:
+                __logs = []
                 for log in _logs:
-                    __logs = filter(lambda x: time_from <= time.strptime(x.split(" ")[0], self.TIME_FORMAT), _logs)
-                    _logs = __logs
+                    _log = time.strptime(log.split(" ")[0], self.TIME_FORMAT)
+                    if time_from <= _log:
+                        __logs.append(log)
+                _logs = __logs
             except ValueError:
                 self.error("Time format error")
         if time_to:
             try:
+                time_to = time.strptime(time_to.split(" ")[0], self.TIME_FORMAT)
+                __logs = []
                 for log in _logs:
-                    __logs = filter(lambda x: time_to >= time.strptime(x.split(" ")[0], self.TIME_FORMAT), _logs)
-                    _logs = __logs
+                    _log = time.strptime(log.split(" ")[0], self.TIME_FORMAT)
+                    if time_to >= _log:
+                        __logs.append(log)
+                _logs = __logs
             except ValueError:
                 self.error("Time format error")
         if event_number:
@@ -120,4 +111,4 @@ class LogConnector(LogDriver):
 #
 #     def admin(self, sub, obj, act):
 #         # return self.__super_extension.authz(sub, obj, act)
-#         return True
\ No newline at end of file
+#         return True
index 69e8585..4d19e7c 100644 (file)
@@ -9,6 +9,7 @@ import json
 import copy
 import re
 import six
+import time
 
 from keystone.common import manager
 from keystone import config
@@ -114,8 +115,54 @@ class LogManager(manager.Manager):
         driver = CONF.moon.log_driver
         super(LogManager, self).__init__(driver)
 
-    def get_logs(self, options):
-        return self.driver.get_logs(options)
+    def get_logs(self, logger="authz", options="", event_number=None, time_from=None, time_to=None, filter_str=None):
+
+        if len(options) > 0:
+            options = options.split(",")
+            event_number = None
+            time_from = None
+            time_to = None
+            filter_str = None
+            for opt in options:
+                if "event_number" in opt:
+                    event_number = "".join(re.findall("\d*", opt.split("=")[-1]))
+                    try:
+                        event_number = int(event_number)
+                    except ValueError:
+                        event_number = None
+                elif "from" in opt:
+                    time_from = "".join(re.findall("[\w\-:]*", opt.split("=")[-1]))
+                    try:
+                        time_from = time.strptime(time_from, self.TIME_FORMAT)
+                    except ValueError:
+                        time_from = None
+                elif "to" in opt:
+                    time_to = "".join(re.findall("[\w\-:] *", opt.split("=")[-1]))
+                    try:
+                        time_to = time.strptime(time_to, self.TIME_FORMAT)
+                    except ValueError:
+                        time_to = None
+                elif "filter" in opt:
+                    filter_str = "".join(re.findall("\w*", opt.split("=")[-1]))
+        return self.driver.get_logs(logger, event_number, time_from, time_to, filter_str)
+
+    def get_authz_logs(self, options="", event_number=None, time_from=None, time_to=None, filter_str=None):
+        return self.get_logs(
+            logger="authz",
+            options="",
+            event_number=None,
+            time_from=None,
+            time_to=None,
+            filter_str=None)
+
+    def get_sys_logs(self, options="", event_number=None, time_from=None, time_to=None, filter_str=None):
+        return self.get_logs(
+            logger="sys",
+            options="",
+            event_number=None,
+            time_from=None,
+            time_to=None,
+            filter_str=None)
 
     def authz(self, message):
         return self.driver.authz(message)
index 1b678d5..49f6812 100644 (file)
@@ -2,3 +2,170 @@
 # This software is distributed under the terms and conditions of the 'Apache-2.0'
 # license which can be found in the file 'LICENSE' in this package distribution
 # or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+"""Unit tests for LogManager"""
+
+import json
+import os
+import uuid
+import time
+from oslo_config import cfg
+from keystone.tests import unit as tests
+from keystone.contrib.moon.core import IntraExtensionAdminManager
+from keystone.tests.unit.ksfixtures import database
+from keystone import resource
+from keystone.contrib.moon.exception import *
+from keystone.tests.unit import default_fixtures
+from keystone.contrib.moon.core import LogManager, TenantManager
+
+CONF = cfg.CONF
+
+USER_ADMIN = {
+    'name': 'admin',
+    'domain_id': "default",
+    'password': 'admin'
+}
+
+IE = {
+    "name": "test IE",
+    "policymodel": "policy_rbac_authz",
+    "description": "a simple description."
+}
+
+TIME_FORMAT = '%Y-%m-%d-%H:%M:%S'
+
+class TestIntraExtensionAdminManager(tests.TestCase):
+
+    def setUp(self):
+        self.useFixture(database.Database())
+        super(TestIntraExtensionAdminManager, self).setUp()
+        self.load_backends()
+        self.load_fixtures(default_fixtures)
+        self.manager = IntraExtensionAdminManager()
+
+    def __get_key_from_value(self, value, values_dict):
+        return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0]
+
+    def load_extra_backends(self):
+        return {
+            "moonlog_api": LogManager(),
+            "tenant_api": TenantManager(),
+            # "resource_api": resource.Manager(),
+        }
+
+    def config_overrides(self):
+        super(TestIntraExtensionAdminManager, self).config_overrides()
+        self.policy_directory = 'examples/moon/policies'
+        self.config_fixture.config(
+            group='moon',
+            intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector')
+        self.config_fixture.config(
+            group='moon',
+            policy_directory=self.policy_directory)
+
+    def create_intra_extension(self, policy_model="policy_rbac_admin"):
+        # Create the admin user because IntraExtension needs it
+        self.admin = self.identity_api.create_user(USER_ADMIN)
+        IE["policymodel"] = policy_model
+        self.ref = self.manager.load_intra_extension(IE)
+        self.assertIsInstance(self.ref, dict)
+        self.create_tenant(self.ref["id"])
+
+    def create_tenant(self, authz_uuid):
+        tenant = {
+            "id": uuid.uuid4().hex,
+            "name": "TestAuthzIntraExtensionManager",
+            "enabled": True,
+            "description": "",
+            "domain_id": "default"
+        }
+        project = self.resource_api.create_project(tenant["id"], tenant)
+        mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None)
+        self.assertIsInstance(mapping, dict)
+        self.assertIn("authz", mapping)
+        self.assertEqual(mapping["authz"], authz_uuid)
+        return mapping
+
+    def create_user(self, username="TestAdminIntraExtensionManagerUser"):
+        user = {
+            "id": uuid.uuid4().hex,
+            "name": username,
+            "enabled": True,
+            "description": "",
+            "domain_id": "default"
+        }
+        _user = self.identity_api.create_user(user)
+        return _user
+
+    def delete_admin_intra_extension(self):
+        self.manager.delete_intra_extension(self.ref["id"])
+
+    def send_logs(self):
+        log_authz = "Test for authz " + uuid.uuid4().hex
+        logs = []
+        self.moonlog_api.authz(log_authz)
+        logs.append("Test for critical " + uuid.uuid4().hex)
+        self.moonlog_api.critical(logs[-1])
+        logs.append("Test for error " + uuid.uuid4().hex)
+        self.moonlog_api.error(logs[-1])
+        logs.append("Test for warning " + uuid.uuid4().hex)
+        self.moonlog_api.warning(logs[-1])
+        logs.append("Test for info " + uuid.uuid4().hex)
+        self.moonlog_api.info(logs[-1])
+        logs.append("Test for debug " + uuid.uuid4().hex)
+        self.moonlog_api.debug(logs[-1])
+        return log_authz, logs
+
+    def test_get_set_logs(self):
+        previous_authz_logs = self.moonlog_api.get_logs(logger="authz")
+        previous_sys_logs = self.moonlog_api.get_logs(logger="sys")
+
+        log_authz, logs = self.send_logs()
+        time.sleep(1)
+
+        authz_logs = self.moonlog_api.get_logs(logger="authz")
+        sys_logs = self.moonlog_api.get_logs(logger="sys")
+
+        self.assertIsInstance(authz_logs, list)
+        self.assertIsInstance(sys_logs, list)
+
+        self.assertIn(log_authz, " ".join(authz_logs))
+
+        self.assertEqual(len(authz_logs), len(previous_authz_logs)+1)
+        self.assertTrue(len(sys_logs) >= len(previous_sys_logs)+5)
+        for log in logs:
+            self.assertIn(log, " ".join(sys_logs))
+
+    def test_get_syslogger_with_options(self):
+
+        all_logs = self.moonlog_api.get_logs(logger="sys")
+
+        time_1 = time.strftime(TIME_FORMAT)
+        time.sleep(1)
+
+        log_authz, logs = self.send_logs()
+
+        NUMBER_OF_LOG = 5
+        sys_logs = self.moonlog_api.get_logs(logger="sys", event_number=NUMBER_OF_LOG)
+        self.assertIsInstance(sys_logs, list)
+        self.assertEqual(len(sys_logs), NUMBER_OF_LOG)
+
+        sys_logs = self.moonlog_api.get_logs(logger="sys", time_from=time_1)
+        self.assertIsInstance(sys_logs, list)
+        self.assertEqual(len(sys_logs), NUMBER_OF_LOG)
+
+        log_authz, logs = self.send_logs()
+
+        time.sleep(1)
+        time_2 = time.strftime(TIME_FORMAT)
+
+        log_authz, logs = self.send_logs()
+
+        sys_logs = self.moonlog_api.get_logs(logger="sys", time_to=time_2)
+        self.assertIsInstance(sys_logs, list)
+        self.assertEqual(len(sys_logs), len(all_logs)+3*NUMBER_OF_LOG)
+
+        sys_logs = self.moonlog_api.get_logs(logger="sys", time_from=time_1, time_to=time_2)
+        self.assertIsInstance(sys_logs, list)
+        self.assertEqual(len(sys_logs), 3*NUMBER_OF_LOG)
+