initial code repo
[stor4nfv.git] / src / ceph / qa / tasks / ceph_test_case.py
diff --git a/src/ceph/qa/tasks/ceph_test_case.py b/src/ceph/qa/tasks/ceph_test_case.py
new file mode 100644 (file)
index 0000000..5767df4
--- /dev/null
@@ -0,0 +1,150 @@
+
+import unittest
+import time
+import logging
+
+from teuthology.orchestra.run import CommandFailedError
+
+log = logging.getLogger(__name__)
+
+
+class CephTestCase(unittest.TestCase):
+    """
+    For test tasks that want to define a structured set of
+    tests implemented in python.  Subclass this with appropriate
+    helpers for the subsystem you're testing.
+    """
+
+    # Environment references
+    mounts = None
+    fs = None
+    recovery_fs = None
+    ceph_cluster = None
+    mds_cluster = None
+    mgr_cluster = None
+    ctx = None
+
+    mon_manager = None
+
+    def setUp(self):
+        self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
+            "Starting test {0}".format(self.id()))
+
+    def tearDown(self):
+        self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
+            "Ended test {0}".format(self.id()))
+
+    def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
+        """
+        Context manager.  Assert that during execution, or up to 5 seconds later,
+        the Ceph cluster log emits a message matching the expected pattern.
+
+        :param expected_pattern: a string that you expect to see in the log output
+        """
+
+        ceph_manager = self.ceph_cluster.mon_manager
+
+        class ContextManager(object):
+            def match(self):
+                found = expected_pattern in self.watcher_process.stdout.getvalue()
+                if invert_match:
+                    return not found
+
+                return found
+
+            def __enter__(self):
+                self.watcher_process = ceph_manager.run_ceph_w()
+
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                if not self.watcher_process.finished:
+                    # Check if we got an early match, wait a bit if we didn't
+                    if self.match():
+                        return
+                    else:
+                        log.debug("No log hits yet, waiting...")
+                        # Default monc tick interval is 10s, so wait that long and
+                        # then some grace
+                        time.sleep(5 + timeout)
+
+                self.watcher_process.stdin.close()
+                try:
+                    self.watcher_process.wait()
+                except CommandFailedError:
+                    pass
+
+                if not self.match():
+                    log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
+                    raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
+
+        return ContextManager()
+
+    def wait_for_health(self, pattern, timeout):
+        """
+        Wait until 'ceph health' contains messages matching the pattern
+        """
+        def seen_health_warning():
+            health = self.ceph_cluster.mon_manager.get_mon_health()
+            codes = [s for s in health['checks']]
+            summary_strings = [s[1]['summary']['message'] for s in health['checks'].iteritems()]
+            if len(summary_strings) == 0:
+                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
+                return False
+            else:
+                for ss in summary_strings:
+                    if pattern in ss:
+                         return True
+                if pattern in codes:
+                    return True
+
+            log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
+            return False
+
+        self.wait_until_true(seen_health_warning, timeout)
+
+    def wait_for_health_clear(self, timeout):
+        """
+        Wait until `ceph health` returns no messages
+        """
+        def is_clear():
+            health = self.ceph_cluster.mon_manager.get_mon_health()
+            return len(health['checks']) == 0
+
+        self.wait_until_true(is_clear, timeout)
+
+    def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
+        period = 5
+        elapsed = 0
+        while True:
+            val = get_fn()
+            if val == expect_val:
+                return
+            elif reject_fn and reject_fn(val):
+                raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
+            else:
+                if elapsed >= timeout:
+                    raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
+                        elapsed, expect_val, val
+                    ))
+                else:
+                    log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
+                time.sleep(period)
+                elapsed += period
+
+        log.debug("wait_until_equal: success")
+
+    def wait_until_true(self, condition, timeout):
+        period = 5
+        elapsed = 0
+        while True:
+            if condition():
+                log.debug("wait_until_true: success in {0}s".format(elapsed))
+                return
+            else:
+                if elapsed >= timeout:
+                    raise RuntimeError("Timed out after {0}s".format(elapsed))
+                else:
+                    log.debug("wait_until_true: waiting...")
+                time.sleep(period)
+                elapsed += period
+
+