initial code repo
[stor4nfv.git] / src / ceph / qa / tasks / cephfs_test_runner.py
diff --git a/src/ceph/qa/tasks/cephfs_test_runner.py b/src/ceph/qa/tasks/cephfs_test_runner.py
new file mode 100644 (file)
index 0000000..d57e85d
--- /dev/null
@@ -0,0 +1,209 @@
+import contextlib
+import logging
+import os
+import unittest
+from unittest import suite, loader, case
+from teuthology.task import interactive
+from teuthology import misc
+from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
+from tasks.mgr.mgr_test_case import MgrCluster
+
+log = logging.getLogger(__name__)
+
+
+class DecoratingLoader(loader.TestLoader):
+    """
+    A specialization of TestLoader that tags some extra attributes
+    onto test classes as they are loaded.
+    """
+    def __init__(self, params):
+        self._params = params
+        super(DecoratingLoader, self).__init__()
+
+    def _apply_params(self, obj):
+        for k, v in self._params.items():
+            setattr(obj, k, v)
+
+    def loadTestsFromTestCase(self, testCaseClass):
+        self._apply_params(testCaseClass)
+        return super(DecoratingLoader, self).loadTestsFromTestCase(testCaseClass)
+
+    def loadTestsFromName(self, name, module=None):
+        result = super(DecoratingLoader, self).loadTestsFromName(name, module)
+
+        # Special case for when we were called with the name of a method, we get
+        # a suite with one TestCase
+        tests_in_result = list(result)
+        if len(tests_in_result) == 1 and isinstance(tests_in_result[0], case.TestCase):
+            self._apply_params(tests_in_result[0])
+
+        return result
+
+
+class LogStream(object):
+    def __init__(self):
+        self.buffer = ""
+
+    def write(self, data):
+        self.buffer += data
+        if "\n" in self.buffer:
+            lines = self.buffer.split("\n")
+            for line in lines[:-1]:
+                log.info(line)
+            self.buffer = lines[-1]
+
+    def flush(self):
+        pass
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+    """
+    Specialization that implements interactive-on-error style
+    behavior.
+    """
+    ctx = None
+
+    def addFailure(self, test, err):
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Failure in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=self.ctx, config=None)
+
+    def addError(self, test, err):
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Error in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=self.ctx, config=None)
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Run the CephFS test cases.
+
+    Run everything in tasks/cephfs/test_*.py:
+
+    ::
+
+        tasks:
+          - install:
+          - ceph:
+          - ceph-fuse:
+          - cephfs_test_runner:
+
+    `modules` argument allows running only some specific modules:
+
+    ::
+
+        tasks:
+            ...
+          - cephfs_test_runner:
+              modules:
+                - tasks.cephfs.test_sessionmap
+                - tasks.cephfs.test_auto_repair
+
+    By default, any cases that can't be run on the current cluster configuration
+    will generate a failure.  When the optional `fail_on_skip` argument is set
+    to false, any tests that can't be run on the current configuration will
+    simply be skipped:
+
+    ::
+        tasks:
+            ...
+         - cephfs_test_runner:
+           fail_on_skip: false
+
+    """
+
+    ceph_cluster = CephCluster(ctx)
+
+    if len(list(misc.all_roles_of_type(ctx.cluster, 'mds'))):
+        mds_cluster = MDSCluster(ctx)
+        fs = Filesystem(ctx)
+    else:
+        mds_cluster = None
+        fs = None
+
+    if len(list(misc.all_roles_of_type(ctx.cluster, 'mgr'))):
+        mgr_cluster = MgrCluster(ctx)
+    else:
+        mgr_cluster = None
+
+    # Mount objects, sorted by ID
+    if hasattr(ctx, 'mounts'):
+        mounts = [v for k, v in sorted(ctx.mounts.items(), lambda a, b: cmp(a[0], b[0]))]
+    else:
+        # The test configuration has a filesystem but no fuse/kclient mounts
+        mounts = []
+
+    decorating_loader = DecoratingLoader({
+        "ctx": ctx,
+        "mounts": mounts,
+        "fs": fs,
+        "ceph_cluster": ceph_cluster,
+        "mds_cluster": mds_cluster,
+        "mgr_cluster": mgr_cluster,
+    })
+
+    fail_on_skip = config.get('fail_on_skip', True)
+
+    # Put useful things onto ctx for interactive debugging
+    ctx.fs = fs
+    ctx.mds_cluster = mds_cluster
+    ctx.mgr_cluster = mgr_cluster
+
+    # Depending on config, either load specific modules, or scan for moduless
+    if config and 'modules' in config and config['modules']:
+        module_suites = []
+        for mod_name in config['modules']:
+            # Test names like cephfs.test_auto_repair
+            module_suites.append(decorating_loader.loadTestsFromName(mod_name))
+        overall_suite = suite.TestSuite(module_suites)
+    else:
+        # Default, run all tests
+        overall_suite = decorating_loader.discover(
+            os.path.join(
+                os.path.dirname(os.path.abspath(__file__)),
+                "cephfs/"
+            )
+        )
+
+    if ctx.config.get("interactive-on-error", False):
+        InteractiveFailureResult.ctx = ctx
+        result_class = InteractiveFailureResult
+    else:
+        result_class = unittest.TextTestResult
+
+    class LoggingResult(result_class):
+        def startTest(self, test):
+            log.info("Starting test: {0}".format(self.getDescription(test)))
+            return super(LoggingResult, self).startTest(test)
+
+        def addSkip(self, test, reason):
+            if fail_on_skip:
+                # Don't just call addFailure because that requires a traceback
+                self.failures.append((test, reason))
+            else:
+                super(LoggingResult, self).addSkip(test, reason)
+
+    # Execute!
+    result = unittest.TextTestRunner(
+        stream=LogStream(),
+        resultclass=LoggingResult,
+        verbosity=2,
+        failfast=True).run(overall_suite)
+
+    if not result.wasSuccessful():
+        result.printErrors()  # duplicate output at end for convenience
+
+        bad_tests = []
+        for test, error in result.errors:
+            bad_tests.append(str(test))
+        for test, failure in result.failures:
+            bad_tests.append(str(test))
+
+        raise RuntimeError("Test failure: {0}".format(", ".join(bad_tests)))
+
+    yield