import datetime
import json
import logging
import time

import gevent
from textwrap import dedent

from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    # Sentinel values distinguishing the two throttle modes exercised below
    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in the throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
        """.format(
            mount_path=self.mount_a.mountpoint,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
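        # (one stray per unlinked file, plus one for the deleted directory itself)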
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # ...and that the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
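            # mds_max_purge_ops_per_pg is multiplied by the data pool's PG
            # count, so dividing the requested total by the PG count keeps the
            # effective aggregate limit equal to `ops`.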
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default file limit is pretty permissive, so to avoid the test
            # running too fast, create lots of files and set the limit low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_multiplier):
                for size in xrange(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
                    f = open(os.path.join(mount_path, subdir, filename), 'w')
        """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it
        # completing before we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
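        # (+1 because the unlinked "delete_me" directory inode becomes a stray too)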
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not to exceed the limit (a correctness
        # condition).
        purge_timeout = 600  # seconds; value assumed
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])

            files_high_water = max(files_high_water, num_strays_purging)
            ops_high_water = max(ops_high_water, num_purge_ops)

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))

            if throttle_type == self.OPS_THROTTLE:
                # 11 is filer_max_purge_ops plus one for the backtrace:
                # the limit is allowed to be overshot by this much.
                if num_purge_ops > mds_max_purge_ops + 11:
                    raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                        num_purge_ops, mds_max_purge_ops
                    ))
            elif throttle_type == self.FILES_THROTTLE:
                if num_strays_purging > mds_max_purge_files:
                    raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                        num_strays_purging, mds_max_purge_files
                    ))
            else:
                raise NotImplementedError(throttle_type)

            log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                num_strays_purging, num_strays,
                total_strays_purged, total_strays_created
            ))

            time.sleep(1)
            elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge.  This is
        # totally racy, but should be safeish unless the cluster is pathologically
        # slow, or insanely fast such that the deletions all pass before we have
        # polled the statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops / 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files / 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]
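
    # Poll a perf counter until it reaches the expected value, failing early if
    # the counter ever overshoots it.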
    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually
        closes it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file and hold it open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")
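
        # The session should be holding two caps at this point: one for the
        # mount's root directory and one for open_file.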
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )

        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of the primary dentry of a hardlinked inode results
        in reintegration of the inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray when
        # the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, and see that the data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking the MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should also reintegrate the stray when an unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it only at the very end
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        """
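        # Use the ceph.dir.pin vxattr to request that the subtree be exported
        # to rank 1, then poll the target MDS's cache until it is auth for the
        # watched inode.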
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_elapsed = 0
        migrate_timeout = 60  # seconds; value assumed
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break
            if migrate_elapsed > migrate_timeout:
                raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
            migrate_elapsed += 1
            time.sleep(1)

    def _is_stopped(self, rank):
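        """
        Return True if no daemon in the MDS map currently claims this rank.
        """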
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()
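
        # Set the purge file throttle to zero on rank 1 so that its purge
        # queue backs up rather than draining while we work.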
        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into the purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        self.fs.set_max_mds(1)
        self.fs.deactivate(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.wait_until_true(
            lambda: self._is_stopped(1),
            timeout=120
        )

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # Empty the MDS cache; otherwise the MDS reintegrates the stray when
        # the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")

        # Wait until we get to a single active MDS mdsmap state
        self.wait_until_true(lambda: self._is_stopped(1), timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when the PurgeQueue
        should be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some data to both
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that the stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for the purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return the stats dict for the named pool, as reported by "ceph df",
        e.g. including fields such as:

            "max_avail": 19630292406,

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
                                            "--yes-i-really-mean-it")

        # Create a dir with a file in it
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()
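
        # Expect two strays: one for file_a and one for the now-unlinked subdir.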
        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # See that the data from the snapshotted revision of the file is still
        # present and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        That a stray file with a fancy (striped) layout gets purged.
        """
        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
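        # (one full 32MB stripe period fills 4 objects of 8MB each; the
        # remaining 3MB is striped in 1MB units across 3 further objects)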
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, as it does not support fancy layouts
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max
        (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too
        large, and that unlinking may continue once those strays are purged.
        """
        self.fs.set_allow_dirfrags(True)

        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """).format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT+1
            ))
        except CommandFailedError:
            pass
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """).format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        ))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """).format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        ))

        # Now test that the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    open(fpath, 'w').write("%s" % n)
                    os.unlink(fpath)
                """).format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT*10  # 10 stray directories, should collide before this count
            ))
        except CommandFailedError:
            pass
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                open(fpath, 'w').write("%s" % n)
                os.unlink(fpath)
            """).format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        ))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        """
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
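        # Object 500.00000000 is the head object of the purge queue's journal;
        # removing it simulates a filesystem created before the purge queue
        # existed, so the restarted MDS has to create a fresh queue.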
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_purge_queue_op_rate(self):
        """
        A busy purge queue is meant to aggregate operations sufficiently
        that our RADOS ops to the metadata pool are not O(files).  Check
        that this is the case.
        """
        # For low rates of deletion, the rate of metadata ops actually
        # will be O(files), so to see the desired behaviour we have to give
        # the system a significant quantity, i.e. an order of magnitude
        # more than the number of files it will purge at one time.
        self.set_conf('mds', 'mds_bal_frag', 'false')
        self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.run_shell(["mkdir", "phase1"])
        self.mount_a.create_n_files("phase1/file", phase_1_files)

        self.mount_a.run_shell(["mkdir", "phase2"])
        self.mount_a.create_n_files("phase2/file", phase_2_files)

        def unlink_and_count_ops(path, expected_deletions):
            initial_ops = self.get_stat("objecter", "op")
            initial_pq_executed = self.get_stat("purge_queue", "pq_executed")

            self.mount_a.run_shell(["rm", "-rf", path])

            self._wait_for_counter(
                "purge_queue", "pq_executed", initial_pq_executed + expected_deletions
            )

            final_ops = self.get_stat("objecter", "op")

            # Calculation of the *overhead* operations, i.e. do not include
            # the operations where we actually delete files.
            return final_ops - initial_ops - expected_deletions

        self.fs.mds_asok(['flush', 'journal'])
        phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1)

        self.fs.mds_asok(['flush', 'journal'])
        phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1)

        log.info("Phase 1: {0}".format(phase1_ops))
        log.info("Phase 2: {0}".format(phase2_ops))

        # The success criterion is that deleting double the number
        # of files doesn't generate double the number of overhead ops
        # -- this comparison is a rough approximation of that rule.
        self.assertTrue(phase2_ops < phase1_ops * 1.25)

        # Finally, check that our activity did include properly quiescing
        # the queue (i.e. call to Journaler::write_head in the right place),
        # by restarting the MDS and checking that it doesn't try re-executing
        # any of the work we did.
        self.fs.mds_asok(['flush', 'journal'])  # flush to ensure no strays are left hanging around
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0)

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))
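
        # The metadata being deleted is auth on rank 1 (we migrated it there)
        # while purging on rank 1 is disabled, so the timing below measures only
        # how quickly the client-visible unlink operations complete.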
        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)