"""
Test our tools for recovering metadata from the data pool
"""
import json

import logging
import os
from textwrap import dedent
import traceback
from collections import namedtuple, defaultdict

from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


ValidationError = namedtuple("ValidationError", ["exception", "backtrace"])


class Workload(object):
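    """
    Base class for the workloads exercised by TestDataScan: a workload write()s
    some files, flush()es metadata (by default via "flush journal"), damage()s
    the pools (by default wiping every object in the metadata pool), and after
    the recovery procedure validate()s what a client can see, returning any
    accumulated ValidationErrors.
    """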
    def __init__(self, filesystem, mount):
        self._mount = mount
        self._filesystem = filesystem
        self._initial_state = None

        # Accumulate backtraces for every failed validation, and return them.  Backtraces
        # are rather verbose, but we only see them when something breaks, and they
        # let us see which check failed without having to decorate each check with
        # a string
        self._errors = []

    def assert_equal(self, a, b):
        try:
            if a != b:
                raise AssertionError("{0} != {1}".format(a, b))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def write(self):
        """
        Write the workload files to the mount
        """
        raise NotImplementedError()

    def validate(self):
        """
        Read from the mount and validate that the workload files are present (i.e. have
        survived or been reconstructed from the test scenario)
        """
        raise NotImplementedError()

    def damage(self):
        """
        Damage the filesystem pools in ways that will be interesting to recover from.  By
        default just wipe everything in the metadata pool
        """
        # Delete every object in the metadata pool
        objects = self._filesystem.rados(["ls"]).split("\n")
        for o in objects:
            self._filesystem.rados(["rm", o])

    def flush(self):
        """
        Called after client unmount, after write: flush whatever you want
        """
        self._filesystem.mds_asok(["flush", "journal"])


class SimpleWorkload(Workload):
    """
    Single file, single directory, check that it gets recovered and so does its size
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def validate(self):
        self._mount.run_shell(["ls", "subdir"])
        st = self._mount.stat("subdir/sixmegs")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class MovedFile(Workload):
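    """
    A file that is moved after its backtrace has been flushed: the recovery
    tools only see the old backtrace, so the file should be reconstructed in
    its original (pre-move) location.
    """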
    def write(self):
        # Create a file whose backtrace disagrees with its eventual position
        # in the metadata.  We will see that it gets reconstructed in its
        # original position according to its backtrace.
        self._mount.run_shell(["mkdir", "subdir_alpha"])
        self._mount.run_shell(["mkdir", "subdir_bravo"])
        self._mount.write_n_mb("subdir_alpha/sixmegs", 6)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mv", "subdir_alpha/sixmegs", "subdir_bravo/sixmegs"])
        self._initial_state = self._mount.stat("subdir_bravo/sixmegs")

    def flush(self):
        pass

    def validate(self):
        self.assert_equal(self._mount.ls(), ["subdir_alpha"])
        st = self._mount.stat("subdir_alpha/sixmegs")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class BacktracelessFile(Workload):
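    """
    A file whose metadata is never flushed, so no backtrace exists for it in
    the data pool: recovery cannot restore its path, and it should end up in
    lost+found, named after its inode number, with the correct size.
    """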
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def flush(self):
        # Never flush metadata, so backtrace won't be written
        pass

    def validate(self):
        ino_name = "%x" % self._initial_state["st_ino"]

        # The inode should be linked into lost+found because we had no path for it
        self.assert_equal(self._mount.ls(), ["lost+found"])
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        st = self._mount.stat("lost+found/{ino_name}".format(ino_name=ino_name))

        # We might not have got the name or path, but we should still get the size
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        return self._errors


class StripedStashedLayout(Workload):
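    """
    Files written under a directory with a non-default striped layout.  Files
    whose metadata was flushed have their layout written into an xattr and
    should be recovered in place with readable data; the unflushed file has no
    stashed layout and should end up in lost+found with unreadable contents.
    """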
    def __init__(self, fs, m):
        super(StripedStashedLayout, self).__init__(fs, m)

        # Nice small stripes so we can quickly do our writes+validates
        self.sc = 4       # stripe_count
        self.ss = 65536   # stripe_unit
        self.os = 262144  # object_size

        self.interesting_sizes = [
            # Exactly stripe_count objects will exist
            self.os * self.sc,
            # Fewer than stripe_count objects will exist
            self.os * self.sc // 2,
            self.os * (self.sc - 1) + self.os // 2,
            self.os * (self.sc - 1) + self.os // 2 - 1,
            self.os * (self.sc + 1) + self.os // 2,
            self.os * (self.sc + 1) + self.os // 2 + 1,
            # More than stripe_count objects will exist
            self.os * self.sc + self.os * self.sc // 2
        ]

    def write(self):
        # Create a dir with a striped layout set on it
        self._mount.run_shell(["mkdir", "stripey"])

        self._mount.setfattr("./stripey", "ceph.dir.layout",
             "stripe_unit={ss} stripe_count={sc} object_size={os} pool={pool}".format(
                 ss=self.ss, os=self.os, sc=self.sc,
                 pool=self._filesystem.get_data_pool_name()
             ))

        # Write files, then flush metadata so that their layouts get written into xattrs
        for i, n_bytes in enumerate(self.interesting_sizes):
            self._mount.write_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            # This is really just validating the validator
            self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
        self._filesystem.mds_asok(["flush", "journal"])

        # Write another file in the same way, but this time don't flush the metadata,
        # so that it won't have the layout xattr
        self._mount.write_test_pattern("stripey/unflushed_file", 1024 * 512)
        self._mount.validate_test_pattern("stripey/unflushed_file", 1024 * 512)

        self._initial_state = {
            "unflushed_ino": self._mount.path_to_ino("stripey/unflushed_file")
        }

    def flush(self):
        # Pass because we already selectively flushed during write
        pass

    def validate(self):
        # The flushed files should have been recovered into their original locations
        # with the correct layout: read back correct data
        for i, n_bytes in enumerate(self.interesting_sizes):
            try:
                self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0} (size {1}): {2}".format(i, n_bytes, e), traceback.format_exc(3))
                )

        # The unflushed file should have been recovered into lost+found without
        # the correct layout: read back junk
        ino_name = "%x" % self._initial_state["unflushed_ino"]
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        try:
            self._mount.validate_test_pattern(os.path.join("lost+found", ino_name), 1024 * 512)
        except CommandFailedError:
            pass
        else:
            self._errors.append(
                ValidationError("Unexpectedly valid data in unflushed striped file", "")
            )

        return self._errors


class ManyFilesWorkload(Workload):
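    """
    Write a configurable number of 6MB files; used to exercise the recovery
    tools with multiple data-scan workers (see test_parallel_execution).
    """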
    def __init__(self, filesystem, mount, file_count):
        super(ManyFilesWorkload, self).__init__(filesystem, mount)
        self.file_count = file_count

    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        for n in range(0, self.file_count):
            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)

    def validate(self):
        for n in range(0, self.file_count):
            try:
                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
                )

        return self._errors


class MovedDir(Workload):
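    """
    A directory moved after one backtrace beneath it was flushed, with a second
    file flushed in the new location: two backtraces claim different parents
    for the directory, and recovery should pick exactly one, leaving the
    directory with single linkage.
    """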
    def write(self):
        # Create a nested dir that we will then move.  Two files with two different
        # backtraces referring to the moved dir, claiming two different locations for
        # it.  We will see that only one backtrace wins and the dir ends up with
        # single linkage.
        self._mount.run_shell(["mkdir", "-p", "grandmother/parent"])
        self._mount.write_n_mb("grandmother/parent/orig_pos_file", 1)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mkdir", "grandfather"])
        self._mount.run_shell(["mv", "grandmother/parent", "grandfather"])
        self._mount.write_n_mb("grandfather/parent/new_pos_file", 2)
        self._filesystem.mds_asok(["flush", "journal"])

        self._initial_state = (
            self._mount.stat("grandfather/parent/orig_pos_file"),
            self._mount.stat("grandfather/parent/new_pos_file")
        )

    def validate(self):
        root_files = self._mount.ls()
        self.assert_equal(len(root_files), 1)
        self.assert_equal(root_files[0] in ["grandfather", "grandmother"], True)
        winner = root_files[0]
        st_opf = self._mount.stat("{0}/parent/orig_pos_file".format(winner))
        st_npf = self._mount.stat("{0}/parent/new_pos_file".format(winner))

        self.assert_equal(st_opf['st_size'], self._initial_state[0]['st_size'])
        self.assert_equal(st_npf['st_size'], self._initial_state[1]['st_size'])

        return self._errors


class MissingZerothObject(Workload):
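    """
    In addition to the default metadata-pool wipe, delete the file's zeroth
    object from the data pool; the file should still be recovered into
    lost+found with the correct size.
    """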
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def damage(self):
        super(MissingZerothObject, self).damage()
        zeroth_id = "{0:x}.00000000".format(self._initial_state['st_ino'])
        self._filesystem.rados(["rm", zeroth_id], pool=self._filesystem.get_data_pool_name())

    def validate(self):
        st = self._mount.stat("lost+found/{0:x}".format(self._initial_state['st_ino']))
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class NonDefaultLayout(Workload):
    """
    Check that the reconstruction copes with files that have a different
    object size in their layout
    """
    def write(self):
        self._mount.run_shell(["touch", "datafile"])
        self._mount.setfattr("./datafile", "ceph.file.layout.object_size", "8388608")
        self._mount.run_shell(["dd", "if=/dev/urandom", "of=./datafile", "bs=1M", "count=32"])
        self._initial_state = self._mount.stat("datafile")

    def validate(self):
        # Check we got the layout reconstructed properly
        object_size = int(self._mount.getfattr(
            "./datafile", "ceph.file.layout.object_size"))
        self.assert_equal(object_size, 8388608)

        # Check we got the file size reconstructed properly
        st = self._mount.stat("datafile")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class TestDataScan(CephFSTestCase):
    MDSS_REQUIRED = 2

    def is_marked_damaged(self, rank):
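        """
        Return True if the given MDS rank is listed as damaged in the MDS map
        """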
        mds_map = self.fs.get_mds_map()
        return rank in mds_map['damaged']

    def _rebuild_metadata(self, workload, workers=1):
        """
        That when all objects in the metadata pool are removed, we can rebuild the
        metadata pool based on the contents of the data pool, and a client can see
        and read our files.
        """

        # First, inject some files

        workload.write()

        # Unmount the client and flush the journal: the tool should also cope with
        # situations where there is dirty metadata, but we'll test that separately
        self.mount_a.umount_wait()
        workload.flush()

        # Stop the MDS
        self.fs.mds_stop()
        self.fs.mds_fail()

        # After recovery, we need the MDS to not be strict about stats (in production these options
        # are off by default, but in QA we need to explicitly disable them)
        self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
        self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)

        # Apply any data damage the workload wants
        workload.damage()

        # Reset the MDS map in case multiple ranks were in play: recovery procedure
        # only understands how to rebuild metadata under rank 0
        self.fs.mon_manager.raw_cluster_cmd('fs', 'reset', self.fs.name,
                '--yes-i-really-mean-it')

        self.fs.mds_restart()

        def get_state(mds_id):
            info = self.mds_cluster.get_mds_info(mds_id)
            return info['state'] if info is not None else None

        self.wait_until_true(lambda: self.is_marked_damaged(0), 60)
        for mds_id in self.fs.mds_ids:
            self.wait_until_equal(
                    lambda: get_state(mds_id),
                    "up:standby",
                    timeout=60)

        self.fs.table_tool([self.fs.name + ":0", "reset", "session"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "snap"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "inode"])

        # Run the recovery procedure
        if False:
            with self.assertRaises(CommandFailedError):
                # Normal reset should fail when no objects are present, we'll use --force instead
                self.fs.journal_tool(["journal", "reset"])

        self.fs.journal_tool(["journal", "reset", "--force"])
        self.fs.data_scan(["init"])
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)

        # Mark the MDS repaired
        self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')

        # Start the MDS
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        log.info(str(self.mds_cluster.status()))

        # Mount a client
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # See that the files are present and correct
        errors = workload.validate()
        if errors:
            log.error("Validation errors found: {0}".format(len(errors)))
            for e in errors:
                log.error(e.exception)
                log.error(e.backtrace)
            raise AssertionError("Validation failed, first error: {0}\n{1}".format(
                errors[0].exception, errors[0].backtrace
            ))

    def test_rebuild_simple(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))

    def test_rebuild_moved_file(self):
        self._rebuild_metadata(MovedFile(self.fs, self.mount_a))

    def test_rebuild_backtraceless(self):
        self._rebuild_metadata(BacktracelessFile(self.fs, self.mount_a))

    def test_rebuild_moved_dir(self):
        self._rebuild_metadata(MovedDir(self.fs, self.mount_a))

    def test_rebuild_missing_zeroth(self):
        self._rebuild_metadata(MissingZerothObject(self.fs, self.mount_a))

    def test_rebuild_nondefault_layout(self):
        self._rebuild_metadata(NonDefaultLayout(self.fs, self.mount_a))

    def test_stashed_layout(self):
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))

    def _dirfrag_keys(self, object_id):
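        """
        Return the omap keys (dentry names) stored in the given dirfrag object,
        or an empty list if it has none.
        """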
        keys_str = self.fs.rados(["listomapkeys", object_id])
        if keys_str:
            return keys_str.split("\n")
        else:
            return []

    def test_fragmented_injection(self):
        """
        That when injecting a dentry into a fragmented directory, we put it in the right fragment.
        """

        self.fs.set_allow_dirfrags(True)

        file_count = 100
        file_names = ["%s" % n for n in range(0, file_count)]

        # Create a directory of `file_count` files, each named after its
        # decimal number and containing the string of its decimal number
        self.mount_a.run_python(dedent("""
        import os
        path = os.path.join("{path}", "subdir")
        os.mkdir(path)
        for n in range(0, {file_count}):
            open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
        """.format(
            path=self.mount_a.mountpoint,
            file_count=file_count
        )))

        dir_ino = self.mount_a.path_to_ino("subdir")

        # Only one MDS should be active!
        self.assertEqual(len(self.fs.get_active_names()), 1)

        # Ensure that one directory is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir", "0/0", "1"], mds_id)

        # Flush journal and stop MDS
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Pick a dentry and wipe out its key
        # Because I did a 1 bit split, I know one frag will be named <inode>.01000000
        frag_obj_id = "{0:x}.01000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        victim_key = keys[7]  # arbitrary choice
        log.info("victim_key={0}".format(victim_key))
        victim_dentry = victim_key.split("_head")[0]
        self.fs.rados(["rmomapkey", frag_obj_id, victim_key])

        # Start filesystem back up, observe that the file appears to be gone in an `ls`
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        files = self.mount_a.run_shell(["ls", "subdir/"]).stdout.getvalue().strip().split("\n")
        self.assertListEqual(sorted(files), sorted(list(set(file_names) - set([victim_dentry]))))

        # Stop the filesystem
        self.mount_a.umount_wait()
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Run data-scan, observe that it inserts our dentry back into the correct fragment
        # by checking the omap now has the dentry's key again
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
        self.assertIn(victim_key, self._dirfrag_keys(frag_obj_id))

        # Start the filesystem and check that the dentry we deleted is now once again visible
        # and points to the correct file data.
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        out = self.mount_a.run_shell(["cat", "subdir/{0}".format(victim_dentry)]).stdout.getvalue().strip()
        self.assertEqual(out, victim_dentry)

        # Finally, close the loop by checking our injected dentry survives a merge
        mds_id = self.fs.get_active_names()[0]
        self.mount_a.ls("subdir")  # Do an ls to ensure both frags are in cache so the merge will work
        self.fs.mds_asok(["dirfrag", "merge", "/subdir", "0/0"], mds_id)
        self.fs.mds_asok(["flush", "journal"], mds_id)
        frag_obj_id = "{0:x}.00000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))

    @for_teuthology
    def test_parallel_execution(self):
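        """
        That metadata rebuild succeeds when data-scan runs with multiple
        workers (here 7 workers over a 25-file workload).
        """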
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)

    def test_pg_files(self):
        """
        That the pg files command tells us which files are associated with
        a particular PG
        """
        file_count = 20
        self.mount_a.run_shell(["mkdir", "mydir"])
        self.mount_a.create_n_files("mydir/myfile", file_count)

        # Some files elsewhere in the system that we will ignore
        # to check that the tool is filtering properly
        self.mount_a.run_shell(["mkdir", "otherdir"])
        self.mount_a.create_n_files("otherdir/otherfile", file_count)

        pgs_to_files = defaultdict(list)
        # Rough (slow) reimplementation of the logic
        for i in range(0, file_count):
            file_path = "mydir/myfile_{0}".format(i)
            ino = self.mount_a.path_to_ino(file_path)
            obj = "{0:x}.{1:08x}".format(ino, 0)
            pgid = json.loads(self.fs.mon_manager.raw_cluster_cmd(
                "osd", "map", self.fs.get_data_pool_name(), obj,
                "--format=json-pretty"
            ))['pgid']
            pgs_to_files[pgid].append(file_path)
            log.info("{0}: {1}".format(file_path, pgid))

        pg_count = self.fs.get_pgs_per_fs_pool()
        for pg_n in range(0, pg_count):
            pg_str = "{0}.{1}".format(self.fs.get_data_pool_id(), pg_n)
            out = self.fs.data_scan(["pg_files", "mydir", pg_str])
            lines = [l for l in out.split("\n") if l]
            log.info("{0}: {1}".format(pg_str, lines))
            self.assertSetEqual(set(lines), set(pgs_to_files[pg_str]))

    def test_scan_links(self):
        """
        The scan_links command fixes linkage errors
        """
        self.mount_a.run_shell(["mkdir", "testdir1"])
        self.mount_a.run_shell(["mkdir", "testdir2"])
        dir1_ino = self.mount_a.path_to_ino("testdir1")
        dir2_ino = self.mount_a.path_to_ino("testdir2")
        dirfrag1_oid = "{0:x}.00000000".format(dir1_ino)
        dirfrag2_oid = "{0:x}.00000000".format(dir2_ino)

        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir1/link1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir2/link2"])

        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["flush", "journal"], mds_id)

        dirfrag1_keys = self._dirfrag_keys(dirfrag1_oid)

        # Introduce a duplicated primary link
        file1_key = "file1_head"
        self.assertIn(file1_key, dirfrag1_keys)
        file1_omap_data = self.fs.rados(["getomapval", dirfrag1_oid, file1_key, '-'])
        self.fs.rados(["setomapval", dirfrag2_oid, file1_key], stdin_data=file1_omap_data)
        self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        # Remove a remote link, making the inode's link count incorrect
        link1_key = 'link1_head'
        self.assertIn(link1_key, dirfrag1_keys)
        self.fs.rados(["rmomapkey", dirfrag1_oid, link1_key])

        # Increase the good primary link's version
        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.umount_wait()

        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Repair the linkage errors
        self.fs.data_scan(["scan_links"])

        # Check that the duplicated primary link in testdir2 was deleted
        self.assertNotIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        self.fs.mds_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Check that the inode's link count was corrected
        file1_nlink = self.mount_a.path_to_nlink("testdir1/file1")
        self.assertEqual(file1_nlink, 2)