X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fqa%2Ftasks%2Fcephfs%2Ftest_strays.py;fp=src%2Fceph%2Fqa%2Ftasks%2Fcephfs%2Ftest_strays.py;h=b64f3e931dca692e6e3681cf2e270c04f3987c5d;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/qa/tasks/cephfs/test_strays.py b/src/ceph/qa/tasks/cephfs/test_strays.py new file mode 100644 index 0000000..b64f3e9 --- /dev/null +++ b/src/ceph/qa/tasks/cephfs/test_strays.py @@ -0,0 +1,1049 @@ +import json +import time +import logging +from textwrap import dedent +import datetime +import gevent +import datetime + +from teuthology.orchestra.run import CommandFailedError, Raw +from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology + +log = logging.getLogger(__name__) + + +class TestStrays(CephFSTestCase): + MDSS_REQUIRED = 2 + + OPS_THROTTLE = 1 + FILES_THROTTLE = 2 + + # Range of different file sizes used in throttle test's workload + throttle_workload_size_range = 16 + + @for_teuthology + def test_ops_throttle(self): + self._test_throttling(self.OPS_THROTTLE) + + @for_teuthology + def test_files_throttle(self): + self._test_throttling(self.FILES_THROTTLE) + + def test_dir_deletion(self): + """ + That when deleting a bunch of dentries and the containing + directory, everything gets purged. + Catches cases where the client might e.g. fail to trim + the unlinked dir from its cache. + """ + file_count = 1000 + create_script = dedent(""" + import os + + mount_path = "{mount_path}" + subdir = "delete_me" + size = {size} + file_count = {file_count} + os.mkdir(os.path.join(mount_path, subdir)) + for i in xrange(0, file_count): + filename = "{{0}}_{{1}}.bin".format(i, size) + f = open(os.path.join(mount_path, subdir, filename), 'w') + f.write(size * 'x') + f.close() + """.format( + mount_path=self.mount_a.mountpoint, + size=1024, + file_count=file_count + )) + + self.mount_a.run_python(create_script) + + # That the dirfrag object is created + self.fs.mds_asok(["flush", "journal"]) + dir_ino = self.mount_a.path_to_ino("delete_me") + self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0)) + + # Remove everything + self.mount_a.run_shell(["rm", "-rf", "delete_me"]) + self.fs.mds_asok(["flush", "journal"]) + + # That all the removed files get created as strays + strays = self.get_mdc_stat("strays_created") + self.assertEqual(strays, file_count + 1) + + # That the strays all get enqueued for purge + self.wait_until_equal( + lambda: self.get_mdc_stat("strays_enqueued"), + strays, + timeout=600 + + ) + + # That all the purge operations execute + self.wait_until_equal( + lambda: self.get_stat("purge_queue", "pq_executed"), + strays, + timeout=600 + ) + + # That finally, the directory metadata object is gone + self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0)) + + # That finally, the data objects are all gone + self.await_data_pool_empty() + + def _test_throttling(self, throttle_type): + self.data_log = [] + try: + return self._do_test_throttling(throttle_type) + except: + for l in self.data_log: + log.info(",".join([l_.__str__() for l_ in l])) + raise + + def _do_test_throttling(self, throttle_type): + """ + That the mds_max_purge_ops setting is respected + """ + + def set_throttles(files, ops): + """ + Helper for updating ops/files limits, and calculating effective + ops_per_pg setting to give the same ops limit. + """ + self.set_conf('mds', 'mds_max_purge_files', "%d" % files) + self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops) + + pgs = self.fs.mon_manager.get_pool_property( + self.fs.get_data_pool_name(), + "pg_num" + ) + ops_per_pg = float(ops) / pgs + self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg) + + # Test conditions depend on what we're going to be exercising. + # * Lift the threshold on whatever throttle we are *not* testing, so + # that the throttle of interest is the one that will be the bottleneck + # * Create either many small files (test file count throttling) or fewer + # large files (test op throttling) + if throttle_type == self.OPS_THROTTLE: + set_throttles(files=100000000, ops=16) + size_unit = 1024 * 1024 # big files, generate lots of ops + file_multiplier = 100 + elif throttle_type == self.FILES_THROTTLE: + # The default value of file limit is pretty permissive, so to avoid + # the test running too fast, create lots of files and set the limit + # pretty low. + set_throttles(ops=100000000, files=6) + size_unit = 1024 # small, numerous files + file_multiplier = 200 + else: + raise NotImplemented(throttle_type) + + # Pick up config changes + self.fs.mds_fail_restart() + self.fs.wait_for_daemons() + + create_script = dedent(""" + import os + + mount_path = "{mount_path}" + subdir = "delete_me" + size_unit = {size_unit} + file_multiplier = {file_multiplier} + os.mkdir(os.path.join(mount_path, subdir)) + for i in xrange(0, file_multiplier): + for size in xrange(0, {size_range}*size_unit, size_unit): + filename = "{{0}}_{{1}}.bin".format(i, size / size_unit) + f = open(os.path.join(mount_path, subdir, filename), 'w') + f.write(size * 'x') + f.close() + """.format( + mount_path=self.mount_a.mountpoint, + size_unit=size_unit, + file_multiplier=file_multiplier, + size_range=self.throttle_workload_size_range + )) + + self.mount_a.run_python(create_script) + + # We will run the deletion in the background, to reduce the risk of it completing before + # we have started monitoring the stray statistics. + def background(): + self.mount_a.run_shell(["rm", "-rf", "delete_me"]) + self.fs.mds_asok(["flush", "journal"]) + + background_thread = gevent.spawn(background) + + total_inodes = file_multiplier * self.throttle_workload_size_range + 1 + mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds')) + mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds')) + + # During this phase we look for the concurrent ops to exceed half + # the limit (a heuristic) and not exceed the limit (a correctness + # condition). + purge_timeout = 600 + elapsed = 0 + files_high_water = 0 + ops_high_water = 0 + + while True: + stats = self.fs.mds_asok(['perf', 'dump']) + mdc_stats = stats['mds_cache'] + pq_stats = stats['purge_queue'] + if elapsed >= purge_timeout: + raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats)) + + num_strays = mdc_stats['num_strays'] + num_strays_purging = pq_stats['pq_executing'] + num_purge_ops = pq_stats['pq_executing_ops'] + + self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops]) + + files_high_water = max(files_high_water, num_strays_purging) + ops_high_water = max(ops_high_water, num_purge_ops) + + total_strays_created = mdc_stats['strays_created'] + total_strays_purged = pq_stats['pq_executed'] + + if total_strays_purged == total_inodes: + log.info("Complete purge in {0} seconds".format(elapsed)) + break + elif total_strays_purged > total_inodes: + raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats)) + else: + if throttle_type == self.OPS_THROTTLE: + # 11 is filer_max_purge_ops plus one for the backtrace: + # limit is allowed to be overshot by this much. + if num_purge_ops > mds_max_purge_ops + 11: + raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format( + num_purge_ops, mds_max_purge_ops + )) + elif throttle_type == self.FILES_THROTTLE: + if num_strays_purging > mds_max_purge_files: + raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format( + num_strays_purging, mds_max_purge_files + )) + else: + raise NotImplemented(throttle_type) + + log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format( + num_strays_purging, num_strays, + total_strays_purged, total_strays_created + )) + time.sleep(1) + elapsed += 1 + + background_thread.join() + + # Check that we got up to a respectable rate during the purge. This is totally + # racy, but should be safeish unless the cluster is pathologically slow, or + # insanely fast such that the deletions all pass before we have polled the + # statistics. + if throttle_type == self.OPS_THROTTLE: + if ops_high_water < mds_max_purge_ops / 2: + raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format( + ops_high_water, mds_max_purge_ops + )) + elif throttle_type == self.FILES_THROTTLE: + if files_high_water < mds_max_purge_files / 2: + raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format( + ops_high_water, mds_max_purge_files + )) + + # Sanity check all MDC stray stats + stats = self.fs.mds_asok(['perf', 'dump']) + mdc_stats = stats['mds_cache'] + pq_stats = stats['purge_queue'] + self.assertEqual(mdc_stats['num_strays'], 0) + self.assertEqual(mdc_stats['num_strays_delayed'], 0) + self.assertEqual(pq_stats['pq_executing'], 0) + self.assertEqual(pq_stats['pq_executing_ops'], 0) + self.assertEqual(mdc_stats['strays_created'], total_inodes) + self.assertEqual(mdc_stats['strays_enqueued'], total_inodes) + self.assertEqual(pq_stats['pq_executed'], total_inodes) + + def get_mdc_stat(self, name, mds_id=None): + return self.get_stat("mds_cache", name, mds_id) + + def get_stat(self, subsys, name, mds_id=None): + return self.fs.mds_asok(['perf', 'dump', subsys, name], + mds_id=mds_id)[subsys][name] + + def _wait_for_counter(self, subsys, counter, expect_val, timeout=60, + mds_id=None): + self.wait_until_equal( + lambda: self.get_stat(subsys, counter, mds_id), + expect_val=expect_val, timeout=timeout, + reject_fn=lambda x: x > expect_val + ) + + def test_open_inode(self): + """ + That the case of a dentry unlinked while a client holds an + inode open is handled correctly. + + The inode should be moved into a stray dentry, while the original + dentry and directory should be purged. + + The inode's data should be purged when the client eventually closes + it. + """ + mount_a_client_id = self.mount_a.get_global_id() + + # Write some bytes to a file + size_mb = 8 + + # Hold the file open + p = self.mount_a.open_background("open_file") + self.mount_a.write_n_mb("open_file", size_mb) + open_file_ino = self.mount_a.path_to_ino("open_file") + + self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2) + + # Unlink the dentry + self.mount_a.run_shell(["rm", "-f", "open_file"]) + + # Wait to see the stray count increment + self.wait_until_equal( + lambda: self.get_mdc_stat("num_strays"), + expect_val=1, timeout=60, reject_fn=lambda x: x > 1) + + # See that while the stray count has incremented, none have passed + # on to the purge queue + self.assertEqual(self.get_mdc_stat("strays_created"), 1) + self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0) + + # See that the client still holds 2 caps + self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2) + + # See that the data objects remain in the data pool + self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024)) + + # Now close the file + self.mount_a.kill_background(p) + + # Wait to see the client cap count decrement + self.wait_until_equal( + lambda: self.get_session(mount_a_client_id)['num_caps'], + expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1 + ) + # Wait to see the purge counter increment, stray count go to zero + self._wait_for_counter("mds_cache", "strays_enqueued", 1) + self.wait_until_equal( + lambda: self.get_mdc_stat("num_strays"), + expect_val=0, timeout=6, reject_fn=lambda x: x > 1 + ) + self._wait_for_counter("purge_queue", "pq_executed", 1) + + # See that the data objects no longer exist + self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024)) + + self.await_data_pool_empty() + + def test_hardlink_reintegration(self): + """ + That removal of primary dentry of hardlinked inode results + in reintegration of inode into the previously-remote dentry, + rather than lingering as a stray indefinitely. + """ + # Write some bytes to file_a + size_mb = 8 + self.mount_a.run_shell(["mkdir", "dir_1"]) + self.mount_a.write_n_mb("dir_1/file_a", size_mb) + ino = self.mount_a.path_to_ino("dir_1/file_a") + + # Create a hardlink named file_b + self.mount_a.run_shell(["mkdir", "dir_2"]) + self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"]) + self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino) + + # Flush journal + self.fs.mds_asok(['flush', 'journal']) + + # See that backtrace for the file points to the file_a path + pre_unlink_bt = self.fs.read_backtrace(ino) + self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a") + + # empty mds cache. otherwise mds reintegrates stray when unlink finishes + self.mount_a.umount_wait() + self.fs.mds_asok(['flush', 'journal']) + self.fs.mds_fail_restart() + self.fs.wait_for_daemons() + self.mount_a.mount() + + # Unlink file_a + self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"]) + + # See that a stray was created + self.assertEqual(self.get_mdc_stat("num_strays"), 1) + self.assertEqual(self.get_mdc_stat("strays_created"), 1) + + # Wait, see that data objects are still present (i.e. that the + # stray did not advance to purging given time) + time.sleep(30) + self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024)) + self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0) + + # See that before reintegration, the inode's backtrace points to a stray dir + self.fs.mds_asok(['flush', 'journal']) + self.assertTrue(self.get_backtrace_path(ino).startswith("stray")) + + last_reintegrated = self.get_mdc_stat("strays_reintegrated") + + # Do a metadata operation on the remaining link (mv is heavy handed, but + # others like touch may be satisfied from caps without poking MDS) + self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"]) + + # Stray reintegration should happen as a result of the eval_remote call + # on responding to a client request. + self.wait_until_equal( + lambda: self.get_mdc_stat("num_strays"), + expect_val=0, + timeout=60 + ) + + # See the reintegration counter increment + curr_reintegrated = self.get_mdc_stat("strays_reintegrated") + self.assertGreater(curr_reintegrated, last_reintegrated) + last_reintegrated = curr_reintegrated + + # Flush the journal + self.fs.mds_asok(['flush', 'journal']) + + # See that the backtrace for the file points to the remaining link's path + post_reint_bt = self.fs.read_backtrace(ino) + self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c") + + # mds should reintegrates stray when unlink finishes + self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"]) + self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"]) + + # Stray reintegration should happen as a result of the notify_stray call + # on completion of unlink + self.wait_until_equal( + lambda: self.get_mdc_stat("num_strays"), + expect_val=0, + timeout=60 + ) + + # See the reintegration counter increment + curr_reintegrated = self.get_mdc_stat("strays_reintegrated") + self.assertGreater(curr_reintegrated, last_reintegrated) + last_reintegrated = curr_reintegrated + + # Flush the journal + self.fs.mds_asok(['flush', 'journal']) + + # See that the backtrace for the file points to the newest link's path + post_reint_bt = self.fs.read_backtrace(ino) + self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d") + + # Now really delete it + self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"]) + self._wait_for_counter("mds_cache", "strays_enqueued", 1) + self._wait_for_counter("purge_queue", "pq_executed", 1) + + self.assert_purge_idle() + self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024)) + + # We caused the inode to go stray 3 times + self.assertEqual(self.get_mdc_stat("strays_created"), 3) + # We purged it at the last + self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1) + + def test_mv_hardlink_cleanup(self): + """ + That when doing a rename from A to B, and B has hardlinks, + then we make a stray for B which is then reintegrated + into one of his hardlinks. + """ + # Create file_a, file_b, and a hardlink to file_b + size_mb = 8 + self.mount_a.write_n_mb("file_a", size_mb) + file_a_ino = self.mount_a.path_to_ino("file_a") + + self.mount_a.write_n_mb("file_b", size_mb) + file_b_ino = self.mount_a.path_to_ino("file_b") + + self.mount_a.run_shell(["ln", "file_b", "linkto_b"]) + self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino) + + # mv file_a file_b + self.mount_a.run_shell(["mv", "file_a", "file_b"]) + + # Stray reintegration should happen as a result of the notify_stray call on + # completion of rename + self.wait_until_equal( + lambda: self.get_mdc_stat("num_strays"), + expect_val=0, + timeout=60 + ) + + self.assertEqual(self.get_mdc_stat("strays_created"), 1) + self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1) + + # No data objects should have been deleted, as both files still have linkage. + self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024)) + self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024)) + + self.fs.mds_asok(['flush', 'journal']) + + post_reint_bt = self.fs.read_backtrace(file_b_ino) + self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b") + + def _setup_two_ranks(self): + # Set up two MDSs + self.fs.set_max_mds(2) + + # See that we have two active MDSs + self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, + reject_fn=lambda v: v > 2 or v < 1) + + active_mds_names = self.fs.get_active_names() + rank_0_id = active_mds_names[0] + rank_1_id = active_mds_names[1] + log.info("Ranks 0 and 1 are {0} and {1}".format( + rank_0_id, rank_1_id)) + + # Get rid of other MDS daemons so that it's easier to know which + # daemons to expect in which ranks after restarts + for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}: + self.mds_cluster.mds_stop(unneeded_mds) + self.mds_cluster.mds_fail(unneeded_mds) + + return rank_0_id, rank_1_id + + def _force_migrate(self, to_id, path, watch_ino): + """ + :param to_id: MDS id to move it to + :param path: Filesystem path (string) to move + :param watch_ino: Inode number to look for at destination to confirm move + :return: None + """ + self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path]) + + # Poll the MDS cache dump to watch for the export completing + migrated = False + migrate_timeout = 60 + migrate_elapsed = 0 + while not migrated: + data = self.fs.mds_asok(["dump", "cache"], to_id) + for inode_data in data: + if inode_data['ino'] == watch_ino: + log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2))) + if inode_data['is_auth'] is True: + migrated = True + break + + if not migrated: + if migrate_elapsed > migrate_timeout: + raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed)) + else: + migrate_elapsed += 1 + time.sleep(1) + + def _is_stopped(self, rank): + mds_map = self.fs.get_mds_map() + return rank not in [i['rank'] for i in mds_map['info'].values()] + + def test_purge_on_shutdown(self): + """ + That when an MDS rank is shut down, its purge queue is + drained in the process. + """ + rank_0_id, rank_1_id = self._setup_two_ranks() + + self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") + self.mds_cluster.mds_fail_restart(rank_1_id) + self.fs.wait_for_daemons() + + file_count = 5 + + self.mount_a.create_n_files("delete_me/file", file_count) + + self._force_migrate(rank_1_id, "delete_me", + self.mount_a.path_to_ino("delete_me/file_0")) + + self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) + self.mount_a.umount_wait() + + # See all the strays go into purge queue + self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id) + self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id) + self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0) + + # See nothing get purged from the purge queue (yet) + time.sleep(10) + self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) + + # Shut down rank 1 + self.fs.set_max_mds(1) + self.fs.deactivate(1) + + # It shouldn't proceed past stopping because its still not allowed + # to purge + time.sleep(10) + self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) + self.assertFalse(self._is_stopped(1)) + + # Permit the daemon to start purging again + self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id), + 'injectargs', + "--mds_max_purge_files 100") + + # It should now proceed through shutdown + self.wait_until_true( + lambda: self._is_stopped(1), + timeout=60 + ) + + # ...and in the process purge all that data + self.await_data_pool_empty() + + def test_migration_on_shutdown(self): + """ + That when an MDS rank is shut down, any non-purgeable strays + get migrated to another rank. + """ + + rank_0_id, rank_1_id = self._setup_two_ranks() + + # Create a non-purgeable stray in a ~mds1 stray directory + # by doing a hard link and deleting the original file + self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"]) + self.mount_a.run_shell(["touch", "dir_1/original"]) + self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"]) + + self._force_migrate(rank_1_id, "dir_1", + self.mount_a.path_to_ino("dir_1/original")) + + # empty mds cache. otherwise mds reintegrates stray when unlink finishes + self.mount_a.umount_wait() + self.fs.mds_asok(['flush', 'journal'], rank_0_id) + self.fs.mds_asok(['flush', 'journal'], rank_1_id) + self.fs.mds_fail_restart() + self.fs.wait_for_daemons() + + active_mds_names = self.fs.get_active_names() + rank_0_id = active_mds_names[0] + rank_1_id = active_mds_names[1] + + self.mount_a.mount() + + self.mount_a.run_shell(["rm", "-f", "dir_1/original"]) + self.mount_a.umount_wait() + + self._wait_for_counter("mds_cache", "strays_created", 1, + mds_id=rank_1_id) + + # Shut down rank 1 + self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1") + self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1") + + # Wait til we get to a single active MDS mdsmap state + self.wait_until_true(lambda: self._is_stopped(1), timeout=120) + + # See that the stray counter on rank 0 has incremented + self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1) + + def assert_backtrace(self, ino, expected_path): + """ + Assert that the backtrace in the data pool for an inode matches + an expected /foo/bar path. + """ + expected_elements = expected_path.strip("/").split("/") + bt = self.fs.read_backtrace(ino) + actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']])) + self.assertListEqual(expected_elements, actual_elements) + + def get_backtrace_path(self, ino): + bt = self.fs.read_backtrace(ino) + elements = reversed([dn['dname'] for dn in bt['ancestors']]) + return "/".join(elements) + + def assert_purge_idle(self): + """ + Assert that the MDS perf counters indicate no strays exist and + no ongoing purge activity. Sanity check for when PurgeQueue should + be idle. + """ + mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache'] + pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue'] + self.assertEqual(mdc_stats["num_strays"], 0) + self.assertEqual(mdc_stats["num_strays_delayed"], 0) + self.assertEqual(pq_stats["pq_executing"], 0) + self.assertEqual(pq_stats["pq_executing_ops"], 0) + + def test_mv_cleanup(self): + """ + That when doing a rename from A to B, and B has no hardlinks, + then we make a stray for B and purge him. + """ + # Create file_a and file_b, write some to both + size_mb = 8 + self.mount_a.write_n_mb("file_a", size_mb) + file_a_ino = self.mount_a.path_to_ino("file_a") + self.mount_a.write_n_mb("file_b", size_mb) + file_b_ino = self.mount_a.path_to_ino("file_b") + + self.fs.mds_asok(['flush', 'journal']) + self.assert_backtrace(file_a_ino, "file_a") + self.assert_backtrace(file_b_ino, "file_b") + + # mv file_a file_b + self.mount_a.run_shell(['mv', 'file_a', 'file_b']) + + # See that stray counter increments + self.assertEqual(self.get_mdc_stat("strays_created"), 1) + # Wait for purge counter to increment + self._wait_for_counter("mds_cache", "strays_enqueued", 1) + self._wait_for_counter("purge_queue", "pq_executed", 1) + + self.assert_purge_idle() + + # file_b should have been purged + self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024)) + + # Backtrace should have updated from file_a to file_b + self.fs.mds_asok(['flush', 'journal']) + self.assert_backtrace(file_a_ino, "file_b") + + # file_a's data should still exist + self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024)) + + def _pool_df(self, pool_name): + """ + Return a dict like + { + "kb_used": 0, + "bytes_used": 0, + "max_avail": 19630292406, + "objects": 0 + } + + :param pool_name: Which pool (must exist) + """ + out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty") + for p in json.loads(out)['pools']: + if p['name'] == pool_name: + return p['stats'] + + raise RuntimeError("Pool '{0}' not found".format(pool_name)) + + def await_data_pool_empty(self): + self.wait_until_true( + lambda: self._pool_df( + self.fs.get_data_pool_name() + )['objects'] == 0, + timeout=60) + + def test_snapshot_remove(self): + """ + That removal of a snapshot that references a now-unlinked file results + in purging on the stray for the file. + """ + # Enable snapshots + self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true", + "--yes-i-really-mean-it") + + # Create a dir with a file in it + size_mb = 8 + self.mount_a.run_shell(["mkdir", "snapdir"]) + self.mount_a.run_shell(["mkdir", "snapdir/subdir"]) + self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024) + file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a") + + # Snapshot the dir + self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"]) + + # Cause the head revision to deviate from the snapshot + self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb) + + # Flush the journal so that backtraces, dirfrag objects will actually be written + self.fs.mds_asok(["flush", "journal"]) + + # Unlink the file + self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"]) + self.mount_a.run_shell(["rmdir", "snapdir/subdir"]) + + # Unmount the client because when I come back to check the data is still + # in the file I don't want to just see what's in the page cache. + self.mount_a.umount_wait() + + self.assertEqual(self.get_mdc_stat("strays_created"), 2) + + # FIXME: at this stage we see a purge and the stray count drops to + # zero, but there's actually still a stray, so at the very + # least the StrayManager stats code is slightly off + + self.mount_a.mount() + + # See that the data from the snapshotted revision of the file is still present + # and correct + self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024) + + # Remove the snapshot + self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"]) + + # Purging file_a doesn't happen until after we've flushed the journal, because + # it is referenced by the snapshotted subdir, and the snapshot isn't really + # gone until the journal references to it are gone + self.fs.mds_asok(["flush", "journal"]) + + # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs. + # See also: http://tracker.ceph.com/issues/20072 + self.wait_until_true( + lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024), + timeout=60 + ) + + # See that a purge happens now + self._wait_for_counter("mds_cache", "strays_enqueued", 2) + self._wait_for_counter("purge_queue", "pq_executed", 2) + + self.await_data_pool_empty() + + def test_fancy_layout(self): + """ + purge stray file with fancy layout + """ + + file_name = "fancy_layout_file" + self.mount_a.run_shell(["touch", file_name]) + + file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608" + self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout) + + # 35MB requires 7 objects + size_mb = 35 + self.mount_a.write_n_mb(file_name, size_mb) + + self.mount_a.run_shell(["rm", "-f", file_name]) + self.fs.mds_asok(["flush", "journal"]) + + # can't use self.fs.data_objects_absent here, it does not support fancy layout + self.await_data_pool_empty() + + def test_dirfrag_limit(self): + """ + That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations). + + That fragmentation (forced) will allow more entries to be created. + + That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged. + """ + + self.fs.set_allow_dirfrags(True) + + LOW_LIMIT = 50 + for mds in self.fs.get_daemon_names(): + self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds) + + try: + self.mount_a.run_python(dedent(""" + import os + path = os.path.join("{path}", "subdir") + os.mkdir(path) + for n in range(0, {file_count}): + open(os.path.join(path, "%s" % n), 'w').write("%s" % n) + """.format( + path=self.mount_a.mountpoint, + file_count=LOW_LIMIT+1 + ))) + except CommandFailedError: + pass # ENOSPAC + else: + raise RuntimeError("fragment size exceeded") + + # Now test that we can go beyond the limit if we fragment the directory + + self.mount_a.run_python(dedent(""" + import os + path = os.path.join("{path}", "subdir2") + os.mkdir(path) + for n in range(0, {file_count}): + open(os.path.join(path, "%s" % n), 'w').write("%s" % n) + dfd = os.open(path, os.O_DIRECTORY) + os.fsync(dfd) + """.format( + path=self.mount_a.mountpoint, + file_count=LOW_LIMIT + ))) + + # Ensure that subdir2 is fragmented + mds_id = self.fs.get_active_names()[0] + self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id) + + # remount+flush (release client caps) + self.mount_a.umount_wait() + self.fs.mds_asok(["flush", "journal"], mds_id) + self.mount_a.mount() + self.mount_a.wait_until_mounted() + + # Create 50% more files than the current fragment limit + self.mount_a.run_python(dedent(""" + import os + path = os.path.join("{path}", "subdir2") + for n in range({file_count}, ({file_count}*3)//2): + open(os.path.join(path, "%s" % n), 'w').write("%s" % n) + """.format( + path=self.mount_a.mountpoint, + file_count=LOW_LIMIT + ))) + + # Now test the stray directory size is limited and recovers + strays_before = self.get_mdc_stat("strays_created") + try: + self.mount_a.run_python(dedent(""" + import os + path = os.path.join("{path}", "subdir3") + os.mkdir(path) + for n in range({file_count}): + fpath = os.path.join(path, "%s" % n) + f = open(fpath, 'w') + f.write("%s" % n) + f.close() + os.unlink(fpath) + """.format( + path=self.mount_a.mountpoint, + file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count + ))) + except CommandFailedError: + pass # ENOSPAC + else: + raise RuntimeError("fragment size exceeded") + + strays_after = self.get_mdc_stat("strays_created") + self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT) + + self._wait_for_counter("mds_cache", "strays_enqueued", strays_after) + self._wait_for_counter("purge_queue", "pq_executed", strays_after) + + self.mount_a.run_python(dedent(""" + import os + path = os.path.join("{path}", "subdir4") + os.mkdir(path) + for n in range({file_count}): + fpath = os.path.join(path, "%s" % n) + f = open(fpath, 'w') + f.write("%s" % n) + f.close() + os.unlink(fpath) + """.format( + path=self.mount_a.mountpoint, + file_count=LOW_LIMIT + ))) + + def test_purge_queue_upgrade(self): + """ + That when starting on a system with no purge queue in the metadata + pool, we silently create one. + :return: + """ + + self.mds_cluster.mds_stop() + self.mds_cluster.mds_fail() + self.fs.rados(["rm", "500.00000000"]) + self.mds_cluster.mds_restart() + self.fs.wait_for_daemons() + + def test_purge_queue_op_rate(self): + """ + A busy purge queue is meant to aggregate operations sufficiently + that our RADOS ops to the metadata pool are not O(files). Check + that that is so. + :return: + """ + + # For low rates of deletion, the rate of metadata ops actually + # will be o(files), so to see the desired behaviour we have to give + # the system a significant quantity, i.e. an order of magnitude + # more than the number of files it will purge at one time. + + max_purge_files = 2 + + self.set_conf('mds', 'mds_bal_frag', 'false') + self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files) + self.fs.mds_fail_restart() + self.fs.wait_for_daemons() + + phase_1_files = 256 + phase_2_files = 512 + + self.mount_a.run_shell(["mkdir", "phase1"]) + self.mount_a.create_n_files("phase1/file", phase_1_files) + + self.mount_a.run_shell(["mkdir", "phase2"]) + self.mount_a.create_n_files("phase2/file", phase_2_files) + + def unlink_and_count_ops(path, expected_deletions): + initial_ops = self.get_stat("objecter", "op") + initial_pq_executed = self.get_stat("purge_queue", "pq_executed") + + self.mount_a.run_shell(["rm", "-rf", path]) + + self._wait_for_counter( + "purge_queue", "pq_executed", initial_pq_executed + expected_deletions + ) + + final_ops = self.get_stat("objecter", "op") + + # Calculation of the *overhead* operations, i.e. do not include + # the operations where we actually delete files. + return final_ops - initial_ops - expected_deletions + + self.fs.mds_asok(['flush', 'journal']) + phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1) + + self.fs.mds_asok(['flush', 'journal']) + phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1) + + log.info("Phase 1: {0}".format(phase1_ops)) + log.info("Phase 2: {0}".format(phase2_ops)) + + # The success criterion is that deleting double the number + # of files doesn't generate double the number of overhead ops + # -- this comparison is a rough approximation of that rule. + self.assertTrue(phase2_ops < phase1_ops * 1.25) + + # Finally, check that our activity did include properly quiescing + # the queue (i.e. call to Journaler::write_head in the right place), + # by restarting the MDS and checking that it doesn't try re-executing + # any of the work we did. + self.fs.mds_asok(['flush', 'journal']) # flush to ensure no strays + # hanging around + self.fs.mds_fail_restart() + self.fs.wait_for_daemons() + time.sleep(10) + self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0) + + def test_replicated_delete_speed(self): + """ + That deletions of replicated metadata are not pathologically slow + """ + rank_0_id, rank_1_id = self._setup_two_ranks() + + self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") + self.mds_cluster.mds_fail_restart(rank_1_id) + self.fs.wait_for_daemons() + + file_count = 10 + + self.mount_a.create_n_files("delete_me/file", file_count) + + self._force_migrate(rank_1_id, "delete_me", + self.mount_a.path_to_ino("delete_me/file_0")) + + begin = datetime.datetime.now() + self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) + end = datetime.datetime.now() + + # What we're really checking here is that we are completing client + # operations immediately rather than delaying until the next tick. + tick_period = float(self.fs.get_config("mds_tick_interval", + service_type="mds")) + + duration = (end - begin).total_seconds() + self.assertLess(duration, (file_count * tick_period) * 0.25) +