6 from teuthology.orchestra.run import CommandFailedError
# Module-level logger, named after this module; the helpers below use it
# for their debug/error output.
8 log = logging.getLogger(__name__)
# unittest.TestCase subclass carrying shared helpers for Ceph test tasks.
# Every helper below reaches the cluster through
# self.ceph_cluster.mon_manager, so that attribute must be populated before
# any test method runs.
11 class CephTestCase(unittest.TestCase):
13 For test tasks that want to define a structured set of
14 tests implemented in python. Subclass this with appropriate
15 helpers for the subsystem you're testing.
18 # Environment references
# NOTE(review): the attribute declarations introduced by the heading above
# are not visible in this view - confirm which references are declared.

# Sends the cluster command `log "Starting test <test id>"` through the
# monitor manager, marking where this test begins.
# NOTE(review): presumably the body of setUp(); the enclosing def line is
# not visible in this view - confirm.
30 self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
31 "Starting test {0}".format(self.id()))
# Counterpart marker, `log "Ended test <test id>"`.
# NOTE(review): presumably the body of tearDown() - confirm.
34 self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
35 "Ended test {0}".format(self.id()))
# Returns a context manager that watches the cluster log while the
# with-body runs and asserts on exit that `expected_pattern` was seen.
#
# expected_pattern: plain string; matching is a substring test against the
#     captured stdout of the watcher process (not a regex).
# invert_match: NOTE(review): presumably flips the check so the pattern must
#     NOT appear; the statement consuming it is not visible in this view.
# timeout: seconds added to the fixed 5 s grace sleep on exit (default 10).
#
# Raises AssertionError naming the expected pattern; the full watcher output
# is logged at error level immediately before.
#
# NOTE(review): the docstring's "up to 5 seconds later" understates the wait;
# __exit__ sleeps 5 + timeout seconds, i.e. 15 s with the default timeout.
37 def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
39 Context manager. Assert that during execution, or up to 5 seconds later,
40 the Ceph cluster log emits a message matching the expected pattern.
42 :param expected_pattern: a string that you expect to see in the log output
# Captured once here so the nested class can use it as a closure variable
# rather than going through the test-case instance.
45 ceph_manager = self.ceph_cluster.mon_manager
47 class ContextManager(object):
# Substring test over everything the watcher has written to stdout so far.
49 found = expected_pattern in self.watcher_process.stdout.getvalue()
# The watcher is started on entry to the with-block and kept on the
# context-manager instance so __exit__ can inspect and stop it.
56 self.watcher_process = ceph_manager.run_ceph_w()
58 def __exit__(self, exc_type, exc_val, exc_tb):
# Only a still-running watcher is given extra time to produce a hit.
59 if not self.watcher_process.finished:
60 # Check if we got an early match, wait a bit if we didn't
64 log.debug("No log hits yet, waiting...")
65 # Default monc tick interval is 10s, so wait that long and
67 time.sleep(5 + timeout)
# NOTE(review): closing stdin is presumably what tells the watcher to stop;
# wait() then collects it.
69 self.watcher_process.stdin.close()
71 self.watcher_process.wait()
# NOTE(review): the handler body is not visible in this view; a failing
# exit status from the watcher appears to be tolerated - confirm.
72 except CommandFailedError:
# Dump the whole captured log first so a failure can be diagnosed from the
# test output alone.
76 log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
77 raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
79 return ContextManager()
def wait_for_health(self, pattern, timeout):
    """
    Wait until 'ceph health' contains messages matching the pattern

    :param pattern: string to look for in the monitor health report. It
                    matches when it is a substring of any check's summary
                    message, or equal to a health check code (a key of
                    ``health['checks']``).
    :param timeout: handed to ``wait_until_true`` unchanged.
    :raises RuntimeError: from ``wait_until_true`` when the pattern has not
                          shown up before the timeout expires.
    """
    def seen_health_warning():
        checks = self.ceph_cluster.mon_manager.get_mon_health()['checks']
        codes = list(checks)
        # values() instead of iteritems(): iteritems() only exists on
        # Python 2 and raises AttributeError on Python 3, whereas values()
        # behaves the same on both.
        summary_strings = [check['summary']['message'] for check in checks.values()]

        if not summary_strings:
            log.debug("Not expected number of summary strings ({0})".format(summary_strings))
            return False

        # NOTE(review): the match rules (substring of a summary message, or
        # exact health check code) are reconstructed from the surrounding
        # statements - confirm against callers.
        if any(pattern in ss for ss in summary_strings) or pattern in codes:
            return True

        log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
        return False

    self.wait_until_true(seen_health_warning, timeout)
def wait_for_health_clear(self, timeout):
    """
    Block until the monitor health report lists no checks at all.

    :param timeout: handed to ``wait_until_true`` unchanged.
    """
    def is_clear():
        # Fetch a fresh report on every poll; the cluster is clear once the
        # 'checks' collection is empty.
        checks = self.ceph_cluster.mon_manager.get_mon_health()['checks']
        return not len(checks)

    self.wait_until_true(is_clear, timeout)
# Waits for an observed value to become equal to expect_val.
#
# get_fn: NOTE(review): presumably a zero-argument callable whose result is
#     the `val` compared below; the call itself is not visible in this view.
# expect_val: target value, compared with `==` against each observed value.
# timeout: time budget in seconds, compared against `elapsed`.
# reject_fn: optional predicate; a truthy result for an observed value aborts
#     the wait at once instead of waiting for the timeout.
#
# Raises RuntimeError when reject_fn flags a value, and when
# elapsed >= timeout without a match (the message reports the elapsed time,
# the expected value and the current value).
#
# NOTE(review): the polling loop, the pause between polls and the update of
# `elapsed` are not visible in this view - confirm the poll interval there.
114 def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
119 if val == expect_val:
# A forbidden value fails fast; it is checked only after the equality test,
# so a value that is both expected and rejected counts as success.
121 elif reject_fn and reject_fn(val):
122 raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
124 if elapsed >= timeout:
125 raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
126 elapsed, expect_val, val
129 log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
133 log.debug("wait_until_equal: success")
# Waits for `condition` to hold; used by wait_for_health and
# wait_for_health_clear, which both pass a zero-argument closure.
#
# condition: callable polled for success.
#     NOTE(review): the call site is not visible in this view; presumably it
#     is invoked with no arguments and a truthy result ends the wait.
# timeout: time budget, compared against `elapsed` (logged with an "s" suffix).
#
# Raises RuntimeError("Timed out after <elapsed>s") once elapsed >= timeout
# without success.
#
# NOTE(review): the polling loop, the pause between polls and the update of
# `elapsed` are not visible in this view - confirm the poll interval there.
135 def wait_until_true(self, condition, timeout):
140 log.debug("wait_until_true: success in {0}s".format(elapsed))
143 if elapsed >= timeout:
144 raise RuntimeError("Timed out after {0}s".format(elapsed))
146 log.debug("wait_until_true: waiting...")