2 MDS admin socket scrubbing-related tests.
8 from teuthology.exceptions import CommandFailedError
10 from tasks.cephfs.cephfs_test_case import CephFSTestCase
12 log = logging.getLogger(__name__)
15 class TestScrubChecks(CephFSTestCase):
17 Run flush and scrub commands on the specified files in the filesystem. This
18 task will run through a sequence of operations, but it is not comprehensive
19 on its own -- it doesn't manipulate the mds cache state to test on both
20 in- and out-of-memory parts of the hierarchy. So it's designed to be run
21 multiple times within a single test run, so that the test can manipulate
27 path: path/to/test/dir
31 Increment the run_seq on subsequent invocations within a single test run;
32 it uses that value to generate unique folder and file names.
38 def test_scrub_checks(self):
def _checks(self, run_seq):
    """
    Run one pass of flush_path/scrub_path admin socket checks.

    Makes sure the ceph-qa-suite repo is present under the client mount,
    then issues flush_path and scrub_path commands against a non-existent
    path, a directory, individual files and "/", validating one field of
    each JSON reply. Finally it creates a new directory and file, damages
    the file's backing RADOS object and checks that scrub_path reports the
    matching errno.

    :param run_seq: sequence number of this invocation; used to build the
                    unique new_dir_<run_seq> / new_file_<run_seq> names.
    :raises AsokCommandFailedError: via asok_command, when a reply does not
                                    satisfy its validator.
    """
    # NOTE(review): the assignment of mds_rank is missing from this extract;
    # rank 0 is assumed -- confirm against the full file.
    mds_rank = 0
    test_dir = "scrub_test_path"

    # Path as the MDS sees it (rooted at "/"), used in admin socket commands.
    abs_test_path = "/{0}".format(test_dir)

    log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
    # Path as the client sees it (under the mountpoint), used for shell ops.
    client_path = os.path.join(self.mount_a.mountpoint, test_dir)
    log.info("client_path: {0}".format(client_path))

    log.info("Cloning repo into place")
    repo_path = self.clone_repo(self.mount_a, client_path)

    # Fix: the two literals are adjacent (no "+"), so they are joined before
    # .format() runs and {id_} is substituted as well. With "+" the method
    # call bound only to the second literal.
    log.info(
        "Initiating mds_scrub_checks on mds.{id_}, "
        "test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq))

    def expect(element, value):
        """Build an asok_command validator requiring reply[element] == value."""
        return lambda j, r: self.json_validator(j, r, element, value)

    success_validator = expect("return_code", 0)
    enoent_validator = expect("return_code", -errno.ENOENT)

    # NOTE(review): the guard lines around the "First run" flushes are
    # missing from this extract; `run_seq == 0` is inferred from the log
    # messages -- confirm against the full file.
    first_run = run_seq == 0

    # A path that does not exist must be rejected by both commands.
    nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
    self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                      enoent_validator)
    self.asok_command(mds_rank, "scrub_path {nep}".format(nep=nep),
                      enoent_validator)

    test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
    dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

    if first_run:
        log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
        command = "flush_path {dirpath}".format(dirpath=dirpath)
        self.asok_command(mds_rank, command, success_validator)
    command = "scrub_path {dirpath}".format(dirpath=dirpath)
    self.asok_command(mds_rank, command, success_validator)

    filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
        repo_path=test_repo_path)
    if first_run:
        log.info("First run: flushing {filepath}".format(filepath=filepath))
        command = "flush_path {filepath}".format(filepath=filepath)
        self.asok_command(mds_rank, command, success_validator)
    command = "scrub_path {filepath}".format(filepath=filepath)
    self.asok_command(mds_rank, command, success_validator)

    # This file is scrubbed without being flushed first.
    filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml".format(
        repo_path=test_repo_path)
    command = "scrub_path {filepath}".format(filepath=filepath)
    # NOTE(review): the expected value is on a line missing from this
    # extract; False is assumed -- confirm against the full file.
    self.asok_command(mds_rank, command,
                      expect("performed_validation", False))

    if first_run:
        log.info("First run: flushing base dir /")
        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)
    command = "scrub_path /"
    self.asok_command(mds_rank, command, success_validator)

    # Create a fresh directory through the client, flush it through the MDS.
    new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
    test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                    i=run_seq)
    self.mount_a.run_shell(["mkdir", new_dir])
    command = "flush_path {dir}".format(dir=test_new_dir)
    self.asok_command(mds_rank, command, success_validator)

    # Same for a fresh 1 MB file.
    new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                 i=run_seq)
    test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                      i=run_seq)
    self.mount_a.write_n_mb(new_file, 1)

    command = "flush_path {file}".format(file=test_new_file)
    self.asok_command(mds_rank, command, success_validator)

    # check that scrub fails on errors
    ino = self.mount_a.path_to_ino(new_file)
    rados_obj_name = "{ino:x}.00000000".format(ino=ino)
    command = "scrub_path {file}".format(file=test_new_file)
    data_pool = self.fs.get_data_pool_name()

    # Missing parent xattr -> ENODATA
    self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=data_pool)
    self.asok_command(mds_rank, command,
                      expect("return_code", -errno.ENODATA))

    # Missing object -> ENOENT
    self.fs.rados(["rm", rados_obj_name], pool=data_pool)
    self.asok_command(mds_rank, command, enoent_validator)

    command = "flush_path /"
    self.asok_command(mds_rank, command, success_validator)
# Test that `scrub_path ... repair` fixes a directory whose on-disk contents
# were damaged: a file's dentry is removed directly from the directory's
# RADOS object, rmdir is shown to fail, the repair scrub is run, and rmdir is
# then expected to succeed.
136 def test_scrub_repair(self):
# NOTE(review): `mds_rank` is used further down but its assignment is not
# visible in this extract -- confirm against the full file.
138 test_dir = "scrub_repair_path"
140 self.mount_a.run_shell(["sudo", "mkdir", test_dir])
141 self.mount_a.run_shell(["sudo", "touch", "{0}/file".format(test_dir)])
# Object name is the directory's inode number in hex plus ".00000000"; it is
# the object whose omap key is removed below.
142 dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))
144 self.mount_a.umount_wait()
146 # flush journal entries to dirfrag objects, and expire journal
147 self.fs.mds_asok(['flush', 'journal'])
150 # remove the dentry from dirfrag, cause incorrect fragstat/rstat
151 self.fs.rados(["rmomapkey", dir_objname, "file_head"],
152 pool=self.fs.get_metadata_pool_name())
154 self.fs.mds_fail_restart()
155 self.fs.wait_for_daemons()
# NOTE(review): the client was unmounted above and no mount call is visible
# before this wait -- presumably on a line missing from this extract; confirm.
158 self.mount_a.wait_until_mounted()
160 # fragstat indicates the directory is not empty, rmdir should fail
161 with self.assertRaises(CommandFailedError) as ar:
162 self.mount_a.run_shell(["sudo", "rmdir", test_dir])
# The failed rmdir must have exited with status 1.
163 self.assertEqual(ar.exception.exitstatus, 1)
# Run the repairing scrub on the damaged directory; the reply must carry
# return_code 0.
165 self.asok_command(mds_rank, "scrub_path /{0} repair".format(test_dir),
166 lambda j, r: self.json_validator(j, r, "return_code", 0))
168 # wait a few second for background repair
# NOTE(review): no wait statement is visible after the comment above --
# presumably on a line missing from this extract; confirm.
171 # fragstat should be fixed
172 self.mount_a.run_shell(["sudo", "rmdir", test_dir])
# Takes no `self` but is called as self.json_validator(...), so it has to be
# a static method.
@staticmethod
def json_validator(json_out, rc, element, expected_value):
    """
    Check one field of an admin socket JSON reply.

    :param json_out: parsed JSON reply (a dict), or None if there was none
    :param rc: exit status of the admin socket command
    :param element: key to look up in json_out
    :param expected_value: value json_out[element] must be equal to
    :returns: (success, message) tuple; message describes the failure, or
              is "Succeeded"
    """
    # NOTE(review): the guard line is missing from this extract; a non-zero
    # exit status is assumed to be the failure condition -- confirm.
    if rc != 0:
        return False, "asok command returned error {rc}".format(rc=rc)
    # Report a missing reply as a validation failure instead of raising
    # AttributeError on None.get().
    if json_out is None:
        return False, "asok command returned no JSON output"
    element_value = json_out.get(element)
    if element_value != expected_value:
        return False, "unexpectedly got {jv} instead of {ev}!".format(
            jv=element_value, ev=expected_value)
    return True, "Succeeded"
def asok_command(self, mds_rank, command, validator):
    """
    Run an admin socket command on an MDS and validate its reply.

    :param mds_rank: index into the list of active MDS names
    :param command: command line as a single string; split on whitespace
    :param validator: callable(json_out, rc) returning (success, message)
    :returns: the parsed JSON reply, or None if the command printed nothing
    :raises AsokCommandFailedError: if the validator reports failure
    """
    log.info("Running command '{command}'".format(command=command))

    command_list = command.split()

    # we just assume there's an active mds for every rank
    mds_id = self.fs.get_active_names()[mds_rank]
    proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                            command_list, check_status=False)
    rout = proc.exitstatus
    sout = proc.stdout.getvalue()

    # Empty output is not valid JSON; hand None to the validator rather than
    # letting json.loads raise.
    jout = json.loads(sout) if sout.strip() else None

    # Fix: the two literals are adjacent (no "+"), so they are joined before
    # .format() runs and {command} is substituted as well.
    log.info(
        "command '{command}' got response code "
        "'{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

    success, errstring = validator(jout, rout)
    if not success:
        raise AsokCommandFailedError(command, rout, jout, errstring)

    return jout
def clone_repo(self, client_mount, path):
    """
    Make sure a checkout of ceph-qa-suite exists under `path`.

    :param client_mount: mount object used to run shell commands and stat
    :param path: directory (created if needed) that will hold the checkout
    :returns: path of the checkout, i.e. <path>/ceph-qa-suite
    """
    repo = "ceph-qa-suite"
    repo_path = os.path.join(path, repo)
    client_mount.run_shell(["mkdir", "-p", path])

    try:
        # A successful stat means the checkout already exists.
        client_mount.stat(repo_path)
    except CommandFailedError:
        client_mount.run_shell([
            "git", "clone", '--branch', 'giant',
            # https rather than plain http, so the clone is not fetched
            # over an unauthenticated transport.
            "https://github.com/ceph/{repo}".format(repo=repo),
            repo_path,
        ])

    return repo_path
class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command.

    Attributes:
        command: the admin socket command that was run
        rc: the exit status passed in by the caller
        json: the parsed JSON output passed in by the caller
        errstring: explanation of why the response was rejected
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        # rc and json are read back when the message is built below.
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    # NOTE(review): the `def` line of this method is missing from the
    # extract; __str__ is assumed because it returns the message text.
    def __str__(self):
        # Fix: format the whole message. Previously .format() bound only to
        # the second literal, leaving {command} and {rc} unsubstituted.
        message = ("Admin socket: {command} failed with rc={rc},"
                   "json output={json}, because '{es}'")
        return message.format(
            command=self.command, rc=self.rc,
            json=self.json, es=self.errstring)