Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / cephfs / cephfs_test_case.py
1 import json
2 import logging
3 from unittest import case
4 from tasks.ceph_test_case import CephTestCase
5 import os
6 import re
7 from StringIO import StringIO
8
9 from tasks.cephfs.fuse_mount import FuseMount
10
11 from teuthology.orchestra import run
12 from teuthology.orchestra.run import CommandFailedError
13
14
15 log = logging.getLogger(__name__)
16
17
18 def for_teuthology(f):
19     """
20     Decorator that adds an "is_for_teuthology" attribute to the wrapped function
21     """
22     f.is_for_teuthology = True
23     return f
24
25
26 def needs_trimming(f):
27     """
28     Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
29     this means it needs to be able to run as root, currently)
30     """
31     f.needs_trimming = True
32     return f
33
34
35 class CephFSTestCase(CephTestCase):
36     """
37     Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
38     into the fs, mount_a, mount_b class attributes (setting mount_b is optional)
39
40     Handles resetting the cluster under test between tests.
41     """
42
43     # FIXME weird explicit naming
44     mount_a = None
45     mount_b = None
46     recovery_mount = None
47
48     # Declarative test requirements: subclasses should override these to indicate
49     # their special needs.  If not met, tests will be skipped.
50     CLIENTS_REQUIRED = 1
51     MDSS_REQUIRED = 1
52     REQUIRE_KCLIENT_REMOTE = False
53     REQUIRE_ONE_CLIENT_REMOTE = False
54     REQUIRE_MEMSTORE = False
55
56     # Whether to create the default filesystem during setUp
57     REQUIRE_FILESYSTEM = True
58
59     # requires REQUIRE_FILESYSTEM = True
60     REQUIRE_RECOVERY_FILESYSTEM = False
61
62     LOAD_SETTINGS = []
63
64     def setUp(self):
65         super(CephFSTestCase, self).setUp()
66
67         if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
68             raise case.SkipTest("Only have {0} MDSs, require {1}".format(
69                 len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
70             ))
71
72         if len(self.mounts) < self.CLIENTS_REQUIRED:
73             raise case.SkipTest("Only have {0} clients, require {1}".format(
74                 len(self.mounts), self.CLIENTS_REQUIRED
75             ))
76
77         if self.REQUIRE_KCLIENT_REMOTE:
78             if not isinstance(self.mounts[0], FuseMount) or not isinstance(self.mounts[1], FuseMount):
79                 # kclient kill() power cycles nodes, so requires clients to each be on
80                 # their own node
81                 if self.mounts[0].client_remote.hostname == self.mounts[1].client_remote.hostname:
82                     raise case.SkipTest("kclient clients must be on separate nodes")
83
84         if self.REQUIRE_ONE_CLIENT_REMOTE:
85             if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
86                 raise case.SkipTest("Require first client to be on separate server from MDSs")
87
88         if self.REQUIRE_MEMSTORE:
89             objectstore = self.mds_cluster.get_config("osd_objectstore", "osd")
90             if objectstore != "memstore":
91                 # You certainly *could* run this on a real OSD, but you don't want to sit
92                 # here for hours waiting for the test to fill up a 1TB drive!
93                 raise case.SkipTest("Require `memstore` OSD backend to simulate full drives")
94
95         # Create friendly mount_a, mount_b attrs
96         for i in range(0, self.CLIENTS_REQUIRED):
97             setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])
98
99         self.mds_cluster.clear_firewall()
100
101         # Unmount all clients, we are about to blow away the filesystem
102         for mount in self.mounts:
103             if mount.is_mounted():
104                 mount.umount_wait(force=True)
105
106         # To avoid any issues with e.g. unlink bugs, we destroy and recreate
107         # the filesystem rather than just doing a rm -rf of files
108         self.mds_cluster.mds_stop()
109         self.mds_cluster.mds_fail()
110         self.mds_cluster.delete_all_filesystems()
111         self.fs = None # is now invalid!
112         self.recovery_fs = None
113
114         # In case the previous filesystem had filled up the RADOS cluster, wait for that
115         # flag to pass.
116         osd_mon_report_interval_max = int(self.mds_cluster.get_config("osd_mon_report_interval_max", service_type='osd'))
117         self.wait_until_true(lambda: not self.mds_cluster.is_full(),
118                              timeout=osd_mon_report_interval_max * 5)
119
120         # In case anything is in the OSD blacklist list, clear it out.  This is to avoid
121         # the OSD map changing in the background (due to blacklist expiry) while tests run.
122         try:
123             self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
124         except CommandFailedError:
125             # Fallback for older Ceph cluster
126             blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
127                                   "dump", "--format=json-pretty"))['blacklist']
128             log.info("Removing {0} blacklist entries".format(len(blacklist)))
129             for addr, blacklisted_at in blacklist.items():
130                 self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
131
132         client_mount_ids = [m.client_id for m in self.mounts]
133         # In case the test changes the IDs of clients, stash them so that we can
134         # reset in tearDown
135         self._original_client_ids = client_mount_ids
136         log.info(client_mount_ids)
137
138         # In case there were any extra auth identities around from a previous
139         # test, delete them
140         for entry in self.auth_list():
141             ent_type, ent_id = entry['entity'].split(".")
142             if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
143                 self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])
144
145         if self.REQUIRE_FILESYSTEM:
146             self.fs = self.mds_cluster.newfs(create=True)
147             self.fs.mds_restart()
148
149             # In case some test messed with auth caps, reset them
150             for client_id in client_mount_ids:
151                 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
152                     'auth', 'caps', "client.{0}".format(client_id),
153                     'mds', 'allow',
154                     'mon', 'allow r',
155                     'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))
156
157             # wait for mds restart to complete...
158             self.fs.wait_for_daemons()
159
160             # Mount the requested number of clients
161             for i in range(0, self.CLIENTS_REQUIRED):
162                 self.mounts[i].mount()
163                 self.mounts[i].wait_until_mounted()
164
165         if self.REQUIRE_RECOVERY_FILESYSTEM:
166             if not self.REQUIRE_FILESYSTEM:
167                 raise case.SkipTest("Recovery filesystem requires a primary filesystem as well")
168             self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
169                                                 'enable_multiple', 'true',
170                                                 '--yes-i-really-mean-it')
171             self.recovery_fs = self.mds_cluster.newfs(name="recovery_fs", create=False)
172             self.recovery_fs.set_metadata_overlay(True)
173             self.recovery_fs.set_data_pool_name(self.fs.get_data_pool_name())
174             self.recovery_fs.create()
175             self.recovery_fs.getinfo(refresh=True)
176             self.recovery_fs.mds_restart()
177             self.recovery_fs.wait_for_daemons()
178
179         # Load an config settings of interest
180         for setting in self.LOAD_SETTINGS:
181             setattr(self, setting, float(self.fs.mds_asok(
182                 ['config', 'get', setting], self.mds_cluster.mds_ids[0]
183             )[setting]))
184
185         self.configs_set = set()
186
187     def tearDown(self):
188         super(CephFSTestCase, self).tearDown()
189
190         self.mds_cluster.clear_firewall()
191         for m in self.mounts:
192             m.teardown()
193
194         for i, m in enumerate(self.mounts):
195             m.client_id = self._original_client_ids[i]
196
197         for subsys, key in self.configs_set:
198             self.mds_cluster.clear_ceph_conf(subsys, key)
199
200     def set_conf(self, subsys, key, value):
201         self.configs_set.add((subsys, key))
202         self.mds_cluster.set_ceph_conf(subsys, key, value)
203
204     def auth_list(self):
205         """
206         Convenience wrapper on "ceph auth ls"
207         """
208         return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
209             "auth", "ls", "--format=json-pretty"
210         ))['auth_dump']
211
212     def assert_session_count(self, expected, ls_data=None, mds_id=None):
213         if ls_data is None:
214             ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)
215
216         alive_count = len([s for s in ls_data if s['state'] != 'killing'])
217
218         self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
219             expected, alive_count
220         ))
221
222     def assert_session_state(self, client_id,  expected_state):
223         self.assertEqual(
224             self._session_by_id(
225                 self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
226             expected_state)
227
228     def get_session_data(self, client_id):
229         return self._session_by_id(client_id)
230
231     def _session_list(self):
232         ls_data = self.fs.mds_asok(['session', 'ls'])
233         ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
234         return ls_data
235
236     def get_session(self, client_id, session_ls=None):
237         if session_ls is None:
238             session_ls = self.fs.mds_asok(['session', 'ls'])
239
240         return self._session_by_id(session_ls)[client_id]
241
242     def _session_by_id(self, session_ls):
243         return dict([(s['id'], s) for s in session_ls])
244
245     def wait_for_daemon_start(self, daemon_ids=None):
246         """
247         Wait until all the daemons appear in the FSMap, either assigned
248         MDS ranks or in the list of standbys
249         """
250         def get_daemon_names():
251             return [info['name'] for info in self.mds_cluster.status().get_all()]
252
253         if daemon_ids is None:
254             daemon_ids = self.mds_cluster.mds_ids
255
256         try:
257             self.wait_until_true(
258                 lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
259                 timeout=30
260             )
261         except RuntimeError:
262             log.warn("Timeout waiting for daemons {0}, while we have {1}".format(
263                 daemon_ids, get_daemon_names()
264             ))
265             raise
266
267     def assert_mds_crash(self, daemon_id):
268         """
269         Assert that the a particular MDS daemon crashes (block until
270         it does)
271         """
272         try:
273             self.mds_cluster.mds_daemons[daemon_id].proc.wait()
274         except CommandFailedError as e:
275             log.info("MDS '{0}' crashed with status {1} as expected".format(daemon_id, e.exitstatus))
276             self.mds_cluster.mds_daemons[daemon_id].proc = None
277
278             # Go remove the coredump from the crash, otherwise teuthology.internal.coredump will
279             # catch it later and treat it as a failure.
280             p = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
281                 "sudo", "sysctl", "-n", "kernel.core_pattern"], stdout=StringIO())
282             core_pattern = p.stdout.getvalue().strip()
283             if os.path.dirname(core_pattern):  # Non-default core_pattern with a directory in it
284                 # We have seen a core_pattern that looks like it's from teuthology's coredump
285                 # task, so proceed to clear out the core file
286                 log.info("Clearing core from pattern: {0}".format(core_pattern))
287
288                 # Determine the PID of the crashed MDS by inspecting the MDSMap, it had
289                 # to talk to the mons to get assigned a rank to reach the point of crashing
290                 addr = self.mds_cluster.mon_manager.get_mds_status(daemon_id)['addr']
291                 pid_str = addr.split("/")[1]
292                 log.info("Determined crasher PID was {0}".format(pid_str))
293
294                 # Substitute PID into core_pattern to get a glob
295                 core_glob = core_pattern.replace("%p", pid_str)
296                 core_glob = re.sub("%[a-z]", "*", core_glob)  # Match all for all other % tokens
297
298                 # Verify that we see the expected single coredump matching the expected pattern
299                 ls_proc = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
300                     "sudo", "ls", run.Raw(core_glob)
301                 ], stdout=StringIO())
302                 cores = [f for f in ls_proc.stdout.getvalue().strip().split("\n") if f]
303                 log.info("Enumerated cores: {0}".format(cores))
304                 self.assertEqual(len(cores), 1)
305
306                 log.info("Found core file {0}, deleting it".format(cores[0]))
307
308                 self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
309                     "sudo", "rm", "-f", cores[0]
310                 ])
311             else:
312                 log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")
313
314         else:
315             raise AssertionError("MDS daemon '{0}' did not crash as expected".format(daemon_id))