X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fqa%2Ftasks%2Fcephfs%2Ftest_strays.py;fp=src%2Fceph%2Fqa%2Ftasks%2Fcephfs%2Ftest_strays.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=b64f3e931dca692e6e3681cf2e270c04f3987c5d;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/qa/tasks/cephfs/test_strays.py b/src/ceph/qa/tasks/cephfs/test_strays.py deleted file mode 100644 index b64f3e9..0000000 --- a/src/ceph/qa/tasks/cephfs/test_strays.py +++ /dev/null @@ -1,1049 +0,0 @@ -import json -import time -import logging -from textwrap import dedent -import datetime -import gevent -import datetime - -from teuthology.orchestra.run import CommandFailedError, Raw -from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology - -log = logging.getLogger(__name__) - - -class TestStrays(CephFSTestCase): - MDSS_REQUIRED = 2 - - OPS_THROTTLE = 1 - FILES_THROTTLE = 2 - - # Range of different file sizes used in throttle test's workload - throttle_workload_size_range = 16 - - @for_teuthology - def test_ops_throttle(self): - self._test_throttling(self.OPS_THROTTLE) - - @for_teuthology - def test_files_throttle(self): - self._test_throttling(self.FILES_THROTTLE) - - def test_dir_deletion(self): - """ - That when deleting a bunch of dentries and the containing - directory, everything gets purged. - Catches cases where the client might e.g. fail to trim - the unlinked dir from its cache. - """ - file_count = 1000 - create_script = dedent(""" - import os - - mount_path = "{mount_path}" - subdir = "delete_me" - size = {size} - file_count = {file_count} - os.mkdir(os.path.join(mount_path, subdir)) - for i in xrange(0, file_count): - filename = "{{0}}_{{1}}.bin".format(i, size) - f = open(os.path.join(mount_path, subdir, filename), 'w') - f.write(size * 'x') - f.close() - """.format( - mount_path=self.mount_a.mountpoint, - size=1024, - file_count=file_count - )) - - self.mount_a.run_python(create_script) - - # That the dirfrag object is created - self.fs.mds_asok(["flush", "journal"]) - dir_ino = self.mount_a.path_to_ino("delete_me") - self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0)) - - # Remove everything - self.mount_a.run_shell(["rm", "-rf", "delete_me"]) - self.fs.mds_asok(["flush", "journal"]) - - # That all the removed files get created as strays - strays = self.get_mdc_stat("strays_created") - self.assertEqual(strays, file_count + 1) - - # That the strays all get enqueued for purge - self.wait_until_equal( - lambda: self.get_mdc_stat("strays_enqueued"), - strays, - timeout=600 - - ) - - # That all the purge operations execute - self.wait_until_equal( - lambda: self.get_stat("purge_queue", "pq_executed"), - strays, - timeout=600 - ) - - # That finally, the directory metadata object is gone - self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0)) - - # That finally, the data objects are all gone - self.await_data_pool_empty() - - def _test_throttling(self, throttle_type): - self.data_log = [] - try: - return self._do_test_throttling(throttle_type) - except: - for l in self.data_log: - log.info(",".join([l_.__str__() for l_ in l])) - raise - - def _do_test_throttling(self, throttle_type): - """ - That the mds_max_purge_ops setting is respected - """ - - def set_throttles(files, ops): - """ - Helper for updating ops/files limits, and calculating effective - ops_per_pg setting to give the same ops limit. 
- """ - self.set_conf('mds', 'mds_max_purge_files', "%d" % files) - self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops) - - pgs = self.fs.mon_manager.get_pool_property( - self.fs.get_data_pool_name(), - "pg_num" - ) - ops_per_pg = float(ops) / pgs - self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg) - - # Test conditions depend on what we're going to be exercising. - # * Lift the threshold on whatever throttle we are *not* testing, so - # that the throttle of interest is the one that will be the bottleneck - # * Create either many small files (test file count throttling) or fewer - # large files (test op throttling) - if throttle_type == self.OPS_THROTTLE: - set_throttles(files=100000000, ops=16) - size_unit = 1024 * 1024 # big files, generate lots of ops - file_multiplier = 100 - elif throttle_type == self.FILES_THROTTLE: - # The default value of file limit is pretty permissive, so to avoid - # the test running too fast, create lots of files and set the limit - # pretty low. - set_throttles(ops=100000000, files=6) - size_unit = 1024 # small, numerous files - file_multiplier = 200 - else: - raise NotImplemented(throttle_type) - - # Pick up config changes - self.fs.mds_fail_restart() - self.fs.wait_for_daemons() - - create_script = dedent(""" - import os - - mount_path = "{mount_path}" - subdir = "delete_me" - size_unit = {size_unit} - file_multiplier = {file_multiplier} - os.mkdir(os.path.join(mount_path, subdir)) - for i in xrange(0, file_multiplier): - for size in xrange(0, {size_range}*size_unit, size_unit): - filename = "{{0}}_{{1}}.bin".format(i, size / size_unit) - f = open(os.path.join(mount_path, subdir, filename), 'w') - f.write(size * 'x') - f.close() - """.format( - mount_path=self.mount_a.mountpoint, - size_unit=size_unit, - file_multiplier=file_multiplier, - size_range=self.throttle_workload_size_range - )) - - self.mount_a.run_python(create_script) - - # We will run the deletion in the background, to reduce the risk of it completing before - # we have started monitoring the stray statistics. - def background(): - self.mount_a.run_shell(["rm", "-rf", "delete_me"]) - self.fs.mds_asok(["flush", "journal"]) - - background_thread = gevent.spawn(background) - - total_inodes = file_multiplier * self.throttle_workload_size_range + 1 - mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds')) - mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds')) - - # During this phase we look for the concurrent ops to exceed half - # the limit (a heuristic) and not exceed the limit (a correctness - # condition). 
- purge_timeout = 600 - elapsed = 0 - files_high_water = 0 - ops_high_water = 0 - - while True: - stats = self.fs.mds_asok(['perf', 'dump']) - mdc_stats = stats['mds_cache'] - pq_stats = stats['purge_queue'] - if elapsed >= purge_timeout: - raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats)) - - num_strays = mdc_stats['num_strays'] - num_strays_purging = pq_stats['pq_executing'] - num_purge_ops = pq_stats['pq_executing_ops'] - - self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops]) - - files_high_water = max(files_high_water, num_strays_purging) - ops_high_water = max(ops_high_water, num_purge_ops) - - total_strays_created = mdc_stats['strays_created'] - total_strays_purged = pq_stats['pq_executed'] - - if total_strays_purged == total_inodes: - log.info("Complete purge in {0} seconds".format(elapsed)) - break - elif total_strays_purged > total_inodes: - raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats)) - else: - if throttle_type == self.OPS_THROTTLE: - # 11 is filer_max_purge_ops plus one for the backtrace: - # limit is allowed to be overshot by this much. - if num_purge_ops > mds_max_purge_ops + 11: - raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format( - num_purge_ops, mds_max_purge_ops - )) - elif throttle_type == self.FILES_THROTTLE: - if num_strays_purging > mds_max_purge_files: - raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format( - num_strays_purging, mds_max_purge_files - )) - else: - raise NotImplemented(throttle_type) - - log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format( - num_strays_purging, num_strays, - total_strays_purged, total_strays_created - )) - time.sleep(1) - elapsed += 1 - - background_thread.join() - - # Check that we got up to a respectable rate during the purge. This is totally - # racy, but should be safeish unless the cluster is pathologically slow, or - # insanely fast such that the deletions all pass before we have polled the - # statistics. 
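The loop above keeps running maxima of pq_executing and pq_executing_ops so they can be checked against the throttle limits afterwards; below is a compact sketch of that high-water pattern detached from the MDS admin socket. The sampler callable, sample count, and the perf_dump name in the usage comment are placeholders, not part of the test.

    import time

    def high_water(sample_fn, samples, interval=1.0):
        # Poll a gauge and remember its maximum, so a later assertion can
        # check the throttle was actually exercised without being exceeded.
        peak = 0
        for _ in range(samples):
            peak = max(peak, sample_fn())
            time.sleep(interval)
        return peak

    # Hypothetical usage against a perf-dump snapshot function:
    # peak_ops = high_water(lambda: perf_dump()['purge_queue']['pq_executing_ops'], samples=60)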
- if throttle_type == self.OPS_THROTTLE: - if ops_high_water < mds_max_purge_ops / 2: - raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format( - ops_high_water, mds_max_purge_ops - )) - elif throttle_type == self.FILES_THROTTLE: - if files_high_water < mds_max_purge_files / 2: - raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format( - ops_high_water, mds_max_purge_files - )) - - # Sanity check all MDC stray stats - stats = self.fs.mds_asok(['perf', 'dump']) - mdc_stats = stats['mds_cache'] - pq_stats = stats['purge_queue'] - self.assertEqual(mdc_stats['num_strays'], 0) - self.assertEqual(mdc_stats['num_strays_delayed'], 0) - self.assertEqual(pq_stats['pq_executing'], 0) - self.assertEqual(pq_stats['pq_executing_ops'], 0) - self.assertEqual(mdc_stats['strays_created'], total_inodes) - self.assertEqual(mdc_stats['strays_enqueued'], total_inodes) - self.assertEqual(pq_stats['pq_executed'], total_inodes) - - def get_mdc_stat(self, name, mds_id=None): - return self.get_stat("mds_cache", name, mds_id) - - def get_stat(self, subsys, name, mds_id=None): - return self.fs.mds_asok(['perf', 'dump', subsys, name], - mds_id=mds_id)[subsys][name] - - def _wait_for_counter(self, subsys, counter, expect_val, timeout=60, - mds_id=None): - self.wait_until_equal( - lambda: self.get_stat(subsys, counter, mds_id), - expect_val=expect_val, timeout=timeout, - reject_fn=lambda x: x > expect_val - ) - - def test_open_inode(self): - """ - That the case of a dentry unlinked while a client holds an - inode open is handled correctly. - - The inode should be moved into a stray dentry, while the original - dentry and directory should be purged. - - The inode's data should be purged when the client eventually closes - it. 
- """ - mount_a_client_id = self.mount_a.get_global_id() - - # Write some bytes to a file - size_mb = 8 - - # Hold the file open - p = self.mount_a.open_background("open_file") - self.mount_a.write_n_mb("open_file", size_mb) - open_file_ino = self.mount_a.path_to_ino("open_file") - - self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2) - - # Unlink the dentry - self.mount_a.run_shell(["rm", "-f", "open_file"]) - - # Wait to see the stray count increment - self.wait_until_equal( - lambda: self.get_mdc_stat("num_strays"), - expect_val=1, timeout=60, reject_fn=lambda x: x > 1) - - # See that while the stray count has incremented, none have passed - # on to the purge queue - self.assertEqual(self.get_mdc_stat("strays_created"), 1) - self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0) - - # See that the client still holds 2 caps - self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2) - - # See that the data objects remain in the data pool - self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024)) - - # Now close the file - self.mount_a.kill_background(p) - - # Wait to see the client cap count decrement - self.wait_until_equal( - lambda: self.get_session(mount_a_client_id)['num_caps'], - expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1 - ) - # Wait to see the purge counter increment, stray count go to zero - self._wait_for_counter("mds_cache", "strays_enqueued", 1) - self.wait_until_equal( - lambda: self.get_mdc_stat("num_strays"), - expect_val=0, timeout=6, reject_fn=lambda x: x > 1 - ) - self._wait_for_counter("purge_queue", "pq_executed", 1) - - # See that the data objects no longer exist - self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024)) - - self.await_data_pool_empty() - - def test_hardlink_reintegration(self): - """ - That removal of primary dentry of hardlinked inode results - in reintegration of inode into the previously-remote dentry, - rather than lingering as a stray indefinitely. - """ - # Write some bytes to file_a - size_mb = 8 - self.mount_a.run_shell(["mkdir", "dir_1"]) - self.mount_a.write_n_mb("dir_1/file_a", size_mb) - ino = self.mount_a.path_to_ino("dir_1/file_a") - - # Create a hardlink named file_b - self.mount_a.run_shell(["mkdir", "dir_2"]) - self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"]) - self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino) - - # Flush journal - self.fs.mds_asok(['flush', 'journal']) - - # See that backtrace for the file points to the file_a path - pre_unlink_bt = self.fs.read_backtrace(ino) - self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a") - - # empty mds cache. otherwise mds reintegrates stray when unlink finishes - self.mount_a.umount_wait() - self.fs.mds_asok(['flush', 'journal']) - self.fs.mds_fail_restart() - self.fs.wait_for_daemons() - self.mount_a.mount() - - # Unlink file_a - self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"]) - - # See that a stray was created - self.assertEqual(self.get_mdc_stat("num_strays"), 1) - self.assertEqual(self.get_mdc_stat("strays_created"), 1) - - # Wait, see that data objects are still present (i.e. 
that the - # stray did not advance to purging given time) - time.sleep(30) - self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024)) - self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0) - - # See that before reintegration, the inode's backtrace points to a stray dir - self.fs.mds_asok(['flush', 'journal']) - self.assertTrue(self.get_backtrace_path(ino).startswith("stray")) - - last_reintegrated = self.get_mdc_stat("strays_reintegrated") - - # Do a metadata operation on the remaining link (mv is heavy handed, but - # others like touch may be satisfied from caps without poking MDS) - self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"]) - - # Stray reintegration should happen as a result of the eval_remote call - # on responding to a client request. - self.wait_until_equal( - lambda: self.get_mdc_stat("num_strays"), - expect_val=0, - timeout=60 - ) - - # See the reintegration counter increment - curr_reintegrated = self.get_mdc_stat("strays_reintegrated") - self.assertGreater(curr_reintegrated, last_reintegrated) - last_reintegrated = curr_reintegrated - - # Flush the journal - self.fs.mds_asok(['flush', 'journal']) - - # See that the backtrace for the file points to the remaining link's path - post_reint_bt = self.fs.read_backtrace(ino) - self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c") - - # mds should reintegrates stray when unlink finishes - self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"]) - self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"]) - - # Stray reintegration should happen as a result of the notify_stray call - # on completion of unlink - self.wait_until_equal( - lambda: self.get_mdc_stat("num_strays"), - expect_val=0, - timeout=60 - ) - - # See the reintegration counter increment - curr_reintegrated = self.get_mdc_stat("strays_reintegrated") - self.assertGreater(curr_reintegrated, last_reintegrated) - last_reintegrated = curr_reintegrated - - # Flush the journal - self.fs.mds_asok(['flush', 'journal']) - - # See that the backtrace for the file points to the newest link's path - post_reint_bt = self.fs.read_backtrace(ino) - self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d") - - # Now really delete it - self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"]) - self._wait_for_counter("mds_cache", "strays_enqueued", 1) - self._wait_for_counter("purge_queue", "pq_executed", 1) - - self.assert_purge_idle() - self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024)) - - # We caused the inode to go stray 3 times - self.assertEqual(self.get_mdc_stat("strays_created"), 3) - # We purged it at the last - self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1) - - def test_mv_hardlink_cleanup(self): - """ - That when doing a rename from A to B, and B has hardlinks, - then we make a stray for B which is then reintegrated - into one of his hardlinks. 
- """ - # Create file_a, file_b, and a hardlink to file_b - size_mb = 8 - self.mount_a.write_n_mb("file_a", size_mb) - file_a_ino = self.mount_a.path_to_ino("file_a") - - self.mount_a.write_n_mb("file_b", size_mb) - file_b_ino = self.mount_a.path_to_ino("file_b") - - self.mount_a.run_shell(["ln", "file_b", "linkto_b"]) - self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino) - - # mv file_a file_b - self.mount_a.run_shell(["mv", "file_a", "file_b"]) - - # Stray reintegration should happen as a result of the notify_stray call on - # completion of rename - self.wait_until_equal( - lambda: self.get_mdc_stat("num_strays"), - expect_val=0, - timeout=60 - ) - - self.assertEqual(self.get_mdc_stat("strays_created"), 1) - self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1) - - # No data objects should have been deleted, as both files still have linkage. - self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024)) - self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024)) - - self.fs.mds_asok(['flush', 'journal']) - - post_reint_bt = self.fs.read_backtrace(file_b_ino) - self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b") - - def _setup_two_ranks(self): - # Set up two MDSs - self.fs.set_max_mds(2) - - # See that we have two active MDSs - self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, - reject_fn=lambda v: v > 2 or v < 1) - - active_mds_names = self.fs.get_active_names() - rank_0_id = active_mds_names[0] - rank_1_id = active_mds_names[1] - log.info("Ranks 0 and 1 are {0} and {1}".format( - rank_0_id, rank_1_id)) - - # Get rid of other MDS daemons so that it's easier to know which - # daemons to expect in which ranks after restarts - for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}: - self.mds_cluster.mds_stop(unneeded_mds) - self.mds_cluster.mds_fail(unneeded_mds) - - return rank_0_id, rank_1_id - - def _force_migrate(self, to_id, path, watch_ino): - """ - :param to_id: MDS id to move it to - :param path: Filesystem path (string) to move - :param watch_ino: Inode number to look for at destination to confirm move - :return: None - """ - self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path]) - - # Poll the MDS cache dump to watch for the export completing - migrated = False - migrate_timeout = 60 - migrate_elapsed = 0 - while not migrated: - data = self.fs.mds_asok(["dump", "cache"], to_id) - for inode_data in data: - if inode_data['ino'] == watch_ino: - log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2))) - if inode_data['is_auth'] is True: - migrated = True - break - - if not migrated: - if migrate_elapsed > migrate_timeout: - raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed)) - else: - migrate_elapsed += 1 - time.sleep(1) - - def _is_stopped(self, rank): - mds_map = self.fs.get_mds_map() - return rank not in [i['rank'] for i in mds_map['info'].values()] - - def test_purge_on_shutdown(self): - """ - That when an MDS rank is shut down, its purge queue is - drained in the process. 
- """ - rank_0_id, rank_1_id = self._setup_two_ranks() - - self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") - self.mds_cluster.mds_fail_restart(rank_1_id) - self.fs.wait_for_daemons() - - file_count = 5 - - self.mount_a.create_n_files("delete_me/file", file_count) - - self._force_migrate(rank_1_id, "delete_me", - self.mount_a.path_to_ino("delete_me/file_0")) - - self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) - self.mount_a.umount_wait() - - # See all the strays go into purge queue - self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id) - self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id) - self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0) - - # See nothing get purged from the purge queue (yet) - time.sleep(10) - self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) - - # Shut down rank 1 - self.fs.set_max_mds(1) - self.fs.deactivate(1) - - # It shouldn't proceed past stopping because its still not allowed - # to purge - time.sleep(10) - self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) - self.assertFalse(self._is_stopped(1)) - - # Permit the daemon to start purging again - self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id), - 'injectargs', - "--mds_max_purge_files 100") - - # It should now proceed through shutdown - self.wait_until_true( - lambda: self._is_stopped(1), - timeout=60 - ) - - # ...and in the process purge all that data - self.await_data_pool_empty() - - def test_migration_on_shutdown(self): - """ - That when an MDS rank is shut down, any non-purgeable strays - get migrated to another rank. - """ - - rank_0_id, rank_1_id = self._setup_two_ranks() - - # Create a non-purgeable stray in a ~mds1 stray directory - # by doing a hard link and deleting the original file - self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"]) - self.mount_a.run_shell(["touch", "dir_1/original"]) - self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"]) - - self._force_migrate(rank_1_id, "dir_1", - self.mount_a.path_to_ino("dir_1/original")) - - # empty mds cache. otherwise mds reintegrates stray when unlink finishes - self.mount_a.umount_wait() - self.fs.mds_asok(['flush', 'journal'], rank_0_id) - self.fs.mds_asok(['flush', 'journal'], rank_1_id) - self.fs.mds_fail_restart() - self.fs.wait_for_daemons() - - active_mds_names = self.fs.get_active_names() - rank_0_id = active_mds_names[0] - rank_1_id = active_mds_names[1] - - self.mount_a.mount() - - self.mount_a.run_shell(["rm", "-f", "dir_1/original"]) - self.mount_a.umount_wait() - - self._wait_for_counter("mds_cache", "strays_created", 1, - mds_id=rank_1_id) - - # Shut down rank 1 - self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1") - self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1") - - # Wait til we get to a single active MDS mdsmap state - self.wait_until_true(lambda: self._is_stopped(1), timeout=120) - - # See that the stray counter on rank 0 has incremented - self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1) - - def assert_backtrace(self, ino, expected_path): - """ - Assert that the backtrace in the data pool for an inode matches - an expected /foo/bar path. 
- """ - expected_elements = expected_path.strip("/").split("/") - bt = self.fs.read_backtrace(ino) - actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']])) - self.assertListEqual(expected_elements, actual_elements) - - def get_backtrace_path(self, ino): - bt = self.fs.read_backtrace(ino) - elements = reversed([dn['dname'] for dn in bt['ancestors']]) - return "/".join(elements) - - def assert_purge_idle(self): - """ - Assert that the MDS perf counters indicate no strays exist and - no ongoing purge activity. Sanity check for when PurgeQueue should - be idle. - """ - mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache'] - pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue'] - self.assertEqual(mdc_stats["num_strays"], 0) - self.assertEqual(mdc_stats["num_strays_delayed"], 0) - self.assertEqual(pq_stats["pq_executing"], 0) - self.assertEqual(pq_stats["pq_executing_ops"], 0) - - def test_mv_cleanup(self): - """ - That when doing a rename from A to B, and B has no hardlinks, - then we make a stray for B and purge him. - """ - # Create file_a and file_b, write some to both - size_mb = 8 - self.mount_a.write_n_mb("file_a", size_mb) - file_a_ino = self.mount_a.path_to_ino("file_a") - self.mount_a.write_n_mb("file_b", size_mb) - file_b_ino = self.mount_a.path_to_ino("file_b") - - self.fs.mds_asok(['flush', 'journal']) - self.assert_backtrace(file_a_ino, "file_a") - self.assert_backtrace(file_b_ino, "file_b") - - # mv file_a file_b - self.mount_a.run_shell(['mv', 'file_a', 'file_b']) - - # See that stray counter increments - self.assertEqual(self.get_mdc_stat("strays_created"), 1) - # Wait for purge counter to increment - self._wait_for_counter("mds_cache", "strays_enqueued", 1) - self._wait_for_counter("purge_queue", "pq_executed", 1) - - self.assert_purge_idle() - - # file_b should have been purged - self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024)) - - # Backtrace should have updated from file_a to file_b - self.fs.mds_asok(['flush', 'journal']) - self.assert_backtrace(file_a_ino, "file_b") - - # file_a's data should still exist - self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024)) - - def _pool_df(self, pool_name): - """ - Return a dict like - { - "kb_used": 0, - "bytes_used": 0, - "max_avail": 19630292406, - "objects": 0 - } - - :param pool_name: Which pool (must exist) - """ - out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty") - for p in json.loads(out)['pools']: - if p['name'] == pool_name: - return p['stats'] - - raise RuntimeError("Pool '{0}' not found".format(pool_name)) - - def await_data_pool_empty(self): - self.wait_until_true( - lambda: self._pool_df( - self.fs.get_data_pool_name() - )['objects'] == 0, - timeout=60) - - def test_snapshot_remove(self): - """ - That removal of a snapshot that references a now-unlinked file results - in purging on the stray for the file. 
- """ - # Enable snapshots - self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true", - "--yes-i-really-mean-it") - - # Create a dir with a file in it - size_mb = 8 - self.mount_a.run_shell(["mkdir", "snapdir"]) - self.mount_a.run_shell(["mkdir", "snapdir/subdir"]) - self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024) - file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a") - - # Snapshot the dir - self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"]) - - # Cause the head revision to deviate from the snapshot - self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb) - - # Flush the journal so that backtraces, dirfrag objects will actually be written - self.fs.mds_asok(["flush", "journal"]) - - # Unlink the file - self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"]) - self.mount_a.run_shell(["rmdir", "snapdir/subdir"]) - - # Unmount the client because when I come back to check the data is still - # in the file I don't want to just see what's in the page cache. - self.mount_a.umount_wait() - - self.assertEqual(self.get_mdc_stat("strays_created"), 2) - - # FIXME: at this stage we see a purge and the stray count drops to - # zero, but there's actually still a stray, so at the very - # least the StrayManager stats code is slightly off - - self.mount_a.mount() - - # See that the data from the snapshotted revision of the file is still present - # and correct - self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024) - - # Remove the snapshot - self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"]) - - # Purging file_a doesn't happen until after we've flushed the journal, because - # it is referenced by the snapshotted subdir, and the snapshot isn't really - # gone until the journal references to it are gone - self.fs.mds_asok(["flush", "journal"]) - - # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs. - # See also: http://tracker.ceph.com/issues/20072 - self.wait_until_true( - lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024), - timeout=60 - ) - - # See that a purge happens now - self._wait_for_counter("mds_cache", "strays_enqueued", 2) - self._wait_for_counter("purge_queue", "pq_executed", 2) - - self.await_data_pool_empty() - - def test_fancy_layout(self): - """ - purge stray file with fancy layout - """ - - file_name = "fancy_layout_file" - self.mount_a.run_shell(["touch", file_name]) - - file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608" - self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout) - - # 35MB requires 7 objects - size_mb = 35 - self.mount_a.write_n_mb(file_name, size_mb) - - self.mount_a.run_shell(["rm", "-f", file_name]) - self.fs.mds_asok(["flush", "journal"]) - - # can't use self.fs.data_objects_absent here, it does not support fancy layout - self.await_data_pool_empty() - - def test_dirfrag_limit(self): - """ - That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations). - - That fragmentation (forced) will allow more entries to be created. - - That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged. 
- """ - - self.fs.set_allow_dirfrags(True) - - LOW_LIMIT = 50 - for mds in self.fs.get_daemon_names(): - self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds) - - try: - self.mount_a.run_python(dedent(""" - import os - path = os.path.join("{path}", "subdir") - os.mkdir(path) - for n in range(0, {file_count}): - open(os.path.join(path, "%s" % n), 'w').write("%s" % n) - """.format( - path=self.mount_a.mountpoint, - file_count=LOW_LIMIT+1 - ))) - except CommandFailedError: - pass # ENOSPAC - else: - raise RuntimeError("fragment size exceeded") - - # Now test that we can go beyond the limit if we fragment the directory - - self.mount_a.run_python(dedent(""" - import os - path = os.path.join("{path}", "subdir2") - os.mkdir(path) - for n in range(0, {file_count}): - open(os.path.join(path, "%s" % n), 'w').write("%s" % n) - dfd = os.open(path, os.O_DIRECTORY) - os.fsync(dfd) - """.format( - path=self.mount_a.mountpoint, - file_count=LOW_LIMIT - ))) - - # Ensure that subdir2 is fragmented - mds_id = self.fs.get_active_names()[0] - self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id) - - # remount+flush (release client caps) - self.mount_a.umount_wait() - self.fs.mds_asok(["flush", "journal"], mds_id) - self.mount_a.mount() - self.mount_a.wait_until_mounted() - - # Create 50% more files than the current fragment limit - self.mount_a.run_python(dedent(""" - import os - path = os.path.join("{path}", "subdir2") - for n in range({file_count}, ({file_count}*3)//2): - open(os.path.join(path, "%s" % n), 'w').write("%s" % n) - """.format( - path=self.mount_a.mountpoint, - file_count=LOW_LIMIT - ))) - - # Now test the stray directory size is limited and recovers - strays_before = self.get_mdc_stat("strays_created") - try: - self.mount_a.run_python(dedent(""" - import os - path = os.path.join("{path}", "subdir3") - os.mkdir(path) - for n in range({file_count}): - fpath = os.path.join(path, "%s" % n) - f = open(fpath, 'w') - f.write("%s" % n) - f.close() - os.unlink(fpath) - """.format( - path=self.mount_a.mountpoint, - file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count - ))) - except CommandFailedError: - pass # ENOSPAC - else: - raise RuntimeError("fragment size exceeded") - - strays_after = self.get_mdc_stat("strays_created") - self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT) - - self._wait_for_counter("mds_cache", "strays_enqueued", strays_after) - self._wait_for_counter("purge_queue", "pq_executed", strays_after) - - self.mount_a.run_python(dedent(""" - import os - path = os.path.join("{path}", "subdir4") - os.mkdir(path) - for n in range({file_count}): - fpath = os.path.join(path, "%s" % n) - f = open(fpath, 'w') - f.write("%s" % n) - f.close() - os.unlink(fpath) - """.format( - path=self.mount_a.mountpoint, - file_count=LOW_LIMIT - ))) - - def test_purge_queue_upgrade(self): - """ - That when starting on a system with no purge queue in the metadata - pool, we silently create one. - :return: - """ - - self.mds_cluster.mds_stop() - self.mds_cluster.mds_fail() - self.fs.rados(["rm", "500.00000000"]) - self.mds_cluster.mds_restart() - self.fs.wait_for_daemons() - - def test_purge_queue_op_rate(self): - """ - A busy purge queue is meant to aggregate operations sufficiently - that our RADOS ops to the metadata pool are not O(files). Check - that that is so. 
- :return: - """ - - # For low rates of deletion, the rate of metadata ops actually - # will be o(files), so to see the desired behaviour we have to give - # the system a significant quantity, i.e. an order of magnitude - # more than the number of files it will purge at one time. - - max_purge_files = 2 - - self.set_conf('mds', 'mds_bal_frag', 'false') - self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files) - self.fs.mds_fail_restart() - self.fs.wait_for_daemons() - - phase_1_files = 256 - phase_2_files = 512 - - self.mount_a.run_shell(["mkdir", "phase1"]) - self.mount_a.create_n_files("phase1/file", phase_1_files) - - self.mount_a.run_shell(["mkdir", "phase2"]) - self.mount_a.create_n_files("phase2/file", phase_2_files) - - def unlink_and_count_ops(path, expected_deletions): - initial_ops = self.get_stat("objecter", "op") - initial_pq_executed = self.get_stat("purge_queue", "pq_executed") - - self.mount_a.run_shell(["rm", "-rf", path]) - - self._wait_for_counter( - "purge_queue", "pq_executed", initial_pq_executed + expected_deletions - ) - - final_ops = self.get_stat("objecter", "op") - - # Calculation of the *overhead* operations, i.e. do not include - # the operations where we actually delete files. - return final_ops - initial_ops - expected_deletions - - self.fs.mds_asok(['flush', 'journal']) - phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1) - - self.fs.mds_asok(['flush', 'journal']) - phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1) - - log.info("Phase 1: {0}".format(phase1_ops)) - log.info("Phase 2: {0}".format(phase2_ops)) - - # The success criterion is that deleting double the number - # of files doesn't generate double the number of overhead ops - # -- this comparison is a rough approximation of that rule. - self.assertTrue(phase2_ops < phase1_ops * 1.25) - - # Finally, check that our activity did include properly quiescing - # the queue (i.e. call to Journaler::write_head in the right place), - # by restarting the MDS and checking that it doesn't try re-executing - # any of the work we did. - self.fs.mds_asok(['flush', 'journal']) # flush to ensure no strays - # hanging around - self.fs.mds_fail_restart() - self.fs.wait_for_daemons() - time.sleep(10) - self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0) - - def test_replicated_delete_speed(self): - """ - That deletions of replicated metadata are not pathologically slow - """ - rank_0_id, rank_1_id = self._setup_two_ranks() - - self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") - self.mds_cluster.mds_fail_restart(rank_1_id) - self.fs.wait_for_daemons() - - file_count = 10 - - self.mount_a.create_n_files("delete_me/file", file_count) - - self._force_migrate(rank_1_id, "delete_me", - self.mount_a.path_to_ino("delete_me/file_0")) - - begin = datetime.datetime.now() - self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) - end = datetime.datetime.now() - - # What we're really checking here is that we are completing client - # operations immediately rather than delaying until the next tick. - tick_period = float(self.fs.get_config("mds_tick_interval", - service_type="mds")) - - duration = (end - begin).total_seconds() - self.assertLess(duration, (file_count * tick_period) * 0.25) -
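To make the success criterion in test_purge_queue_op_rate concrete, here is the overhead calculation from unlink_and_count_ops() as a standalone sketch; the objecter op counts are invented for illustration, not measured values.

    def overhead_ops(ops_before, ops_after, expected_deletions):
        # Objecter ops issued beyond the deletions themselves, i.e. the
        # per-batch metadata overhead the purge queue is meant to amortize.
        return ops_after - ops_before - expected_deletions

    # Invented numbers: doubling the files (257 -> 513 deletions, including
    # the containing directory) should not double the overhead.
    phase1 = overhead_ops(1000, 1400, 257)   # 143 overhead ops
    phase2 = overhead_ops(1400, 2070, 513)   # 157 overhead ops
    assert phase2 < phase1 * 1.25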
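Similarly, the timing assertion at the end of test_replicated_delete_speed reduces to the check sketched below: unlinking file_count replicated files must finish in well under file_count * mds_tick_interval, otherwise each unlink was effectively waiting for the next MDS tick. The helper name is hypothetical, the 0.25 slack factor mirrors the test, and the sample duration and tick interval are made up.

    def faster_than_tick_serialized(duration_secs, file_count, tick_period, slack=0.25):
        # True when the batch delete beat the worst case of one tick per file
        # by the required margin.
        return duration_secs < file_count * tick_period * slack

    # Made-up example: 10 files with a 5s tick interval must finish in < 12.5s.
    assert faster_than_tick_serialized(3.2, 10, 5.0)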