diff --git a/src/ceph/qa/tasks/cephfs/filesystem.py b/src/ceph/qa/tasks/cephfs/filesystem.py
new file mode 100644 (file)
index 0000000..9638fd5
--- /dev/null
@@ -0,0 +1,1213 @@
+
+from StringIO import StringIO
+import json
+import logging
+from gevent import Greenlet
+import os
+import time
+import datetime
+import re
+import errno
+import random
+
+from teuthology.exceptions import CommandFailedError
+from teuthology import misc
+from teuthology.nuke import clear_firewall
+from teuthology.parallel import parallel
+from tasks.ceph_manager import write_conf
+from tasks import ceph_manager
+
+
+log = logging.getLogger(__name__)
+
+
+DAEMON_WAIT_TIMEOUT = 120
+ROOT_INO = 1
+
+
+class ObjectNotFound(Exception):
+    def __init__(self, object_name):
+        self._object_name = object_name
+
+    def __str__(self):
+        return "Object not found: '{0}'".format(self._object_name)
+
+class FSStatus(object):
+    """
+    Operations on a snapshot of the FSMap.
+    """
+    def __init__(self, mon_manager):
+        self.mon = mon_manager
+        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
+
+    def __str__(self):
+        return json.dumps(self.map, indent = 2, sort_keys = True)
+
+    # Expose the fsmap for manual inspection.
+    def __getitem__(self, key):
+        """
+        Get a field from the fsmap.
+        """
+        return self.map[key]
+
+    def get_filesystems(self):
+        """
+        Iterator for all filesystems.
+        """
+        for fs in self.map['filesystems']:
+            yield fs
+
+    def get_all(self):
+        """
+        Iterator for all the mds_info components in the FSMap.
+        """
+        for info in self.get_standbys():
+            yield info
+        for fs in self.map['filesystems']:
+            for info in fs['mdsmap']['info'].values():
+                yield info
+
+    def get_standbys(self):
+        """
+        Iterator for all standbys.
+        """
+        for info in self.map['standbys']:
+            yield info
+
+    def get_fsmap(self, fscid):
+        """
+        Get the fsmap for the given FSCID.
+        """
+        for fs in self.map['filesystems']:
+            if fscid is None or fs['id'] == fscid:
+                return fs
+        raise RuntimeError("FSCID {0} not in map".format(fscid))
+
+    def get_fsmap_byname(self, name):
+        """
+        Get the fsmap for the given file system name.
+        """
+        for fs in self.map['filesystems']:
+            if name is None or fs['mdsmap']['fs_name'] == name:
+                return fs
+        raise RuntimeError("FS {0} not in map".format(name))
+
+    def get_replays(self, fscid):
+        """
+        Iterator for the standby-replay MDS daemons of the given FSCID.
+        """
+        fs = self.get_fsmap(fscid)
+        for info in fs['mdsmap']['info'].values():
+            if info['state'] == 'up:standby-replay':
+                yield info
+
+    def get_ranks(self, fscid):
+        """
+        Get the ranks for the given FSCID.
+        """
+        fs = self.get_fsmap(fscid)
+        for info in fs['mdsmap']['info'].values():
+            if info['rank'] >= 0:
+                yield info
+
+    def get_rank(self, fscid, rank):
+        """
+        Get the info for the given rank of the given FSCID.
+        """
+        for info in self.get_ranks(fscid):
+            if info['rank'] == rank:
+                return info
+        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
+
+    def get_mds(self, name):
+        """
+        Get the info for the given MDS name.
+        """
+        for info in self.get_all():
+            if info['name'] == name:
+                return info
+        return None
+
+    def get_mds_addr(self, name):
+        """
+        Return the instance addr as a string, like "10.214.133.138:6807/10825"
+        """
+        info = self.get_mds(name)
+        if info:
+            return info['addr']
+        else:
+            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
+            raise RuntimeError("MDS id '{0}' not found in map".format(name))
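+
+    # A usage sketch (instance names hypothetical), e.g. from an interactive
+    # teuthology shell:
+    #
+    #   status = FSStatus(fs.mon_manager)
+    #   rank0 = status.get_rank(fs.id, 0)
+    #   log.info("rank 0 is {0} in {1}".format(rank0['name'], rank0['state']))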
+
+class CephCluster(object):
+    @property
+    def admin_remote(self):
+        first_mon = misc.get_first_mon(self._ctx, None)
+        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
+        return result
+
+    def __init__(self, ctx):
+        self._ctx = ctx
+        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
+
+    def get_config(self, key, service_type=None):
+        """
+        Get config from mon by default, or a specific service if caller asks for it
+        """
+        if service_type is None:
+            service_type = 'mon'
+
+        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
+        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+    def set_ceph_conf(self, subsys, key, value):
+        if subsys not in self._ctx.ceph['ceph'].conf:
+            self._ctx.ceph['ceph'].conf[subsys] = {}
+        self._ctx.ceph['ceph'].conf[subsys][key] = value
+        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
+                               # used a different config path this won't work.
+
+    def clear_ceph_conf(self, subsys, key):
+        del self._ctx.ceph['ceph'].conf[subsys][key]
+        write_conf(self._ctx)
+
+    def json_asok(self, command, service_type, service_id):
+        proc = self.mon_manager.admin_socket(service_type, service_id, command)
+        response_data = proc.stdout.getvalue()
+        log.info("_json_asok output: {0}".format(response_data))
+        if response_data.strip():
+            return json.loads(response_data)
+        else:
+            return None
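+
+    # A sketch (instance and daemon names hypothetical): ask an MDS for its
+    # own view of its state over its admin socket:
+    #
+    #   daemon_status = cluster.json_asok(['status'], 'mds', 'a')
+    #   log.info(daemon_status['state'])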
+
+
+class MDSCluster(CephCluster):
+    """
+    Collective operations on all the MDS daemons in the Ceph cluster.  These
+    daemons may be in use by various Filesystems.
+
+    For the benefit of pre-multi-filesystem tests, this class is also
+    a parent of Filesystem.  The correct way to use MDSCluster going forward is
+    as a separate instance outside of your (multiple) Filesystem instances.
+    """
+    def __init__(self, ctx):
+        super(MDSCluster, self).__init__(ctx)
+
+        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
+
+        if len(self.mds_ids) == 0:
+            raise RuntimeError("This task requires at least one MDS")
+
+        if hasattr(self._ctx, "daemons"):
+            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
+            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
+
+    def _one_or_all(self, mds_id, cb, in_parallel=True):
+        """
+        Call a callback for a single named MDS, or for all.
+
+        Note that the parallelism here isn't for performance: it's to avoid being overly
+        kind to the cluster, which would otherwise get a graceful ssh-latency pause
+        between operations and always see them arrive in the same order.  However, some
+        actions don't cope with being done in parallel, so it's optional (`in_parallel`).
+
+        :param mds_id: MDS daemon name, or None
+        :param cb: Callback taking single argument of MDS daemon name
+        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
+        """
+        if mds_id is None:
+            if in_parallel:
+                with parallel() as p:
+                    for mds_id in self.mds_ids:
+                        p.spawn(cb, mds_id)
+            else:
+                for mds_id in self.mds_ids:
+                    cb(mds_id)
+        else:
+            cb(mds_id)
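+
+    # For example, mds_stop() below is implemented as
+    #
+    #   self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
+    #
+    # so calling it with mds_id=None stops every MDS, each in its own greenlet.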
+
+    def get_config(self, key, service_type=None):
+        """
+        get_config specialization of service_type="mds"
+        """
+        if service_type != "mds":
+            return super(MDSCluster, self).get_config(key, service_type)
+
+        # Some tests stop MDS daemons, don't send commands to a dead one:
+        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
+        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+    def mds_stop(self, mds_id=None):
+        """
+        Stop the MDS daemon process(es).  If it held a rank, that rank
+        will eventually go laggy.
+        """
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
+
+    def mds_fail(self, mds_id=None):
+        """
+        Inform MDSMonitor of the death of the daemon process(es).  If it held
+        a rank, that rank will be relinquished.
+        """
+        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
+
+    def mds_restart(self, mds_id=None):
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
+
+    def mds_fail_restart(self, mds_id=None):
+        """
+        Variation on restart that includes marking MDSs as failed, so that doing this
+        operation followed by waiting for healthy daemon states guarantees that they
+        have gone down and come up, rather than potentially seeing the healthy states
+        that existed before the restart.
+        """
+        def _fail_restart(id_):
+            self.mds_daemons[id_].stop()
+            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
+            self.mds_daemons[id_].restart()
+
+        self._one_or_all(mds_id, _fail_restart)
+
+    def newfs(self, name='cephfs', create=True):
+        return Filesystem(self._ctx, name=name, create=create)
+
+    def status(self):
+        return FSStatus(self.mon_manager)
+
+    def delete_all_filesystems(self):
+        """
+        Remove all filesystems that exist, and any pools in use by them.
+        """
+        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+        pool_id_name = {}
+        for pool in pools:
+            pool_id_name[pool['pool']] = pool['pool_name']
+
+        # mark cluster down for each fs to prevent churn during deletion
+        status = self.status()
+        for fs in status.get_filesystems():
+            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
+
+        # get a new copy as actives may have since changed
+        status = self.status()
+        for fs in status.get_filesystems():
+            mdsmap = fs['mdsmap']
+            metadata_pool = pool_id_name[mdsmap['metadata_pool']]
+
+            for gid in mdsmap['up'].values():
+                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
+
+            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
+            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
+                                             metadata_pool, metadata_pool,
+                                             '--yes-i-really-really-mean-it')
+            for data_pool in mdsmap['data_pools']:
+                data_pool = pool_id_name[data_pool]
+                try:
+                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
+                                                     data_pool, data_pool,
+                                                     '--yes-i-really-really-mean-it')
+                except CommandFailedError as e:
+                    if e.exitstatus == 16:
+                        # EBUSY: this data pool is still in use by another
+                        # filesystem; let the later pass delete it.
+                        pass
+                    else:
+                        raise
+
+    def get_standby_daemons(self):
+        return set([s['name'] for s in self.status().get_standbys()])
+
+    def get_mds_hostnames(self):
+        result = set()
+        for mds_id in self.mds_ids:
+            mds_remote = self.mon_manager.find_remote('mds', mds_id)
+            result.add(mds_remote.hostname)
+
+        return list(result)
+
+    def set_clients_block(self, blocked, mds_id=None):
+        """
+        Block (using iptables) client communications to this MDS.  Be careful: if
+        other services are running on this MDS, or other MDSs try to talk to this
+        MDS, their communications may also be blocked as collateral damage.
+
+        :param mds_id: Optional ID of MDS to block, default to all
+        :return:
+        """
+        da_flag = "-A" if blocked else "-D"
+
+        def set_block(_mds_id):
+            remote = self.mon_manager.find_remote('mds', _mds_id)
+            status = self.status()
+
+            addr = status.get_mds_addr(_mds_id)
+            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
+
+            remote.run(
+                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+                      "comment", "--comment", "teuthology"])
+            remote.run(
+                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                      "comment", "--comment", "teuthology"])
+
+        self._one_or_all(mds_id, set_block, in_parallel=False)
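+
+    # For an MDS instance at e.g. 10.214.133.138:6807/10825 (address
+    # illustrative), the blocking case expands to:
+    #
+    #   sudo iptables -A OUTPUT -p tcp --sport 6807 -j REJECT -m comment --comment teuthology
+    #   sudo iptables -A INPUT -p tcp --dport 6807 -j REJECT -m comment --comment teuthology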
+
+    def clear_firewall(self):
+        clear_firewall(self._ctx)
+
+    def get_mds_info(self, mds_id):
+        return FSStatus(self.mon_manager).get_mds(mds_id)
+
+    def is_full(self):
+        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
+        return 'full' in flags
+
+    def is_pool_full(self, pool_name):
+        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+        for pool in pools:
+            if pool['pool_name'] == pool_name:
+                return 'full' in pool['flags_names'].split(",")
+
+        raise RuntimeError("Pool not found '{0}'".format(pool_name))
+
+class Filesystem(MDSCluster):
+    """
+    This object is for driving a CephFS filesystem.  The MDS daemons driven by
+    MDSCluster may be shared with other Filesystems.
+    """
+    def __init__(self, ctx, fscid=None, name=None, create=False,
+                 ec_profile=None):
+        super(Filesystem, self).__init__(ctx)
+
+        self.name = name
+        self.ec_profile = ec_profile
+        self.id = None
+        self.metadata_pool_name = None
+        self.metadata_overlay = False
+        self.data_pool_name = None
+        self.data_pools = None
+
+        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
+        self.client_id = client_list[0]
+        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
+
+        if name is not None:
+            if fscid is not None:
+                raise RuntimeError("cannot specify fscid when creating fs")
+            if create and not self.legacy_configured():
+                self.create()
+        else:
+            if fscid is not None:
+                self.id = fscid
+                self.getinfo(refresh = True)
+
+        # Stash a reference to the first created filesystem on ctx, so
+        # that if someone drops to the interactive shell they can easily
+        # poke our methods.
+        if not hasattr(self._ctx, "filesystem"):
+            self._ctx.filesystem = self
+
+    def getinfo(self, refresh = False):
+        status = self.status()
+        if self.id is not None:
+            fsmap = status.get_fsmap(self.id)
+        elif self.name is not None:
+            fsmap = status.get_fsmap_byname(self.name)
+        else:
+            fss = [fs for fs in status.get_filesystems()]
+            if len(fss) == 1:
+                fsmap = fss[0]
+            elif len(fss) == 0:
+                raise RuntimeError("no file system available")
+            else:
+                raise RuntimeError("more than one file system available")
+        self.id = fsmap['id']
+        self.name = fsmap['mdsmap']['fs_name']
+        self.get_pool_names(status = status, refresh = refresh)
+        return status
+
+    def set_metadata_overlay(self, overlay):
+        if self.id is not None:
+            raise RuntimeError("cannot specify fscid when configuring overlay")
+        self.metadata_overlay = overlay
+
+    def deactivate(self, rank):
+        if rank < 0:
+            raise RuntimeError("invalid rank")
+        elif rank == 0:
+            raise RuntimeError("cannot deactivate rank 0")
+        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
+
+    def set_max_mds(self, max_mds):
+        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
+
+    def set_allow_dirfrags(self, yes):
+        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
+
+    def get_pgs_per_fs_pool(self):
+        """
+        Calculate how many PGs to use when creating a pool, in order to avoid raising any
+        health warnings about mon_pg_warn_min_per_osd
+
+        :return: an integer number of PGs
+        """
+        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
+        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
+        return pg_warn_min_per_osd * osd_count
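+
+    # Worked example (values illustrative): with mon_pg_warn_min_per_osd = 30
+    # and 3 OSDs in the run, each filesystem pool is created with 30 * 3 = 90
+    # PGs, keeping the per-OSD PG count above the warning threshold.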
+
+    def create(self):
+        if self.name is None:
+            self.name = "cephfs"
+        if self.metadata_pool_name is None:
+            self.metadata_pool_name = "{0}_metadata".format(self.name)
+        if self.data_pool_name is None:
+            data_pool_name = "{0}_data".format(self.name)
+        else:
+            data_pool_name = self.data_pool_name
+
+        log.info("Creating filesystem '{0}'".format(self.name))
+
+        pgs_per_fs_pool = self.get_pgs_per_fs_pool()
+
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
+        if self.metadata_overlay:
+            self.mon_manager.raw_cluster_cmd('fs', 'new',
+                                             self.name, self.metadata_pool_name, data_pool_name,
+                                             '--allow-dangerous-metadata-overlay')
+        else:
+            if self.ec_profile:
+                log.info("EC profile is %s", self.ec_profile)
+                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
+                cmd.extend(self.ec_profile)
+                self.mon_manager.raw_cluster_cmd(*cmd)
+                self.mon_manager.raw_cluster_cmd(
+                    'osd', 'pool', 'create',
+                    data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
+                    data_pool_name)
+                self.mon_manager.raw_cluster_cmd(
+                    'osd', 'pool', 'set',
+                    data_pool_name, 'allow_ec_overwrites', 'true')
+            else:
+                self.mon_manager.raw_cluster_cmd(
+                    'osd', 'pool', 'create',
+                    data_pool_name, pgs_per_fs_pool.__str__())
+            self.mon_manager.raw_cluster_cmd('fs', 'new',
+                                             self.name, self.metadata_pool_name, data_pool_name)
+        self.check_pool_application(self.metadata_pool_name)
+        self.check_pool_application(data_pool_name)
+        # Turn off spurious standby count warnings from modifying max_mds in tests.
+        try:
+            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
+        except CommandFailedError as e:
+            if e.exitstatus == 22:
+                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
+                pass
+            else:
+                raise
+
+        self.getinfo(refresh = True)
+
+    def check_pool_application(self, pool_name):
+        osd_map = self.mon_manager.get_osd_dump_json()
+        for pool in osd_map['pools']:
+            if pool['pool_name'] == pool_name:
+                if "application_metadata" in pool:
+                    if not "cephfs" in pool['application_metadata']:
+                        raise RuntimeError("Pool %p does not name cephfs as application!".\
+                                           format(pool_name))
+
+    def __del__(self):
+        if getattr(self._ctx, "filesystem", None) == self:
+            delattr(self._ctx, "filesystem")
+
+    def exists(self):
+        """
+        Whether a filesystem exists in the mon's filesystem list
+        """
+        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
+        return self.name in [fs['name'] for fs in fs_list]
+
+    def legacy_configured(self):
+        """
+        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
+        the case, the caller should avoid using Filesystem.create
+        """
+        try:
+            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
+            pools = json.loads(out_text)
+            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
+            if metadata_pool_exists:
+                self.metadata_pool_name = 'metadata'
+        except CommandFailedError as e:
+            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
+            # structured output (--format) from the CLI.
+            if e.exitstatus == 22:
+                metadata_pool_exists = True
+            else:
+                raise
+
+        return metadata_pool_exists
+
+    def _df(self):
+        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
+
+    def get_mds_map(self):
+        return self.status().get_fsmap(self.id)['mdsmap']
+
+    def add_data_pool(self, name):
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
+        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
+        self.get_pool_names(refresh = True)
+        for poolid, pool_name in self.data_pools.items():
+            if name == pool_name:
+                return poolid
+        raise RuntimeError("could not get just created pool '{0}'".format(name))
+
+    def get_pool_names(self, refresh = False, status = None):
+        if refresh or self.metadata_pool_name is None or self.data_pools is None:
+            if status is None:
+                status = self.status()
+            fsmap = status.get_fsmap(self.id)
+
+            osd_map = self.mon_manager.get_osd_dump_json()
+            id_to_name = {}
+            for p in osd_map['pools']:
+                id_to_name[p['pool']] = p['pool_name']
+
+            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
+            self.data_pools = {}
+            for data_pool in fsmap['mdsmap']['data_pools']:
+                self.data_pools[data_pool] = id_to_name[data_pool]
+
+    def get_data_pool_name(self, refresh = False):
+        if refresh or self.data_pools is None:
+            self.get_pool_names(refresh = True)
+        assert(len(self.data_pools) == 1)
+        return self.data_pools.values()[0]
+
+    def get_data_pool_id(self, refresh = False):
+        """
+        Don't call this if you have multiple data pools
+        :return: integer
+        """
+        if refresh or self.data_pools is None:
+            self.get_pool_names(refresh = True)
+        assert(len(self.data_pools) == 1)
+        return self.data_pools.keys()[0]
+
+    def get_data_pool_names(self, refresh = False):
+        if refresh or self.data_pools is None:
+            self.get_pool_names(refresh = True)
+        return self.data_pools.values()
+
+    def get_metadata_pool_name(self):
+        return self.metadata_pool_name
+
+    def set_data_pool_name(self, name):
+        if self.id is not None:
+            raise RuntimeError("can't set filesystem name if its fscid is set")
+        self.data_pool_name = name
+
+    def get_namespace_id(self):
+        return self.id
+
+    def get_pool_df(self, pool_name):
+        """
+        Return a dict like:
+        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
+        """
+        for pool_df in self._df()['pools']:
+            if pool_df['name'] == pool_name:
+                return pool_df['stats']
+
+        raise RuntimeError("Pool name '{0}' not found".format(pool_name))
+
+    def get_usage(self):
+        return self._df()['stats']['total_used_bytes']
+
+    def are_daemons_healthy(self):
+        """
+        Return true if all daemons are in one of the states up:active, up:standby or
+        up:standby-replay, and at least max_mds daemons are up:active.
+
+        Unlike most of Filesystem, this function is tolerant of new-style `fs`
+        commands being missing, because we are part of the ceph installation
+        process during upgrade suites, so must fall back to old style commands
+        when we get an EINVAL on a new style command.
+
+        :return:
+        """
+
+        active_count = 0
+        try:
+            mds_map = self.get_mds_map()
+        except CommandFailedError as cfe:
+            # Old version, fall back to non-multi-fs commands
+            if cfe.exitstatus == errno.EINVAL:
+                mds_map = json.loads(
+                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
+            else:
+                raise
+
+        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
+
+        for mds_id, mds_status in mds_map['info'].items():
+            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
+                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
+                return False
+            elif mds_status['state'] == 'up:active':
+                active_count += 1
+
+        log.info("are_daemons_healthy: {0}/{1}".format(
+            active_count, mds_map['max_mds']
+        ))
+
+        if active_count >= mds_map['max_mds']:
+            # The MDSMap says these guys are active, but let's check they really are
+            for mds_id, mds_status in mds_map['info'].items():
+                if mds_status['state'] == 'up:active':
+                    try:
+                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
+                    except CommandFailedError as cfe:
+                        if cfe.exitstatus == errno.EINVAL:
+                            # Old version, can't do this check
+                            continue
+                        else:
+                            # MDS not even running
+                            return False
+
+                    if daemon_status['state'] != 'up:active':
+                        # MDS hasn't taken the latest map yet
+                        return False
+
+            return True
+        else:
+            return False
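+
+    # The map walked above has entries shaped roughly like (abridged, values
+    # illustrative):
+    #
+    #   "info": {"gid_4242": {"name": "a", "rank": 0, "state": "up:active", ...}}
+    #
+    # Each up:active claim is double-checked against the daemon's own "status"
+    # admin socket output, since a daemon may not have taken the latest map yet.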
+
+    def get_daemon_names(self, state=None):
+        """
+        Return MDS daemon names of those daemons in the given state
+        :param state:
+        :return:
+        """
+        status = self.get_mds_map()
+        result = []
+        for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
+            if mds_status['state'] == state or state is None:
+                result.append(mds_status['name'])
+
+        return result
+
+    def get_active_names(self):
+        """
+        Return MDS daemon names of those daemons holding ranks
+        in state up:active
+
+        :return: list of strings like ['a', 'b'], sorted by rank
+        """
+        return self.get_daemon_names("up:active")
+
+    def get_all_mds_rank(self):
+        status = self.get_mds_map()
+        result = []
+        for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
+            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
+                result.append(mds_status['rank'])
+
+        return result
+
+    def get_rank_names(self):
+        """
+        Return MDS daemon names of those daemons holding a rank,
+        sorted by rank.  This includes e.g. up:replay/reconnect
+        as well as active, but does not include standby or
+        standby-replay.
+        """
+        status = self.get_mds_map()
+        result = []
+        for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
+            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
+                result.append(mds_status['name'])
+
+        return result
+
+    def wait_for_daemons(self, timeout=None):
+        """
+        Wait until all daemons are healthy
+        :return:
+        """
+
+        if timeout is None:
+            timeout = DAEMON_WAIT_TIMEOUT
+
+        elapsed = 0
+        while True:
+            if self.are_daemons_healthy():
+                return
+            else:
+                time.sleep(1)
+                elapsed += 1
+
+            if elapsed > timeout:
+                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
+
+    def get_lone_mds_id(self):
+        """
+        Get a single MDS ID: the only one if there is only one
+        configured, else the only one currently holding a rank,
+        else raise an error.
+        """
+        if len(self.mds_ids) != 1:
+            alive = self.get_rank_names()
+            if len(alive) == 1:
+                return alive[0]
+            else:
+                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
+        else:
+            return self.mds_ids[0]
+
+    def recreate(self):
+        log.info("Creating new filesystem")
+        self.delete_all_filesystems()
+        self.id = None
+        self.create()
+
+    def put_metadata_object_raw(self, object_id, infile):
+        """
+        Save an object to the metadata pool
+        """
+        temp_bin_path = infile
+        self.client_remote.run(args=[
+            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
+        ])
+
+    def get_metadata_object_raw(self, object_id):
+        """
+        Retrieve an object from the metadata pool and store it in a file.
+        """
+        temp_bin_path = '/tmp/' + object_id + '.bin'
+
+        self.client_remote.run(args=[
+            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
+        ])
+
+        return temp_bin_path
+
+    def get_metadata_object(self, object_type, object_id):
+        """
+        Retrieve an object from the metadata pool, pass it through
+        ceph-dencoder to dump it to JSON, and return the decoded object.
+        """
+        temp_bin_path = '/tmp/out.bin'
+
+        self.client_remote.run(args=[
+            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
+        ])
+
+        stdout = StringIO()
+        self.client_remote.run(args=[
+            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
+        ], stdout=stdout)
+        dump_json = stdout.getvalue().strip()
+        try:
+            dump = json.loads(dump_json)
+        except (TypeError, ValueError):
+            log.error("Failed to decode JSON: '{0}'".format(dump_json))
+            raise
+
+        return dump
+
+    def get_journal_version(self):
+        """
+        Read the JournalPointer and Journal::Header objects to learn the version of
+        encoding in use.
+        """
+        journal_pointer_object = '400.00000000'
+        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
+        journal_ino = journal_pointer_dump['journal_pointer']['front']
+
+        journal_header_object = "{0:x}.00000000".format(journal_ino)
+        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
+
+        version = journal_header_dump['journal_header']['stream_format']
+        log.info("Read journal version {0}".format(version))
+
+        return version
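+
+    # Note: '400.00000000' is rank 0's journal pointer object; the name follows
+    # the per-rank scheme used in erase_mds_objects() below (0x100 * multiplier
+    # + rank), with multiplier 4 for the journal pointer.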
+
+    def mds_asok(self, command, mds_id=None):
+        if mds_id is None:
+            mds_id = self.get_lone_mds_id()
+
+        return self.json_asok(command, 'mds', mds_id)
+
+    def read_cache(self, path, depth=None):
+        cmd = ["dump", "tree", path]
+        if depth is not None:
+            cmd.append(depth.__str__())
+        result = self.mds_asok(cmd)
+        if len(result) == 0:
+            raise RuntimeError("Path not found in cache: {0}".format(path))
+
+        return result
+
+    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
+        """
+        Block until the MDS reaches a particular state, or a failure condition
+        is met.
+
+        When there are multiple MDSs, succeed when exactly one MDS is in the
+        goal state, or fail when any MDS is in the reject state.
+
+        :param goal_state: Return once the MDS is in this state
+        :param reject: Fail if the MDS enters this state before the goal state
+        :param timeout: Fail if this many seconds pass before reaching goal
+        :return: number of seconds waited (as a float)
+        """
+
+        started_at = time.time()
+        while True:
+            status = self.status()
+            if rank is not None:
+                mds_info = status.get_rank(self.id, rank)
+                current_state = mds_info['state'] if mds_info else None
+                log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
+            elif mds_id is not None:
+                # mds_info is None if no daemon with this ID exists in the map
+                mds_info = status.get_mds(mds_id)
+                current_state = mds_info['state'] if mds_info else None
+                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
+            else:
+                # In general, look for a single MDS
+                states = [m['state'] for m in status.get_ranks(self.id)]
+                if states.count(goal_state) == 1:
+                    current_state = goal_state
+                elif reject in states:
+                    current_state = reject
+                else:
+                    current_state = None
+                log.info("mapped states {0} to {1}".format(states, current_state))
+
+            elapsed = time.time() - started_at
+            if current_state == goal_state:
+                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
+                return elapsed
+            elif reject is not None and current_state == reject:
+                raise RuntimeError("MDS in reject state {0}".format(current_state))
+            elif timeout is not None and elapsed > timeout:
+                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
+                raise RuntimeError(
+                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
+                        elapsed, goal_state, current_state
+                    ))
+            else:
+                time.sleep(1)
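+
+    # Example (timeout illustrative): block until exactly one rank reaches
+    # up:active, giving up after a minute:
+    #
+    #   fs.wait_for_state("up:active", timeout=60)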
+
+    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
+        mds_id = self.mds_ids[0]
+        remote = self.mds_daemons[mds_id].remote
+        if pool is None:
+            pool = self.get_data_pool_name()
+
+        obj_name = "{0:x}.00000000".format(ino_no)
+
+        args = [
+            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
+        ]
+        try:
+            proc = remote.run(
+                args=args,
+                stdout=StringIO())
+        except CommandFailedError as e:
+            log.error(e.__str__())
+            raise ObjectNotFound(obj_name)
+
+        data = proc.stdout.getvalue()
+
+        p = remote.run(
+            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
+            stdout=StringIO(),
+            stdin=data
+        )
+
+        return json.loads(p.stdout.getvalue().strip())
+
+    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
+        """
+        Write to an xattr of the 0th data object of an inode.  Will
+        succeed whether the object and/or xattr already exist or not.
+
+        :param ino_no: integer inode number
+        :param xattr_name: string name of the xattr
+        :param data: byte array data to write to the xattr
+        :param pool: name of data pool or None to use primary data pool
+        :return: None
+        """
+        remote = self.mds_daemons[self.mds_ids[0]].remote
+        if pool is None:
+            pool = self.get_data_pool_name()
+
+        obj_name = "{0:x}.00000000".format(ino_no)
+        args = [
+            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
+            obj_name, xattr_name, data
+        ]
+        remote.run(
+            args=args,
+            stdout=StringIO())
+
+    def read_backtrace(self, ino_no, pool=None):
+        """
+        Read the backtrace from the data pool, return a dict in the format
+        given by inode_backtrace_t::dump, which is something like:
+
+        ::
+
+            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
+            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
+
+            { "ino": 1099511627778,
+              "ancestors": [
+                    { "dirino": 1,
+                      "dname": "blah",
+                      "version": 11}],
+              "pool": 1,
+              "old_pools": []}
+
+        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
+                     one data pool and that will be used.
+        """
+        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
+
+    def read_layout(self, ino_no, pool=None):
+        """
+        Read 'layout' xattr of an inode and parse the result, returning a dict like:
+        ::
+            {
+                "stripe_unit": 4194304,
+                "stripe_count": 1,
+                "object_size": 4194304,
+                "pool_id": 1,
+                "pool_ns": "",
+            }
+
+        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
+                     one data pool and that will be used.
+        """
+        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
+
+    def _enumerate_data_objects(self, ino, size):
+        """
+        Get the list of expected data objects for a range, and the list of objects
+        that really exist.
+
+        :return a tuple of two lists of strings (expected, actual)
+        """
+        stripe_size = 1024 * 1024 * 4
+
+        size = max(stripe_size, size)
+
+        want_objects = [
+            "{0:x}.{1:08x}".format(ino, n)
+            for n in range(0, ((size - 1) / stripe_size) + 1)
+        ]
+
+        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
+
+        return want_objects, exist_objects
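+
+    # Worked example (inode number illustrative): for ino 0x10000000002 and
+    # size 9437184 (9 MiB) with the 4 MiB stripe above, the expected objects
+    # are 10000000002.00000000, 10000000002.00000001 and 10000000002.00000002.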
+
+    def data_objects_present(self, ino, size):
+        """
+        Check that *all* the expected data objects for an inode are present in the data pool
+        """
+
+        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
+        missing = set(want_objects) - set(exist_objects)
+
+        if missing:
+            log.info("Objects missing (ino {0}, size {1}): {2}".format(
+                ino, size, missing
+            ))
+            return False
+        else:
+            log.info("All objects for ino {0} size {1} found".format(ino, size))
+            return True
+
+    def data_objects_absent(self, ino, size):
+        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
+        present = set(want_objects) & set(exist_objects)
+
+        if present:
+            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
+                ino, size, present
+            ))
+            return False
+        else:
+            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
+            return True
+
+    def dirfrag_exists(self, ino, frag):
+        try:
+            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
+        except CommandFailedError:
+            return False
+        else:
+            return True
+
+    def rados(self, args, pool=None, namespace=None, stdin_data=None):
+        """
+        Call into the `rados` CLI from an MDS
+        """
+
+        if pool is None:
+            pool = self.get_metadata_pool_name()
+
+        # Doesn't matter which MDS we use to run rados commands, they all
+        # have access to the pools
+        mds_id = self.mds_ids[0]
+        remote = self.mds_daemons[mds_id].remote
+
+        # NB we could alternatively use librados pybindings for this, but it's a one-liner
+        # using the `rados` CLI
+        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
+                (["--namespace", namespace] if namespace else []) +
+                args)
+        p = remote.run(
+            args=args,
+            stdin=stdin_data,
+            stdout=StringIO())
+        return p.stdout.getvalue().strip()
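+
+    # Example (object name illustrative): list the omap keys of a dirfrag
+    # object in the metadata pool, as list_dirfrag() below does:
+    #
+    #   keys = fs.rados(["listomapkeys", "10000000000.00000000"])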
+
+    def list_dirfrag(self, dir_ino):
+        """
+        Read the named object and return the list of omap keys
+
+        :return a list of 0 or more strings
+        """
+
+        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
+
+        try:
+            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
+        except CommandFailedError as e:
+            log.error(e.__str__())
+            raise ObjectNotFound(dirfrag_obj_name)
+
+        return key_list_str.split("\n") if key_list_str else []
+
+    def erase_metadata_objects(self, prefix):
+        """
+        For all objects in the metadata pool matching the prefix,
+        erase them.
+
+        This is O(N) in the number of objects in the pool, so only suitable
+        for use on toy test filesystems.
+        """
+        all_objects = self.rados(["ls"]).split("\n")
+        matching_objects = [o for o in all_objects if o.startswith(prefix)]
+        for o in matching_objects:
+            self.rados(["rm", o])
+
+    def erase_mds_objects(self, rank):
+        """
+        Erase all the per-MDS objects for a particular rank.  This includes
+        inotable, sessiontable, journal
+        """
+
+        def obj_prefix(multiplier):
+            """
+            Per-rank MDS object naming convention, e.g. rank 1's
+            journal objects are named 201.*
+            """
+            return "%x." % (multiplier * 0x100 + rank)
+
+        # MDS_INO_LOG_OFFSET
+        self.erase_metadata_objects(obj_prefix(2))
+        # MDS_INO_LOG_BACKUP_OFFSET
+        self.erase_metadata_objects(obj_prefix(3))
+        # MDS_INO_LOG_POINTER_OFFSET
+        self.erase_metadata_objects(obj_prefix(4))
+        # MDSTables & SessionMap
+        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
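+
+    # Worked example: for rank 1 this erases objects prefixed 201. (journal),
+    # 301. (journal backup), 401. (journal pointer) and mds1_ (tables and
+    # sessionmap).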
+
+    @property
+    def _prefix(self):
+        """
+        Override this to locate the Ceph binaries under a different install prefix.
+        """
+        return ""
+
+    def _run_tool(self, tool, args, rank=None, quiet=False):
+        # Tests frequently have [client] configuration that jacks up
+        # the objecter log level (unlikely to be interesting here)
+        # and does not set the mds log level (very interesting here)
+        if quiet:
+            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
+        else:
+            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
+
+        if rank is not None:
+            base_args.extend(["--rank", "%d" % rank])
+
+        t1 = datetime.datetime.now()
+        r = self.tool_remote.run(
+            args=base_args + args,
+            stdout=StringIO()).stdout.getvalue().strip()
+        duration = datetime.datetime.now() - t1
+        log.info("Ran {0} in time {1}, result:\n{2}".format(
+            base_args + args, duration, r
+        ))
+        return r
+
+    @property
+    def tool_remote(self):
+        """
+        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
+        it'll definitely have keys with perms to access the cephfs metadata pool.  This is public
+        so that tests can use this remote to go get locally written output files from the tools.
+        """
+        mds_id = self.mds_ids[0]
+        return self.mds_daemons[mds_id].remote
+
+    def journal_tool(self, args, rank=None, quiet=False):
+        """
+        Invoke cephfs-journal-tool with the passed arguments, and return its stdout
+        """
+        return self._run_tool("cephfs-journal-tool", args, rank, quiet)
+
+    def table_tool(self, args, quiet=False):
+        """
+        Invoke cephfs-table-tool with the passed arguments, and return its stdout
+        """
+        return self._run_tool("cephfs-table-tool", args, None, quiet)
+
+    def data_scan(self, args, quiet=False, worker_count=1):
+        """
+        Invoke cephfs-data-scan with the passed arguments, and return its stdout
+
+        :param worker_count: if greater than 1, multiple workers will be run
+                             in parallel and the return value will be None
+        """
+
+        workers = []
+
+        for n in range(0, worker_count):
+            if worker_count > 1:
+                # The first token of the data-scan args is a command; insert
+                # the worker arguments right after it.
+                cmd = args[0]
+                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
+            else:
+                worker_args = args
+
+            workers.append(Greenlet.spawn(lambda wargs=worker_args:
+                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))
+
+        for w in workers:
+            w.get()
+
+        if worker_count == 1:
+            return workers[0].value
+        else:
+            return None
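+
+    # Example (worker count illustrative): run a parallel extents scan over
+    # the default data pool:
+    #
+    #   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
+    #
+    # which runs, for each worker n in 0..3, roughly:
+    #
+    #   cephfs-data-scan scan_extents --worker_n <n> --worker_m 4 <pool>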