remove ceph code
diff --git a/src/ceph/qa/tasks/cephfs/filesystem.py b/src/ceph/qa/tasks/cephfs/filesystem.py
deleted file mode 100644 (file)
index 9638fd5..0000000
+++ /dev/null
@@ -1,1213 +0,0 @@
-
-from StringIO import StringIO
-import json
-import logging
-from gevent import Greenlet
-import os
-import time
-import datetime
-import re
-import errno
-import random
-
-from teuthology.exceptions import CommandFailedError
-from teuthology import misc
-from teuthology.nuke import clear_firewall
-from teuthology.parallel import parallel
-from tasks.ceph_manager import write_conf
-from tasks import ceph_manager
-
-
-log = logging.getLogger(__name__)
-
-
-DAEMON_WAIT_TIMEOUT = 120
-ROOT_INO = 1
-
-
-class ObjectNotFound(Exception):
-    def __init__(self, object_name):
-        self._object_name = object_name
-
-    def __str__(self):
-        return "Object not found: '{0}'".format(self._object_name)
-
-class FSStatus(object):
-    """
-    Operations on a snapshot of the FSMap.
-    """
-    def __init__(self, mon_manager):
-        self.mon = mon_manager
-        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
-
-    def __str__(self):
-        return json.dumps(self.map, indent = 2, sort_keys = True)
-
-    # Expose the fsmap for manual inspection.
-    def __getitem__(self, key):
-        """
-        Get a field from the fsmap.
-        """
-        return self.map[key]
-
-    def get_filesystems(self):
-        """
-        Iterator for all filesystems.
-        """
-        for fs in self.map['filesystems']:
-            yield fs
-
-    def get_all(self):
-        """
-        Iterator for all the mds_info components in the FSMap.
-        """
-        for info in self.get_standbys():
-            yield info
-        for fs in self.map['filesystems']:
-            for info in fs['mdsmap']['info'].values():
-                yield info
-
-    def get_standbys(self):
-        """
-        Iterator for all standbys.
-        """
-        for info in self.map['standbys']:
-            yield info
-
-    def get_fsmap(self, fscid):
-        """
-        Get the fsmap for the given FSCID.
-        """
-        for fs in self.map['filesystems']:
-            if fscid is None or fs['id'] == fscid:
-                return fs
-        raise RuntimeError("FSCID {0} not in map".format(fscid))
-
-    def get_fsmap_byname(self, name):
-        """
-        Get the fsmap for the given file system name.
-        """
-        for fs in self.map['filesystems']:
-            if name is None or fs['mdsmap']['fs_name'] == name:
-                return fs
-        raise RuntimeError("FS {0} not in map".format(name))
-
-    def get_replays(self, fscid):
-        """
-        Get the standby:replay MDS for the given FSCID.
-        """
-        fs = self.get_fsmap(fscid)
-        for info in fs['mdsmap']['info'].values():
-            if info['state'] == 'up:standby-replay':
-                yield info
-
-    def get_ranks(self, fscid):
-        """
-        Get the ranks for the given FSCID.
-        """
-        fs = self.get_fsmap(fscid)
-        for info in fs['mdsmap']['info'].values():
-            if info['rank'] >= 0:
-                yield info
-
-    def get_rank(self, fscid, rank):
-        """
-        Get the info for the given rank of the given FSCID.
-        """
-        for info in self.get_ranks(fscid):
-            if info['rank'] == rank:
-                return info
-        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
-
-    def get_mds(self, name):
-        """
-        Get the info for the given MDS name.
-        """
-        for info in self.get_all():
-            if info['name'] == name:
-                return info
-        return None
-
-    def get_mds_addr(self, name):
-        """
-        Return the instance addr as a string, like "10.214.133.138:6807/10825"
-        """
-        info = self.get_mds(name)
-        if info:
-            return info['addr']
-        else:
-            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
-            raise RuntimeError("MDS id '{0}' not found in map".format(name))
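-
-    # A minimal usage sketch (illustrative only; assumes a populated
-    # mon_manager and a known fscid from a test context):
-    #
-    #   status = FSStatus(mon_manager)
-    #   for fs in status.get_filesystems():
-    #       log.info(fs['mdsmap']['fs_name'])
-    #   info = status.get_rank(fscid, 0)  # raises if rank 0 is absent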
-
-class CephCluster(object):
-    @property
-    def admin_remote(self):
-        first_mon = misc.get_first_mon(self._ctx, None)
-        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
-        return result
-
-    def __init__(self, ctx):
-        self._ctx = ctx
-        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
-
-    def get_config(self, key, service_type=None):
-        """
-        Get config from mon by default, or a specific service if caller asks for it
-        """
-        if service_type is None:
-            service_type = 'mon'
-
-        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
-        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
-    def set_ceph_conf(self, subsys, key, value):
-        if subsys not in self._ctx.ceph['ceph'].conf:
-            self._ctx.ceph['ceph'].conf[subsys] = {}
-        self._ctx.ceph['ceph'].conf[subsys][key] = value
-        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
-                               # used a different config path this won't work.
-
-    def clear_ceph_conf(self, subsys, key):
-        del self._ctx.ceph['ceph'].conf[subsys][key]
-        write_conf(self._ctx)
-
-    def json_asok(self, command, service_type, service_id):
-        proc = self.mon_manager.admin_socket(service_type, service_id, command)
-        response_data = proc.stdout.getvalue()
-        log.info("_json_asok output: {0}".format(response_data))
-        if response_data.strip():
-            return json.loads(response_data)
-        else:
-            return None
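-
-    # Hedged example of driving json_asok (the service type/id values are
-    # assumptions for illustration):
-    #
-    #   out = cluster.json_asok(['perf', 'dump'], 'mds', 'a')
-    #   if out is not None:
-    #       log.info(sorted(out.keys()))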
-
-
-class MDSCluster(CephCluster):
-    """
-    Collective operations on all the MDS daemons in the Ceph cluster.  These
-    daemons may be in use by various Filesystems.
-
-    For the benefit of pre-multi-filesystem tests, this class is also
-    a parent of Filesystem.  The correct way to use MDSCluster going forward is
-    as a separate instance outside of your (multiple) Filesystem instances.
-    """
-    def __init__(self, ctx):
-        super(MDSCluster, self).__init__(ctx)
-
-        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
-
-        if len(self.mds_ids) == 0:
-            raise RuntimeError("This task requires at least one MDS")
-
-        if hasattr(self._ctx, "daemons"):
-            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
-            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
-
-    def _one_or_all(self, mds_id, cb, in_parallel=True):
-        """
-        Call a callback for a single named MDS, or for all.
-
-        Note that the parallelism here isn't for performance: it's to avoid being overly
-        kind to the cluster by pausing an ssh-latency of time between operations, and to
-        avoid being overly kind by always executing them in the same order.  However, some
-        actions don't cope with being done in parallel, so it's optional (`in_parallel`).
-
-        :param mds_id: MDS daemon name, or None
-        :param cb: Callback taking single argument of MDS daemon name
-        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
-        """
-        if mds_id is None:
-            if in_parallel:
-                with parallel() as p:
-                    for mds_id in self.mds_ids:
-                        p.spawn(cb, mds_id)
-            else:
-                for mds_id in self.mds_ids:
-                    cb(mds_id)
-        else:
-            cb(mds_id)
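-
-    # Behavioural sketch (illustrative): with mds_ids == ['a', 'b'],
-    # _one_or_all(None, cb) invokes cb('a') and cb('b'), concurrently by
-    # default; _one_or_all('a', cb) reduces to a single cb('a') call.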
-
-    def get_config(self, key, service_type=None):
-        """
-        get_config specialization of service_type="mds"
-        """
-        if service_type != "mds":
-            return super(MDSCluster, self).get_config(key, service_type)
-
-        # Some tests stop MDS daemons, don't send commands to a dead one:
-        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
-        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
-    def mds_stop(self, mds_id=None):
-        """
-        Stop the MDS daemon process(es).  If it held a rank, that rank
-        will eventually go laggy.
-        """
-        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
-
-    def mds_fail(self, mds_id=None):
-        """
-        Inform MDSMonitor of the death of the daemon process(es).  If it held
-        a rank, that rank will be relinquished.
-        """
-        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
-
-    def mds_restart(self, mds_id=None):
-        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
-
-    def mds_fail_restart(self, mds_id=None):
-        """
-        Variation on restart that includes marking MDSs as failed, so that doing this
-        operation followed by waiting for healthy daemon states guarantees that they
-        have gone down and come up, rather than potentially seeing the healthy states
-        that existed before the restart.
-        """
-        def _fail_restart(id_):
-            self.mds_daemons[id_].stop()
-            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
-            self.mds_daemons[id_].restart()
-
-        self._one_or_all(mds_id, _fail_restart)
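-
-    # A typical test-side pattern (hypothetical; wait_for_daemons is defined
-    # on Filesystem below):
-    #
-    #   mds_cluster.mds_fail_restart()
-    #   fs.wait_for_daemons()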
-
-    def newfs(self, name='cephfs', create=True):
-        return Filesystem(self._ctx, name=name, create=create)
-
-    def status(self):
-        return FSStatus(self.mon_manager)
-
-    def delete_all_filesystems(self):
-        """
-        Remove all filesystems that exist, and any pools in use by them.
-        """
-        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
-        pool_id_name = {}
-        for pool in pools:
-            pool_id_name[pool['pool']] = pool['pool_name']
-
-        # mark cluster down for each fs to prevent churn during deletion
-        status = self.status()
-        for fs in status.get_filesystems():
-            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
-
-        # get a new copy as actives may have since changed
-        status = self.status()
-        for fs in status.get_filesystems():
-            mdsmap = fs['mdsmap']
-            metadata_pool = pool_id_name[mdsmap['metadata_pool']]
-
-            for gid in mdsmap['up'].values():
-                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
-
-            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
-            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
-                                             metadata_pool, metadata_pool,
-                                             '--yes-i-really-really-mean-it')
-            for data_pool in mdsmap['data_pools']:
-                data_pool = pool_id_name[data_pool]
-                try:
-                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
-                                                     data_pool, data_pool,
-                                                     '--yes-i-really-really-mean-it')
-                except CommandFailedError as e:
-                    if e.exitstatus == 16:
-                        # EBUSY: this data pool is used by two filesystems;
-                        # let the second pass delete it.
-                        pass
-                    else:
-                        raise
-
-    def get_standby_daemons(self):
-        return set([s['name'] for s in self.status().get_standbys()])
-
-    def get_mds_hostnames(self):
-        result = set()
-        for mds_id in self.mds_ids:
-            mds_remote = self.mon_manager.find_remote('mds', mds_id)
-            result.add(mds_remote.hostname)
-
-        return list(result)
-
-    def set_clients_block(self, blocked, mds_id=None):
-        """
-        Block (using iptables) client communications to this MDS.  Be careful: if
-        other services are running on this MDS, or other MDSs try to talk to this
-        MDS, their communications may also be blocked as collateral damage.
-
-        :param mds_id: Optional ID of MDS to block, default to all
-        :return:
-        """
-        da_flag = "-A" if blocked else "-D"
-
-        def set_block(_mds_id):
-            remote = self.mon_manager.find_remote('mds', _mds_id)
-            status = self.status()
-
-            addr = status.get_mds_addr(_mds_id)
-            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
-
-            remote.run(
-                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
-                      "comment", "--comment", "teuthology"])
-            remote.run(
-                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
-                      "comment", "--comment", "teuthology"])
-
-        self._one_or_all(mds_id, set_block, in_parallel=False)
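-
-    # The rules above are tagged with an iptables comment ("teuthology"),
-    # presumably so that clear_firewall() below can identify and remove them.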
-
-    def clear_firewall(self):
-        clear_firewall(self._ctx)
-
-    def get_mds_info(self, mds_id):
-        return FSStatus(self.mon_manager).get_mds(mds_id)
-
-    def is_full(self):
-        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
-        return 'full' in flags
-
-    def is_pool_full(self, pool_name):
-        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
-        for pool in pools:
-            if pool['pool_name'] == pool_name:
-                return 'full' in pool['flags_names'].split(",")
-
-        raise RuntimeError("Pool not found '{0}'".format(pool_name))
-
-class Filesystem(MDSCluster):
-    """
-    This object is for driving a CephFS filesystem.  The MDS daemons driven by
-    MDSCluster may be shared with other Filesystems.
-    """
-    def __init__(self, ctx, fscid=None, name=None, create=False,
-                 ec_profile=None):
-        super(Filesystem, self).__init__(ctx)
-
-        self.name = name
-        self.ec_profile = ec_profile
-        self.id = None
-        self.metadata_pool_name = None
-        self.metadata_overlay = False
-        self.data_pool_name = None
-        self.data_pools = None
-
-        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
-        self.client_id = client_list[0]
-        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
-
-        if name is not None:
-            if fscid is not None:
-                raise RuntimeError("cannot specify fscid when creating fs")
-            if create and not self.legacy_configured():
-                self.create()
-        else:
-            if fscid is not None:
-                self.id = fscid
-                self.getinfo(refresh = True)
-
-        # Stash a reference to the first created filesystem on ctx, so
-        # that if someone drops to the interactive shell they can easily
-        # poke our methods.
-        if not hasattr(self._ctx, "filesystem"):
-            self._ctx.filesystem = self
-
-    def getinfo(self, refresh = False):
-        status = self.status()
-        if self.id is not None:
-            fsmap = status.get_fsmap(self.id)
-        elif self.name is not None:
-            fsmap = status.get_fsmap_byname(self.name)
-        else:
-            fss = [fs for fs in status.get_filesystems()]
-            if len(fss) == 1:
-                fsmap = fss[0]
-            elif len(fss) == 0:
-                raise RuntimeError("no file system available")
-            else:
-                raise RuntimeError("more than one file system available")
-        self.id = fsmap['id']
-        self.name = fsmap['mdsmap']['fs_name']
-        self.get_pool_names(status = status, refresh = refresh)
-        return status
-
-    def set_metadata_overlay(self, overlay):
-        if self.id is not None:
-            raise RuntimeError("cannot specify fscid when configuring overlay")
-        self.metadata_overlay = overlay
-
-    def deactivate(self, rank):
-        if rank < 0:
-            raise RuntimeError("invalid rank")
-        elif rank == 0:
-            raise RuntimeError("cannot deactivate rank 0")
-        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
-
-    def set_max_mds(self, max_mds):
-        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
-
-    def set_allow_dirfrags(self, yes):
-        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
-
-    def get_pgs_per_fs_pool(self):
-        """
-        Calculate how many PGs to use when creating a pool, in order to avoid raising any
-        health warnings about mon_pg_warn_min_per_osd
-
-        :return: an integer number of PGs
-        """
-        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
-        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
-        return pg_warn_min_per_osd * osd_count
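-
-    # Worked example (illustrative numbers): with mon_pg_warn_min_per_osd = 30
-    # and 3 OSDs in the run, each filesystem pool is created with 30 * 3 = 90 PGs.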
-
-    def create(self):
-        if self.name is None:
-            self.name = "cephfs"
-        if self.metadata_pool_name is None:
-            self.metadata_pool_name = "{0}_metadata".format(self.name)
-        if self.data_pool_name is None:
-            data_pool_name = "{0}_data".format(self.name)
-        else:
-            data_pool_name = self.data_pool_name
-
-        log.info("Creating filesystem '{0}'".format(self.name))
-
-        pgs_per_fs_pool = self.get_pgs_per_fs_pool()
-
-        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
-                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
-        if self.metadata_overlay:
-            self.mon_manager.raw_cluster_cmd('fs', 'new',
-                                             self.name, self.metadata_pool_name, data_pool_name,
-                                             '--allow-dangerous-metadata-overlay')
-        else:
-            if self.ec_profile:
-                log.info("EC profile is %s", self.ec_profile)
-                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
-                cmd.extend(self.ec_profile)
-                self.mon_manager.raw_cluster_cmd(*cmd)
-                self.mon_manager.raw_cluster_cmd(
-                    'osd', 'pool', 'create',
-                    data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
-                    data_pool_name)
-                self.mon_manager.raw_cluster_cmd(
-                    'osd', 'pool', 'set',
-                    data_pool_name, 'allow_ec_overwrites', 'true')
-            else:
-                self.mon_manager.raw_cluster_cmd(
-                    'osd', 'pool', 'create',
-                    data_pool_name, pgs_per_fs_pool.__str__())
-            self.mon_manager.raw_cluster_cmd('fs', 'new',
-                                             self.name, self.metadata_pool_name, data_pool_name)
-        self.check_pool_application(self.metadata_pool_name)
-        self.check_pool_application(data_pool_name)
-        # Turn off spurious standby count warnings from modifying max_mds in tests.
-        try:
-            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
-        except CommandFailedError as e:
-            if e.exitstatus == 22:
-                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
-                pass
-            else:
-                raise
-
-        self.getinfo(refresh = True)
-
-
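-    # For illustration (hypothetical values): constructing
-    # Filesystem(ctx, name="cephfs", create=True, ec_profile=["k=2", "m=1"])
-    # takes the erasure-code branch of create() above and enables
-    # allow_ec_overwrites on the data pool.
-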
-    def check_pool_application(self, pool_name):
-        osd_map = self.mon_manager.get_osd_dump_json()
-        for pool in osd_map['pools']:
-            if pool['pool_name'] == pool_name:
-                if "application_metadata" in pool:
-                    if "cephfs" not in pool['application_metadata']:
-                        raise RuntimeError("Pool {0} does not name cephfs as application!".format(pool_name))
-
-
-    def __del__(self):
-        if getattr(self._ctx, "filesystem", None) == self:
-            delattr(self._ctx, "filesystem")
-
-    def exists(self):
-        """
-        Whether a filesystem exists in the mon's filesystem list
-        """
-        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
-        return self.name in [fs['name'] for fs in fs_list]
-
-    def legacy_configured(self):
-        """
-        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
-        the case, the caller should avoid using Filesystem.create
-        """
-        try:
-            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
-            pools = json.loads(out_text)
-            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
-            if metadata_pool_exists:
-                self.metadata_pool_name = 'metadata'
-        except CommandFailedError as e:
-            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
-            # structured output (--format) from the CLI.
-            if e.exitstatus == 22:
-                metadata_pool_exists = True
-            else:
-                raise
-
-        return metadata_pool_exists
-
-    def _df(self):
-        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
-
-    def get_mds_map(self):
-        return self.status().get_fsmap(self.id)['mdsmap']
-
-    def add_data_pool(self, name):
-        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
-        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
-        self.get_pool_names(refresh = True)
-        for poolid, fs_name in self.data_pools.items():
-            if name == fs_name:
-                return poolid
-        raise RuntimeError("could not get just created pool '{0}'".format(name))
-
-    def get_pool_names(self, refresh = False, status = None):
-        if refresh or self.metadata_pool_name is None or self.data_pools is None:
-            if status is None:
-                status = self.status()
-            fsmap = status.get_fsmap(self.id)
-
-            osd_map = self.mon_manager.get_osd_dump_json()
-            id_to_name = {}
-            for p in osd_map['pools']:
-                id_to_name[p['pool']] = p['pool_name']
-
-            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
-            self.data_pools = {}
-            for data_pool in fsmap['mdsmap']['data_pools']:
-                self.data_pools[data_pool] = id_to_name[data_pool]
-
-    def get_data_pool_name(self, refresh = False):
-        if refresh or self.data_pools is None:
-            self.get_pool_names(refresh = True)
-        assert(len(self.data_pools) == 1)
-        return self.data_pools.values()[0]
-
-    def get_data_pool_id(self, refresh = False):
-        """
-        Don't call this if you have multiple data pools
-        :return: integer
-        """
-        if refresh or self.data_pools is None:
-            self.get_pool_names(refresh = True)
-        assert(len(self.data_pools) == 1)
-        return self.data_pools.keys()[0]
-
-    def get_data_pool_names(self, refresh = False):
-        if refresh or self.data_pools is None:
-            self.get_pool_names(refresh = True)
-        return self.data_pools.values()
-
-    def get_metadata_pool_name(self):
-        return self.metadata_pool_name
-
-    def set_data_pool_name(self, name):
-        if self.id is not None:
-            raise RuntimeError("can't set filesystem name if its fscid is set")
-        self.data_pool_name = name
-
-    def get_namespace_id(self):
-        return self.id
-
-    def get_pool_df(self, pool_name):
-        """
-        Return a dict like:
-        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
-        """
-        for pool_df in self._df()['pools']:
-            if pool_df['name'] == pool_name:
-                return pool_df['stats']
-
-        raise RuntimeError("Pool name '{0}' not found".format(pool_name))
-
-    def get_usage(self):
-        return self._df()['stats']['total_used_bytes']
-
-    def are_daemons_healthy(self):
-        """
-        Return true if all daemons are in one of the states active, standby or
-        standby-replay, and at least max_mds daemons are in 'active'.
-
-        Unlike most of Filesystem, this function is tolerant of new-style `fs`
-        commands being missing, because we are part of the ceph installation
-        process during upgrade suites, so must fall back to old style commands
-        when we get an EINVAL on a new style command.
-
-        :return:
-        """
-
-        active_count = 0
-        try:
-            mds_map = self.get_mds_map()
-        except CommandFailedError as cfe:
-            # Old version, fall back to non-multi-fs commands
-            if cfe.exitstatus == errno.EINVAL:
-                mds_map = json.loads(
-                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
-            else:
-                raise
-
-        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
-
-        for mds_id, mds_status in mds_map['info'].items():
-            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
-                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
-                return False
-            elif mds_status['state'] == 'up:active':
-                active_count += 1
-
-        log.info("are_daemons_healthy: {0}/{1}".format(
-            active_count, mds_map['max_mds']
-        ))
-
-        if active_count >= mds_map['max_mds']:
-            # The MDSMap says these guys are active, but let's check they really are
-            for mds_id, mds_status in mds_map['info'].items():
-                if mds_status['state'] == 'up:active':
-                    try:
-                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
-                    except CommandFailedError as cfe:
-                        if cfe.exitstatus == errno.EINVAL:
-                            # Old version, can't do this check
-                            continue
-                        else:
-                            # MDS not even running
-                            return False
-
-                    if daemon_status['state'] != 'up:active':
-                        # MDS hasn't taken the latest map yet
-                        return False
-
-            return True
-        else:
-            return False
-
-    def get_daemon_names(self, state=None):
-        """
-        Return MDS daemon names of those daemons in the given state
-        :param state:
-        :return:
-        """
-        status = self.get_mds_map()
-        result = []
-        for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
-            if mds_status['state'] == state or state is None:
-                result.append(mds_status['name'])
-
-        return result
-
-    def get_active_names(self):
-        """
-        Return MDS daemon names of those daemons holding ranks
-        in state up:active
-
-        :return: list of strings like ['a', 'b'], sorted by rank
-        """
-        return self.get_daemon_names("up:active")
-
-    def get_all_mds_rank(self):
-        status = self.get_mds_map()
-        result = []
-        for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
-            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
-                result.append(mds_status['rank'])
-
-        return result
-
-    def get_rank_names(self):
-        """
-        Return MDS daemon names of those daemons holding a rank,
-        sorted by rank.  This includes e.g. up:replay/reconnect
-        as well as active, but does not include standby or
-        standby-replay.
-        """
-        status = self.get_mds_map()
-        result = []
-        for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
-            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
-                result.append(mds_status['name'])
-
-        return result
-
-    def wait_for_daemons(self, timeout=None):
-        """
-        Wait until all daemons are healthy
-        :return:
-        """
-
-        if timeout is None:
-            timeout = DAEMON_WAIT_TIMEOUT
-
-        elapsed = 0
-        while True:
-            if self.are_daemons_healthy():
-                return
-            else:
-                time.sleep(1)
-                elapsed += 1
-
-            if elapsed > timeout:
-                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
-
-    def get_lone_mds_id(self):
-        """
-        Get a single MDS ID: the only one if there is only one
-        configured, else the only one currently holding a rank,
-        else raise an error.
-        """
-        if len(self.mds_ids) != 1:
-            alive = self.get_rank_names()
-            if len(alive) == 1:
-                return alive[0]
-            else:
-                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
-        else:
-            return self.mds_ids[0]
-
-    def recreate(self):
-        log.info("Creating new filesystem")
-        self.delete_all_filesystems()
-        self.id = None
-        self.create()
-
-    def put_metadata_object_raw(self, object_id, infile):
-        """
-        Save an object to the metadata pool
-        """
-        self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, infile
-        ])
-
-    def get_metadata_object_raw(self, object_id):
-        """
-        Retrieve an object from the metadata pool and store it in a file.
-        """
-        temp_bin_path = '/tmp/' + object_id + '.bin'
-
-        self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
-        ])
-
-        return temp_bin_path
-
-    def get_metadata_object(self, object_type, object_id):
-        """
-        Retrieve an object from the metadata pool, pass it through
-        ceph-dencoder to dump it to JSON, and return the decoded object.
-        """
-        temp_bin_path = '/tmp/out.bin'
-
-        self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
-        ])
-
-        stdout = StringIO()
-        self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
-        ], stdout=stdout)
-        dump_json = stdout.getvalue().strip()
-        try:
-            dump = json.loads(dump_json)
-        except (TypeError, ValueError):
-            log.error("Failed to decode JSON: '{0}'".format(dump_json))
-            raise
-
-        return dump
-
-    def get_journal_version(self):
-        """
-        Read the JournalPointer and Journal::Header objects to learn the version of
-        encoding in use.
-        """
-        journal_pointer_object = '400.00000000'
-        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
-        journal_ino = journal_pointer_dump['journal_pointer']['front']
-
-        journal_header_object = "{0:x}.00000000".format(journal_ino)
-        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
-
-        version = journal_header_dump['journal_header']['stream_format']
-        log.info("Read journal version {0}".format(version))
-
-        return version
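-
-    # Example of the object naming above (hypothetical journal ino): if the
-    # JournalPointer's 'front' is 0x200, the header object is "200.00000000".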
-
-    def mds_asok(self, command, mds_id=None):
-        if mds_id is None:
-            mds_id = self.get_lone_mds_id()
-
-        return self.json_asok(command, 'mds', mds_id)
-
-    def read_cache(self, path, depth=None):
-        cmd = ["dump", "tree", path]
-        if depth is not None:
-            cmd.append(depth.__str__())
-        result = self.mds_asok(cmd)
-        if len(result) == 0:
-            raise RuntimeError("Path not found in cache: {0}".format(path))
-
-        return result
-
-    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
-        """
-        Block until the MDS reaches a particular state, or a failure condition
-        is met.
-
-        When there are multiple MDSs, succeed when exactly one MDS is in the
-        goal state, or fail when any MDS is in the reject state.
-
-        :param goal_state: Return once the MDS is in this state
-        :param reject: Fail if the MDS enters this state before the goal state
-        :param timeout: Fail if this many seconds pass before reaching goal
-        :return: number of seconds waited, rounded down to integer
-        """
-
-        started_at = time.time()
-        while True:
-            status = self.status()
-            if rank is not None:
-                try:
-                    mds_info = status.get_rank(self.id, rank)
-                except RuntimeError:
-                    # rank not present in the map yet; keep waiting
-                    mds_info = None
-                current_state = mds_info['state'] if mds_info else None
-                log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
-            elif mds_id is not None:
-                # mds_info is None if no daemon with this ID exists in the map
-                mds_info = status.get_mds(mds_id)
-                current_state = mds_info['state'] if mds_info else None
-                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
-            else:
-                # In general, look for a single MDS
-                states = [m['state'] for m in status.get_ranks(self.id)]
-                if [s for s in states if s == goal_state] == [goal_state]:
-                    current_state = goal_state
-                elif reject in states:
-                    current_state = reject
-                else:
-                    current_state = None
-                log.info("mapped states {0} to {1}".format(states, current_state))
-
-            elapsed = time.time() - started_at
-            if current_state == goal_state:
-                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
-                return elapsed
-            elif reject is not None and current_state == reject:
-                raise RuntimeError("MDS in reject state {0}".format(current_state))
-            elif timeout is not None and elapsed > timeout:
-                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
-                raise RuntimeError(
-                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
-                        elapsed, goal_state, current_state
-                    ))
-            else:
-                time.sleep(1)
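-
-    # Illustrative calls (names and states assumed from elsewhere in this file):
-    #
-    #   fs.wait_for_state('up:active', timeout=60)
-    #   fs.wait_for_state('up:standby-replay', mds_id='b', timeout=60)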
-
-    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
-        mds_id = self.mds_ids[0]
-        remote = self.mds_daemons[mds_id].remote
-        if pool is None:
-            pool = self.get_data_pool_name()
-
-        obj_name = "{0:x}.00000000".format(ino_no)
-
-        args = [
-            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
-        ]
-        try:
-            proc = remote.run(
-                args=args,
-                stdout=StringIO())
-        except CommandFailedError as e:
-            log.error(e.__str__())
-            raise ObjectNotFound(obj_name)
-
-        data = proc.stdout.getvalue()
-
-        p = remote.run(
-            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
-            stdout=StringIO(),
-            stdin=data
-        )
-
-        return json.loads(p.stdout.getvalue().strip())
-
-    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
-        """
-        Write to an xattr of the 0th data object of an inode.  Will
-        succeed whether the object and/or xattr already exist or not.
-
-        :param ino_no: integer inode number
-        :param xattr_name: string name of the xattr
-        :param data: byte array data to write to the xattr
-        :param pool: name of data pool or None to use primary data pool
-        :return: None
-        """
-        remote = self.mds_daemons[self.mds_ids[0]].remote
-        if pool is None:
-            pool = self.get_data_pool_name()
-
-        obj_name = "{0:x}.00000000".format(ino_no)
-        args = [
-            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
-            obj_name, xattr_name, data
-        ]
-        remote.run(
-            args=args,
-            stdout=StringIO())
-
-    def read_backtrace(self, ino_no, pool=None):
-        """
-        Read the backtrace from the data pool, return a dict in the format
-        given by inode_backtrace_t::dump, which is something like:
-
-        ::
-
-            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
-            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
-
-            { "ino": 1099511627778,
-              "ancestors": [
-                    { "dirino": 1,
-                      "dname": "blah",
-                      "version": 11}],
-              "pool": 1,
-              "old_pools": []}
-
-        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
-                     one data pool and that will be used.
-        """
-        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
-
-    def read_layout(self, ino_no, pool=None):
-        """
-        Read 'layout' xattr of an inode and parse the result, returning a dict like:
-        ::
-            {
-                "stripe_unit": 4194304,
-                "stripe_count": 1,
-                "object_size": 4194304,
-                "pool_id": 1,
-                "pool_ns": "",
-            }
-
-        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
-                     one data pool and that will be used.
-        """
-        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
-
-    def _enumerate_data_objects(self, ino, size):
-        """
-        Get the list of expected data objects for a range, and the list of objects
-        that really exist.
-
-        :return a tuple of two lists of strings (expected, actual)
-        """
-        stripe_size = 1024 * 1024 * 4
-
-        size = max(stripe_size, size)
-
-        want_objects = [
-            "{0:x}.{1:08x}".format(ino, n)
-            for n in range(0, ((size - 1) / stripe_size) + 1)
-        ]
-
-        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
-
-        return want_objects, exist_objects
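-
-    # Worked example (hypothetical inode): for ino 0x10000000000 and
-    # size 9 MB with the 4 MB stripe assumed above, want_objects is
-    # ["10000000000.00000000", "10000000000.00000001", "10000000000.00000002"].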
-
-    def data_objects_present(self, ino, size):
-        """
-        Check that *all* the expected data objects for an inode are present in the data pool
-        """
-
-        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
-        missing = set(want_objects) - set(exist_objects)
-
-        if missing:
-            log.info("Objects missing (ino {0}, size {1}): {2}".format(
-                ino, size, missing
-            ))
-            return False
-        else:
-            log.info("All objects for ino {0} size {1} found".format(ino, size))
-            return True
-
-    def data_objects_absent(self, ino, size):
-        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
-        present = set(want_objects) & set(exist_objects)
-
-        if present:
-            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
-                ino, size, present
-            ))
-            return False
-        else:
-            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
-            return True
-
-    def dirfrag_exists(self, ino, frag):
-        try:
-            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
-        except CommandFailedError:
-            return False
-        else:
-            return True
-
-    def rados(self, args, pool=None, namespace=None, stdin_data=None):
-        """
-        Call into the `rados` CLI from an MDS
-        """
-
-        if pool is None:
-            pool = self.get_metadata_pool_name()
-
-        # Doesn't matter which MDS we use to run rados commands, they all
-        # have access to the pools
-        mds_id = self.mds_ids[0]
-        remote = self.mds_daemons[mds_id].remote
-
-        # NB we could alternatively use librados pybindings for this, but it's a one-liner
-        # using the `rados` CLI
-        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
-                (["--namespace", namespace] if namespace else []) +
-                args)
-        p = remote.run(
-            args=args,
-            stdin=stdin_data,
-            stdout=StringIO())
-        return p.stdout.getvalue().strip()
-
-    def list_dirfrag(self, dir_ino):
-        """
-        Read the named object and return the list of omap keys
-
-        :return a list of 0 or more strings
-        """
-
-        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
-
-        try:
-            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
-        except CommandFailedError as e:
-            log.error(e.__str__())
-            raise ObjectNotFound(dirfrag_obj_name)
-
-        return key_list_str.split("\n") if key_list_str else []
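-
-    # e.g. list_dirfrag(ROOT_INO) lists the omap keys of object "1.00000000",
-    # i.e. the dentry names of the root directory (typically stored with a
-    # "_head" suffix).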
-
-    def erase_metadata_objects(self, prefix):
-        """
-        For all objects in the metadata pool matching the prefix,
-        erase them.
-
-        This is O(N) in the number of objects in the pool, so it is only
-        suitable for use on toy test filesystems.
-        """
-        all_objects = self.rados(["ls"]).split("\n")
-        matching_objects = [o for o in all_objects if o.startswith(prefix)]
-        for o in matching_objects:
-            self.rados(["rm", o])
-
-    def erase_mds_objects(self, rank):
-        """
-        Erase all the per-MDS objects for a particular rank.  This includes
-        inotable, sessiontable, journal
-        """
-
-        def obj_prefix(multiplier):
-            """
-            MDS object naming conventions like rank 1's
-            journal is at 201.***
-            """
-            return "%x." % (multiplier * 0x100 + rank)
-
-        # MDS_INO_LOG_OFFSET
-        self.erase_metadata_objects(obj_prefix(2))
-        # MDS_INO_LOG_BACKUP_OFFSET
-        self.erase_metadata_objects(obj_prefix(3))
-        # MDS_INO_LOG_POINTER_OFFSET
-        self.erase_metadata_objects(obj_prefix(4))
-        # MDSTables & SessionMap
-        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
-
-    @property
-    def _prefix(self):
-        """
-        Override this to run the Ceph binaries from a different path prefix.
-        """
-        return ""
-
-    def _run_tool(self, tool, args, rank=None, quiet=False):
-        # Tests frequently have [client] configuration that jacks up
-        # the objecter log level (unlikely to be interesting here)
-        # and does not set the mds log level (very interesting here)
-        if quiet:
-            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
-        else:
-            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
-
-        if rank is not None:
-            base_args.extend(["--rank", "%d" % rank])
-
-        t1 = datetime.datetime.now()
-        r = self.tool_remote.run(
-            args=base_args + args,
-            stdout=StringIO()).stdout.getvalue().strip()
-        duration = datetime.datetime.now() - t1
-        log.info("Ran {0} in time {1}, result:\n{2}".format(
-            base_args + args, duration, r
-        ))
-        return r
-
-    @property
-    def tool_remote(self):
-        """
-        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
-        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
-        so that tests can use this remote to go get locally written output files from the tools.
-        """
-        mds_id = self.mds_ids[0]
-        return self.mds_daemons[mds_id].remote
-
-    def journal_tool(self, args, rank=None, quiet=False):
-        """
-        Invoke cephfs-journal-tool with the passed arguments, and return its stdout
-        """
-        return self._run_tool("cephfs-journal-tool", args, rank, quiet)
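-
-    # e.g. (illustrative) journal_tool(["journal", "inspect"], rank=0) runs
-    # roughly: cephfs-journal-tool --debug-mds=4 --debug-objecter=1 --rank 0
-    # journal inspect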
-
-    def table_tool(self, args, quiet=False):
-        """
-        Invoke cephfs-table-tool with the passed arguments, and return its stdout
-        """
-        return self._run_tool("cephfs-table-tool", args, None, quiet)
-
-    def data_scan(self, args, quiet=False, worker_count=1):
-        """
-        Invoke cephfs-data-scan with the passed arguments, and return its stdout
-
-        :param worker_count: if greater than 1, multiple workers will be run
-                             in parallel and the return value will be None
-        """
-
-        workers = []
-
-        for n in range(0, worker_count):
-            if worker_count > 1:
-                # The first token of the data-scan args is a command; the rest
-                # are its arguments.  Insert the worker arguments after the command.
-                cmd = args[0]
-                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
-            else:
-                worker_args = args
-
-            workers.append(Greenlet.spawn(lambda wargs=worker_args:
-                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))
-
-        for w in workers:
-            w.get()
-
-        if worker_count == 1:
-            return workers[0].value
-        else:
-            return None
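-
-    # Hypothetical parallel invocation: data_scan(["scan_extents", "cephfs_data"],
-    # worker_count=4) spawns four cephfs-data-scan workers, each with its own
-    # --worker_n/--worker_m pair, and returns None.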