Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / ceph_test_case.py
1
2 import unittest
3 import time
4 import logging
5
6 from teuthology.orchestra.run import CommandFailedError
7
8 log = logging.getLogger(__name__)
9
10
11 class CephTestCase(unittest.TestCase):
12     """
13     For test tasks that want to define a structured set of
14     tests implemented in python.  Subclass this with appropriate
15     helpers for the subsystem you're testing.
16     """
17
18     # Environment references
19     mounts = None
20     fs = None
21     recovery_fs = None
22     ceph_cluster = None
23     mds_cluster = None
24     mgr_cluster = None
25     ctx = None
26
27     mon_manager = None
28
29     def setUp(self):
30         self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
31             "Starting test {0}".format(self.id()))
32
33     def tearDown(self):
34         self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
35             "Ended test {0}".format(self.id()))
36
37     def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
38         """
39         Context manager.  Assert that during execution, or up to 5 seconds later,
40         the Ceph cluster log emits a message matching the expected pattern.
41
42         :param expected_pattern: a string that you expect to see in the log output
43         """
44
45         ceph_manager = self.ceph_cluster.mon_manager
46
47         class ContextManager(object):
48             def match(self):
49                 found = expected_pattern in self.watcher_process.stdout.getvalue()
50                 if invert_match:
51                     return not found
52
53                 return found
54
55             def __enter__(self):
56                 self.watcher_process = ceph_manager.run_ceph_w()
57
58             def __exit__(self, exc_type, exc_val, exc_tb):
59                 if not self.watcher_process.finished:
60                     # Check if we got an early match, wait a bit if we didn't
61                     if self.match():
62                         return
63                     else:
64                         log.debug("No log hits yet, waiting...")
65                         # Default monc tick interval is 10s, so wait that long and
66                         # then some grace
67                         time.sleep(5 + timeout)
68
69                 self.watcher_process.stdin.close()
70                 try:
71                     self.watcher_process.wait()
72                 except CommandFailedError:
73                     pass
74
75                 if not self.match():
76                     log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
77                     raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
78
79         return ContextManager()
80
81     def wait_for_health(self, pattern, timeout):
82         """
83         Wait until 'ceph health' contains messages matching the pattern
84         """
85         def seen_health_warning():
86             health = self.ceph_cluster.mon_manager.get_mon_health()
87             codes = [s for s in health['checks']]
88             summary_strings = [s[1]['summary']['message'] for s in health['checks'].iteritems()]
89             if len(summary_strings) == 0:
90                 log.debug("Not expected number of summary strings ({0})".format(summary_strings))
91                 return False
92             else:
93                 for ss in summary_strings:
94                     if pattern in ss:
95                          return True
96                 if pattern in codes:
97                     return True
98
99             log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
100             return False
101
102         self.wait_until_true(seen_health_warning, timeout)
103
104     def wait_for_health_clear(self, timeout):
105         """
106         Wait until `ceph health` returns no messages
107         """
108         def is_clear():
109             health = self.ceph_cluster.mon_manager.get_mon_health()
110             return len(health['checks']) == 0
111
112         self.wait_until_true(is_clear, timeout)
113
114     def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
115         period = 5
116         elapsed = 0
117         while True:
118             val = get_fn()
119             if val == expect_val:
120                 return
121             elif reject_fn and reject_fn(val):
122                 raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
123             else:
124                 if elapsed >= timeout:
125                     raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
126                         elapsed, expect_val, val
127                     ))
128                 else:
129                     log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
130                 time.sleep(period)
131                 elapsed += period
132
133         log.debug("wait_until_equal: success")
134
135     def wait_until_true(self, condition, timeout):
136         period = 5
137         elapsed = 0
138         while True:
139             if condition():
140                 log.debug("wait_until_true: success in {0}s".format(elapsed))
141                 return
142             else:
143                 if elapsed >= timeout:
144                     raise RuntimeError("Timed out after {0}s".format(elapsed))
145                 else:
146                     log.debug("wait_until_true: waiting...")
147                 time.sleep(period)
148                 elapsed += period
149
150