initial code repo
[stor4nfv.git] / src / ceph / qa / tasks / ceph.py
diff --git a/src/ceph/qa/tasks/ceph.py b/src/ceph/qa/tasks/ceph.py
new file mode 100644 (file)
index 0000000..72f2653
--- /dev/null
@@ -0,0 +1,1688 @@
+"""
+Ceph cluster task.
+
+Handle the setup, starting, and clean-up of a Ceph cluster.
+"""
+from cStringIO import StringIO
+
+import argparse
+import contextlib
+import errno
+import logging
+import os
+import json
+import time
+import gevent
+import socket
+
+from paramiko import SSHException
+from ceph_manager import CephManager, write_conf
+from tasks.cephfs.filesystem import Filesystem
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology import exceptions
+from teuthology.orchestra import run
+import ceph_client as cclient
+from teuthology.orchestra.daemon import DaemonGroup
+
+CEPH_ROLE_TYPES = ['mon', 'mgr', 'osd', 'mds', 'rgw']
+
+log = logging.getLogger(__name__)
+
+
+def generate_caps(type_):
+    """
+    Each call will return the next capability for each system type
+    (essentially a subset of possible role values).  Valid types are osd,
+    mgr, mds and client.
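+
+    For example, ``list(generate_caps('mgr'))`` is (modulo dict iteration
+    order) something like::
+
+        ['--cap', 'mon', 'allow profile mgr',
+         '--cap', 'osd', 'allow *',
+         '--cap', 'mds', 'allow *']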
+    """
+    defaults = dict(
+        osd=dict(
+            mon='allow *',
+            mgr='allow *',
+            osd='allow *',
+        ),
+        mgr=dict(
+            mon='allow profile mgr',
+            osd='allow *',
+            mds='allow *',
+        ),
+        mds=dict(
+            mon='allow *',
+            mgr='allow *',
+            osd='allow *',
+            mds='allow',
+        ),
+        client=dict(
+            mon='allow rw',
+            mgr='allow r',
+            osd='allow rwx',
+            mds='allow',
+        ),
+    )
+    for subsystem, capability in defaults[type_].items():
+        yield '--cap'
+        yield subsystem
+        yield capability
+
+
+@contextlib.contextmanager
+def ceph_log(ctx, config):
+    """
+    Create /var/log/ceph log directory that is open to everyone.
+    Add valgrind and profiling-logger directories.
+
+    :param ctx: Context
+    :param config: Configuration
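+
+    If the job configuration has a top-level ``log-rotate`` section
+    mapping daemon type to maximum log size, a logrotate config is
+    installed and invoked periodically, e.g.::
+
+        log-rotate:
+          ceph-osd: 10G
+          ceph-mon: 10G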
+    """
+    log.info('Making ceph log dir writeable by non-root...')
+    run.wait(
+        ctx.cluster.run(
+            args=[
+                'sudo',
+                'chmod',
+                '777',
+                '/var/log/ceph',
+            ],
+            wait=False,
+        )
+    )
+    log.info('Disabling ceph logrotate...')
+    run.wait(
+        ctx.cluster.run(
+            args=[
+                'sudo',
+                'rm', '-f', '--',
+                '/etc/logrotate.d/ceph',
+            ],
+            wait=False,
+        )
+    )
+    log.info('Creating extra log directories...')
+    run.wait(
+        ctx.cluster.run(
+            args=[
+                'sudo',
+                'install', '-d', '-m0777', '--',
+                '/var/log/ceph/valgrind',
+                '/var/log/ceph/profiling-logger',
+            ],
+            wait=False,
+        )
+    )
+
+    class Rotater(object):
+        stop_event = gevent.event.Event()
+
+        def invoke_logrotate(self):
+            # 1) install ceph-test.conf in /etc/logrotate.d
+            # 2) continuously loop over logrotate invocation with ceph-test.conf
+            while not self.stop_event.is_set():
+                self.stop_event.wait(timeout=30)
+                try:
+                    run.wait(
+                        ctx.cluster.run(
+                            args=['sudo', 'logrotate', '/etc/logrotate.d/ceph-test.conf'
+                                  ],
+                            wait=False,
+                        )
+                    )
+                except exceptions.ConnectionLostError as e:
+                    # Some tests may power off nodes during test, in which
+                    # case we will see connection errors that we should ignore.
+                    log.debug("Missed logrotate, node '{0}' is offline".format(
+                        e.node))
+                except EOFError as e:
+                    # Paramiko sometimes raises this when it fails to
+                    # connect to a node during open_session.  As with
+                    # ConnectionLostError, we ignore this because nodes
+                    # are allowed to get power cycled during tests.
+                    log.debug("Missed logrotate, EOFError")
+                except SSHException as e:
+                    log.debug("Missed logrotate, SSHException")
+                except socket.error as e:
+                    if e.errno == errno.EHOSTUNREACH:
+                        log.debug("Missed logrotate, host unreachable")
+                    else:
+                        raise
+
+        def begin(self):
+            self.thread = gevent.spawn(self.invoke_logrotate)
+
+        def end(self):
+            self.stop_event.set()
+            self.thread.get()
+
+    def write_rotate_conf(ctx, daemons):
+        testdir = teuthology.get_testdir(ctx)
+        rotate_conf_path = os.path.join(os.path.dirname(__file__), 'logrotate.conf')
+        with file(rotate_conf_path, 'rb') as f:
+            conf = ""
+            for daemon, size in daemons.iteritems():
+                log.info('writing logrotate stanza for {daemon}'.format(daemon=daemon))
+                conf += f.read().format(daemon_type=daemon, max_size=size)
+                f.seek(0, 0)
+
+            for remote in ctx.cluster.remotes.iterkeys():
+                teuthology.write_file(remote=remote,
+                                      path='{tdir}/logrotate.ceph-test.conf'.format(tdir=testdir),
+                                      data=StringIO(conf)
+                                      )
+                remote.run(
+                    args=[
+                        'sudo',
+                        'mv',
+                        '{tdir}/logrotate.ceph-test.conf'.format(tdir=testdir),
+                        '/etc/logrotate.d/ceph-test.conf',
+                        run.Raw('&&'),
+                        'sudo',
+                        'chmod',
+                        '0644',
+                        '/etc/logrotate.d/ceph-test.conf',
+                        run.Raw('&&'),
+                        'sudo',
+                        'chown',
+                        'root.root',
+                        '/etc/logrotate.d/ceph-test.conf'
+                    ]
+                )
+                remote.chcon('/etc/logrotate.d/ceph-test.conf',
+                             'system_u:object_r:etc_t:s0')
+
+    if ctx.config.get('log-rotate'):
+        daemons = ctx.config.get('log-rotate')
+        log.info('Setting up log rotation with ' + str(daemons))
+        write_rotate_conf(ctx, daemons)
+        logrotater = Rotater()
+        logrotater.begin()
+    try:
+        yield
+
+    finally:
+        if ctx.config.get('log-rotate'):
+            log.info('Shutting down logrotate')
+            logrotater.end()
+            ctx.cluster.run(
+                args=['sudo', 'rm', '/etc/logrotate.d/ceph-test.conf'
+                      ]
+            )
+        if ctx.archive is not None and \
+                not (ctx.config.get('archive-on-error') and ctx.summary['success']):
+            # compress and archive the ceph logs
+            log.info('Compressing logs...')
+            run.wait(
+                ctx.cluster.run(
+                    args=[
+                        'sudo',
+                        'find',
+                        '/var/log/ceph',
+                        '-name',
+                        '*.log',
+                        '-print0',
+                        run.Raw('|'),
+                        'sudo',
+                        'xargs',
+                        '-0',
+                        '--no-run-if-empty',
+                        '--',
+                        'gzip',
+                        '--',
+                    ],
+                    wait=False,
+                ),
+            )
+
+            log.info('Archiving logs...')
+            path = os.path.join(ctx.archive, 'remote')
+            os.makedirs(path)
+            for remote in ctx.cluster.remotes.iterkeys():
+                sub = os.path.join(path, remote.shortname)
+                os.makedirs(sub)
+                teuthology.pull_directory(remote, '/var/log/ceph',
+                                          os.path.join(sub, 'log'))
+
+
+def assign_devs(roles, devs):
+    """
+    Create a dictionary of devs indexed by roles
+
+    :param roles: List of roles
+    :param devs: Corresponding list of devices.
+    :returns: Dictionary of devs indexed by roles.
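+
+    For example::
+
+        assign_devs(['osd.0', 'osd.1'], ['/dev/sdb', '/dev/sdc'])
+        # => {'osd.0': '/dev/sdb', 'osd.1': '/dev/sdc'}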
+    """
+    return dict(zip(roles, devs))
+
+
+@contextlib.contextmanager
+def valgrind_post(ctx, config):
+    """
+    After the tests run, look through all the valgrind logs.  Exceptions are raised
+    if textual errors occurred in the logs, or if valgrind exceptions were detected in
+    the logs.
+
+    :param ctx: Context
+    :param config: Configuration
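+
+    Each matching log line has the form ``file:kind``, e.g. (a
+    representative sample)::
+
+        /var/log/ceph/valgrind/osd.1.log:  <kind>Leak_DefinitelyLost</kind>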
+    """
+    try:
+        yield
+    finally:
+        lookup_procs = list()
+        log.info('Checking for errors in any valgrind logs...')
+        for remote in ctx.cluster.remotes.iterkeys():
+            # look at valgrind logs for each node
+            proc = remote.run(
+                args=[
+                    'sudo',
+                    'zgrep',
+                    '<kind>',
+                    run.Raw('/var/log/ceph/valgrind/*'),
+                    '/dev/null',  # include a second file so that we always get a filename prefix on the output
+                    run.Raw('|'),
+                    'sort',
+                    run.Raw('|'),
+                    'uniq',
+                ],
+                wait=False,
+                check_status=False,
+                stdout=StringIO(),
+            )
+            lookup_procs.append((proc, remote))
+
+        valgrind_exception = None
+        for (proc, remote) in lookup_procs:
+            proc.wait()
+            out = proc.stdout.getvalue()
+            for line in out.split('\n'):
+                if line == '':
+                    continue
+                try:
+                    (file, kind) = line.split(':')
+                except Exception:
+                    log.error('failed to split line %s', line)
+                    raise
+                log.debug('file %s kind %s', file, kind)
+                if (file.find('mds') >= 0) and kind.find('Lost') > 0:
+                    continue
+                log.error('saw valgrind issue %s in %s', kind, file)
+                valgrind_exception = Exception('saw valgrind issues')
+
+        if config.get('expect_valgrind_errors'):
+            if not valgrind_exception:
+                raise Exception('expected valgrind issues and found none')
+        else:
+            if valgrind_exception:
+                raise valgrind_exception
+
+
+@contextlib.contextmanager
+def crush_setup(ctx, config):
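+    """
+    Set the cluster's CRUSH tunables profile, taken from the
+    ``crush_tunables`` config key (default: 'default').
+
+    :param ctx: Context
+    :param config: Configuration
+    """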
+    cluster_name = config['cluster']
+    first_mon = teuthology.get_first_mon(ctx, config, cluster_name)
+    (mon_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+
+    profile = config.get('crush_tunables', 'default')
+    log.info('Setting crush tunables to %s', profile)
+    mon_remote.run(
+        args=['sudo', 'ceph', '--cluster', cluster_name,
+              'osd', 'crush', 'tunables', profile])
+    yield
+
+
+@contextlib.contextmanager
+def create_rbd_pool(ctx, config):
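+    """
+    Wait for OSDs to come up and, unless ``create_rbd_pool`` is set to
+    false, create the 'rbd' pool and enable the rbd application on it.
+
+    :param ctx: Context
+    :param config: Configuration
+    """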
+    cluster_name = config['cluster']
+    first_mon = teuthology.get_first_mon(ctx, config, cluster_name)
+    (mon_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+    log.info('Waiting for OSDs to come up')
+    teuthology.wait_until_osds_up(
+        ctx,
+        cluster=ctx.cluster,
+        remote=mon_remote,
+        ceph_cluster=cluster_name,
+    )
+    if config.get('create_rbd_pool', True):
+        log.info('Creating RBD pool')
+        mon_remote.run(
+            args=['sudo', 'ceph', '--cluster', cluster_name,
+                  'osd', 'pool', 'create', 'rbd', '8'])
+        mon_remote.run(
+            args=[
+                'sudo', 'ceph', '--cluster', cluster_name,
+                'osd', 'pool', 'application', 'enable',
+                'rbd', 'rbd', '--yes-i-really-mean-it'
+            ],
+            check_status=False)
+    yield
+
+
+@contextlib.contextmanager
+def cephfs_setup(ctx, config):
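+    """
+    If any mds roles are present, create a 'cephfs' filesystem, set
+    max_mds to the number of active (non-standby) mds roles, and enable
+    directory fragmentation.
+
+    :param ctx: Context
+    :param config: Configuration
+    """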
+    cluster_name = config['cluster']
+    testdir = teuthology.get_testdir(ctx)
+    coverage_dir = '{tdir}/archive/coverage'.format(tdir=testdir)
+
+    first_mon = teuthology.get_first_mon(ctx, config, cluster_name)
+    (mon_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+    mdss = ctx.cluster.only(teuthology.is_type('mds', cluster_name))
+    # If there are any MDSs, then create a filesystem for them to use
+    # Do this last because requires mon cluster to be up and running
+    if mdss.remotes:
+        log.info('Setting up CephFS filesystem...')
+
+        fs = Filesystem(ctx, name='cephfs', create=True,
+                        ec_profile=config.get('cephfs_ec_profile', None))
+
+        is_active_mds = lambda role: 'mds.' in role and not role.endswith('-s') and '-s-' not in role
+        all_roles = [item for remote_roles in mdss.remotes.values() for item in remote_roles]
+        num_active = len([r for r in all_roles if is_active_mds(r)])
+
+        fs.set_max_mds(num_active)
+        fs.set_allow_dirfrags(True)
+
+    yield
+
+
+@contextlib.contextmanager
+def cluster(ctx, config):
+    """
+    Handle the creation and removal of a ceph cluster.
+
+    On startup:
+        Create directories needed for the cluster.
+        Create remote journals for all osds.
+        Create and set keyring.
+        Copy the monmap to the test systems.
+        Setup mon nodes.
+        Setup mds nodes.
+        Mkfs osd nodes.
+        Add keyring information to monmaps.
+        Mkfs mon nodes.
+
+    On exit:
+        If errors occurred, extract a failure message and store it in ctx.summary.
+        Unmount all test files and temporary journaling files.
+        Save the monitor information and archive all ceph logs.
+        Cleanup the keyring setup, and remove all monitor map and data files left over.
+
+    :param ctx: Context
+    :param config: Configuration
+    """
+    if ctx.config.get('use_existing_cluster', False) is True:
+        log.info("'use_existing_cluster' is true; skipping cluster creation")
+        yield
+        return
+
+    testdir = teuthology.get_testdir(ctx)
+    cluster_name = config['cluster']
+    data_dir = '{tdir}/{cluster}.data'.format(tdir=testdir, cluster=cluster_name)
+    log.info('Creating ceph cluster %s...', cluster_name)
+    run.wait(
+        ctx.cluster.run(
+            args=[
+                'install', '-d', '-m0755', '--',
+                data_dir,
+            ],
+            wait=False,
+        )
+    )
+
+    run.wait(
+        ctx.cluster.run(
+            args=[
+                'sudo',
+                'install', '-d', '-m0777', '--', '/var/run/ceph',
+            ],
+            wait=False,
+        )
+    )
+
+    devs_to_clean = {}
+    remote_to_roles_to_devs = {}
+    remote_to_roles_to_journals = {}
+    osds = ctx.cluster.only(teuthology.is_type('osd', cluster_name))
+    for remote, roles_for_host in osds.remotes.iteritems():
+        devs = teuthology.get_scratch_devices(remote)
+        roles_to_devs = {}
+        roles_to_journals = {}
+        if config.get('fs'):
+            log.info('fs option selected, checking for scratch devs')
+            log.info('found devs: %s' % (str(devs),))
+            devs_id_map = teuthology.get_wwn_id_map(remote, devs)
+            iddevs = devs_id_map.values()
+            roles_to_devs = assign_devs(
+                teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name), iddevs
+            )
+            if len(roles_to_devs) < len(iddevs):
+                iddevs = iddevs[len(roles_to_devs):]
+            devs_to_clean[remote] = []
+
+        if config.get('block_journal'):
+            log.info('block journal enabled')
+            roles_to_journals = assign_devs(
+                teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name), iddevs
+            )
+            log.info('journal map: %s', roles_to_journals)
+
+        if config.get('tmpfs_journal'):
+            log.info('tmpfs journal enabled')
+            roles_to_journals = {}
+            remote.run(args=['sudo', 'mount', '-t', 'tmpfs', 'tmpfs', '/mnt'])
+            for role in teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name):
+                tmpfs = '/mnt/' + role
+                roles_to_journals[role] = tmpfs
+                remote.run(args=['truncate', '-s', '1500M', tmpfs])
+            log.info('journal map: %s', roles_to_journals)
+
+        log.info('dev map: %s' % (str(roles_to_devs),))
+        remote_to_roles_to_devs[remote] = roles_to_devs
+        remote_to_roles_to_journals[remote] = roles_to_journals
+
+    log.info('Generating config...')
+    remotes_and_roles = ctx.cluster.remotes.items()
+    roles = [role_list for (remote, role_list) in remotes_and_roles]
+    ips = [host for (host, port) in
+           (remote.ssh.get_transport().getpeername() for (remote, role_list) in remotes_and_roles)]
+    conf = teuthology.skeleton_config(ctx, roles=roles, ips=ips, cluster=cluster_name)
+    for remote, roles_to_journals in remote_to_roles_to_journals.iteritems():
+        for role, journal in roles_to_journals.iteritems():
+            name = teuthology.ceph_role(role)
+            if name not in conf:
+                conf[name] = {}
+            conf[name]['osd journal'] = journal
+    for section, keys in config['conf'].iteritems():
+        for key, value in keys.iteritems():
+            log.info("[%s] %s = %s" % (section, key, value))
+            if section not in conf:
+                conf[section] = {}
+            conf[section][key] = value
+
+    if config.get('tmpfs_journal'):
+        conf['journal dio'] = False
+
+    if not hasattr(ctx, 'ceph'):
+        ctx.ceph = {}
+    ctx.ceph[cluster_name] = argparse.Namespace()
+    ctx.ceph[cluster_name].conf = conf
+
+    default_keyring = '/etc/ceph/{cluster}.keyring'.format(cluster=cluster_name)
+    keyring_path = config.get('keyring_path', default_keyring)
+
+    coverage_dir = '{tdir}/archive/coverage'.format(tdir=testdir)
+
+    firstmon = teuthology.get_first_mon(ctx, config, cluster_name)
+
+    log.info('Setting up %s...' % firstmon)
+    ctx.cluster.only(firstmon).run(
+        args=[
+            'sudo',
+            'adjust-ulimits',
+            'ceph-coverage',
+            coverage_dir,
+            'ceph-authtool',
+            '--create-keyring',
+            keyring_path,
+        ],
+    )
+    ctx.cluster.only(firstmon).run(
+        args=[
+            'sudo',
+            'adjust-ulimits',
+            'ceph-coverage',
+            coverage_dir,
+            'ceph-authtool',
+            '--gen-key',
+            '--name=mon.',
+            keyring_path,
+        ],
+    )
+    ctx.cluster.only(firstmon).run(
+        args=[
+            'sudo',
+            'chmod',
+            '0644',
+            keyring_path,
+        ],
+    )
+    (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+    monmap_path = '{tdir}/{cluster}.monmap'.format(tdir=testdir,
+                                                   cluster=cluster_name)
+    fsid = teuthology.create_simple_monmap(
+        ctx,
+        remote=mon0_remote,
+        conf=conf,
+        path=monmap_path,
+    )
+    if 'global' not in conf:
+        conf['global'] = {}
+    conf['global']['fsid'] = fsid
+
+    default_conf_path = '/etc/ceph/{cluster}.conf'.format(cluster=cluster_name)
+    conf_path = config.get('conf_path', default_conf_path)
+    log.info('Writing %s for FSID %s...' % (conf_path, fsid))
+    write_conf(ctx, conf_path, cluster_name)
+
+    log.info('Creating admin key on %s...' % firstmon)
+    ctx.cluster.only(firstmon).run(
+        args=[
+            'sudo',
+            'adjust-ulimits',
+            'ceph-coverage',
+            coverage_dir,
+            'ceph-authtool',
+            '--gen-key',
+            '--name=client.admin',
+            '--set-uid=0',
+            '--cap', 'mon', 'allow *',
+            '--cap', 'osd', 'allow *',
+            '--cap', 'mds', 'allow *',
+            '--cap', 'mgr', 'allow *',
+            keyring_path,
+        ],
+    )
+
+    log.info('Copying monmap to all nodes...')
+    keyring = teuthology.get_file(
+        remote=mon0_remote,
+        path=keyring_path,
+    )
+    monmap = teuthology.get_file(
+        remote=mon0_remote,
+        path=monmap_path,
+    )
+
+    for rem in ctx.cluster.remotes.iterkeys():
+        # copy mon key and initial monmap
+        log.info('Sending monmap to node {remote}'.format(remote=rem))
+        teuthology.sudo_write_file(
+            remote=rem,
+            path=keyring_path,
+            data=keyring,
+            perms='0644'
+        )
+        teuthology.write_file(
+            remote=rem,
+            path=monmap_path,
+            data=monmap,
+        )
+
+    log.info('Setting up mon nodes...')
+    mons = ctx.cluster.only(teuthology.is_type('mon', cluster_name))
+
+    if not config.get('skip_mgr_daemons', False):
+        log.info('Setting up mgr nodes...')
+        mgrs = ctx.cluster.only(teuthology.is_type('mgr', cluster_name))
+        for remote, roles_for_host in mgrs.remotes.iteritems():
+            for role in teuthology.cluster_roles_of_type(roles_for_host, 'mgr',
+                                                         cluster_name):
+                _, _, id_ = teuthology.split_role(role)
+                mgr_dir = '/var/lib/ceph/mgr/{cluster}-{id}'.format(
+                    cluster=cluster_name,
+                    id=id_,
+                )
+                remote.run(
+                    args=[
+                        'sudo',
+                        'mkdir',
+                        '-p',
+                        mgr_dir,
+                        run.Raw('&&'),
+                        'sudo',
+                        'adjust-ulimits',
+                        'ceph-coverage',
+                        coverage_dir,
+                        'ceph-authtool',
+                        '--create-keyring',
+                        '--gen-key',
+                        '--name=mgr.{id}'.format(id=id_),
+                        mgr_dir + '/keyring',
+                    ],
+                )
+
+    log.info('Setting up mds nodes...')
+    mdss = ctx.cluster.only(teuthology.is_type('mds', cluster_name))
+    for remote, roles_for_host in mdss.remotes.iteritems():
+        for role in teuthology.cluster_roles_of_type(roles_for_host, 'mds',
+                                                     cluster_name):
+            _, _, id_ = teuthology.split_role(role)
+            mds_dir = '/var/lib/ceph/mds/{cluster}-{id}'.format(
+                cluster=cluster_name,
+                id=id_,
+            )
+            remote.run(
+                args=[
+                    'sudo',
+                    'mkdir',
+                    '-p',
+                    mds_dir,
+                    run.Raw('&&'),
+                    'sudo',
+                    'adjust-ulimits',
+                    'ceph-coverage',
+                    coverage_dir,
+                    'ceph-authtool',
+                    '--create-keyring',
+                    '--gen-key',
+                    '--name=mds.{id}'.format(id=id_),
+                    mds_dir + '/keyring',
+                ],
+            )
+
+    cclient.create_keyring(ctx, cluster_name)
+    log.info('Running mkfs on osd nodes...')
+
+    if not hasattr(ctx, 'disk_config'):
+        ctx.disk_config = argparse.Namespace()
+    if not hasattr(ctx.disk_config, 'remote_to_roles_to_dev'):
+        ctx.disk_config.remote_to_roles_to_dev = {}
+    if not hasattr(ctx.disk_config, 'remote_to_roles_to_journals'):
+        ctx.disk_config.remote_to_roles_to_journals = {}
+    if not hasattr(ctx.disk_config, 'remote_to_roles_to_dev_mount_options'):
+        ctx.disk_config.remote_to_roles_to_dev_mount_options = {}
+    if not hasattr(ctx.disk_config, 'remote_to_roles_to_dev_fstype'):
+        ctx.disk_config.remote_to_roles_to_dev_fstype = {}
+
+    teuthology.deep_merge(ctx.disk_config.remote_to_roles_to_dev, remote_to_roles_to_devs)
+    teuthology.deep_merge(ctx.disk_config.remote_to_roles_to_journals, remote_to_roles_to_journals)
+
+    log.info("ctx.disk_config.remote_to_roles_to_dev: {r}".format(r=str(ctx.disk_config.remote_to_roles_to_dev)))
+    for remote, roles_for_host in osds.remotes.iteritems():
+        roles_to_devs = remote_to_roles_to_devs[remote]
+        roles_to_journals = remote_to_roles_to_journals[remote]
+
+        for role in teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name):
+            _, _, id_ = teuthology.split_role(role)
+            mnt_point = '/var/lib/ceph/osd/{cluster}-{id}'.format(cluster=cluster_name, id=id_)
+            remote.run(
+                args=[
+                    'sudo',
+                    'mkdir',
+                    '-p',
+                    mnt_point,
+                ])
+            log.info(str(roles_to_devs))
+            log.info(str(roles_to_journals))
+            log.info(role)
+            if roles_to_devs.get(role):
+                dev = roles_to_devs[role]
+                fs = config.get('fs')
+                package = None
+                mkfs_options = config.get('mkfs_options')
+                mount_options = config.get('mount_options')
+                if fs == 'btrfs':
+                    # package = 'btrfs-tools'
+                    if mount_options is None:
+                        mount_options = ['noatime', 'user_subvol_rm_allowed']
+                    if mkfs_options is None:
+                        mkfs_options = ['-m', 'single',
+                                        '-l', '32768',
+                                        '-n', '32768']
+                if fs == 'xfs':
+                    # package = 'xfsprogs'
+                    if mount_options is None:
+                        mount_options = ['noatime']
+                    if mkfs_options is None:
+                        mkfs_options = ['-f', '-i', 'size=2048']
+                if fs == 'ext4' or fs == 'ext3':
+                    if mount_options is None:
+                        mount_options = ['noatime', 'user_xattr']
+
+                if mount_options is None:
+                    mount_options = []
+                if mkfs_options is None:
+                    mkfs_options = []
+                mkfs = ['mkfs.%s' % fs] + mkfs_options
+                log.info('%s on %s on %s' % (mkfs, dev, remote))
+                if package is not None:
+                    remote.run(
+                        args=[
+                            'sudo',
+                            'apt-get', 'install', '-y', package
+                        ],
+                        stdout=StringIO(),
+                    )
+
+                try:
+                    remote.run(args=['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
+                except run.CommandFailedError:
+                    # Newer btrfs-tools doesn't prompt for overwrite, use -f
+                    if '-f' not in mkfs_options:
+                        mkfs_options.append('-f')
+                        mkfs = ['mkfs.%s' % fs] + mkfs_options
+                        log.info('%s on %s on %s' % (mkfs, dev, remote))
+                    remote.run(args=['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
+
+                log.info('mount %s on %s -o %s' % (dev, remote,
+                                                   ','.join(mount_options)))
+                remote.run(
+                    args=[
+                        'sudo',
+                        'mount',
+                        '-t', fs,
+                        '-o', ','.join(mount_options),
+                        dev,
+                        mnt_point,
+                    ]
+                )
+                remote.run(
+                    args=[
+                        'sudo', '/sbin/restorecon', mnt_point,
+                    ],
+                    check_status=False,
+                )
+                if remote not in ctx.disk_config.remote_to_roles_to_dev_mount_options:
+                    ctx.disk_config.remote_to_roles_to_dev_mount_options[remote] = {}
+                ctx.disk_config.remote_to_roles_to_dev_mount_options[remote][role] = mount_options
+                if remote not in ctx.disk_config.remote_to_roles_to_dev_fstype:
+                    ctx.disk_config.remote_to_roles_to_dev_fstype[remote] = {}
+                ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role] = fs
+                devs_to_clean[remote].append(mnt_point)
+
+        for role in teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name):
+            _, _, id_ = teuthology.split_role(role)
+            remote.run(
+                args=[
+                    'sudo',
+                    'MALLOC_CHECK_=3',
+                    'adjust-ulimits',
+                    'ceph-coverage',
+                    coverage_dir,
+                    'ceph-osd',
+                    '--cluster',
+                    cluster_name,
+                    '--mkfs',
+                    '--mkkey',
+                    '-i', id_,
+                    '--monmap', monmap_path,
+                ],
+            )
+
+    log.info('Reading keys from all nodes...')
+    keys_fp = StringIO()
+    keys = []
+    for remote, roles_for_host in ctx.cluster.remotes.iteritems():
+        for type_ in ['mgr', 'mds', 'osd']:
+            if type_ == 'mgr' and config.get('skip_mgr_daemons', False):
+                continue
+            for role in teuthology.cluster_roles_of_type(roles_for_host, type_, cluster_name):
+                _, _, id_ = teuthology.split_role(role)
+                data = teuthology.get_file(
+                    remote=remote,
+                    path='/var/lib/ceph/{type}/{cluster}-{id}/keyring'.format(
+                        type=type_,
+                        id=id_,
+                        cluster=cluster_name,
+                    ),
+                    sudo=True,
+                )
+                keys.append((type_, id_, data))
+                keys_fp.write(data)
+    for remote, roles_for_host in ctx.cluster.remotes.iteritems():
+        for role in teuthology.cluster_roles_of_type(roles_for_host, 'client', cluster_name):
+            _, _, id_ = teuthology.split_role(role)
+            data = teuthology.get_file(
+                remote=remote,
+                path='/etc/ceph/{cluster}.client.{id}.keyring'.format(id=id_, cluster=cluster_name)
+            )
+            keys.append(('client', id_, data))
+            keys_fp.write(data)
+
+    log.info('Adding keys to all mons...')
+    writes = mons.run(
+        args=[
+            'sudo', 'tee', '-a',
+            keyring_path,
+        ],
+        stdin=run.PIPE,
+        wait=False,
+        stdout=StringIO(),
+    )
+    keys_fp.seek(0)
+    teuthology.feed_many_stdins_and_close(keys_fp, writes)
+    run.wait(writes)
+    for type_, id_, data in keys:
+        run.wait(
+            mons.run(
+                args=[
+                         'sudo',
+                         'adjust-ulimits',
+                         'ceph-coverage',
+                         coverage_dir,
+                         'ceph-authtool',
+                         keyring_path,
+                         '--name={type}.{id}'.format(
+                             type=type_,
+                             id=id_,
+                         ),
+                     ] + list(generate_caps(type_)),
+                wait=False,
+            ),
+        )
+
+    log.info('Running mkfs on mon nodes...')
+    for remote, roles_for_host in mons.remotes.iteritems():
+        for role in teuthology.cluster_roles_of_type(roles_for_host, 'mon', cluster_name):
+            _, _, id_ = teuthology.split_role(role)
+            remote.run(
+                args=[
+                    'sudo',
+                    'mkdir',
+                    '-p',
+                    '/var/lib/ceph/mon/{cluster}-{id}'.format(id=id_, cluster=cluster_name),
+                ],
+            )
+            remote.run(
+                args=[
+                    'sudo',
+                    'adjust-ulimits',
+                    'ceph-coverage',
+                    coverage_dir,
+                    'ceph-mon',
+                    '--cluster', cluster_name,
+                    '--mkfs',
+                    '-i', id_,
+                    '--monmap', monmap_path,
+                    '--keyring', keyring_path,
+                ],
+            )
+
+    run.wait(
+        mons.run(
+            args=[
+                'rm',
+                '--',
+                monmap_path,
+            ],
+            wait=False,
+        ),
+    )
+
+    try:
+        yield
+    except Exception:
+        # we need to know this below
+        ctx.summary['success'] = False
+        raise
+    finally:
+        (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+
+        log.info('Checking cluster log for badness...')
+
+        def first_in_ceph_log(pattern, excludes):
+            """
+            Find the first occurrence of the pattern specified in the Ceph log.
+            Returns None if none found.
+
+            :param pattern: Pattern scanned for.
+            :param excludes: Patterns to ignore.
+            :return: First line of text (or None if not found)
+            """
+            args = [
+                'sudo',
+                'egrep', pattern,
+                '/var/log/ceph/{cluster}.log'.format(cluster=cluster_name),
+            ]
+            for exclude in excludes:
+                args.extend([run.Raw('|'), 'egrep', '-v', exclude])
+            args.extend([
+                run.Raw('|'), 'head', '-n', '1',
+            ])
+            r = mon0_remote.run(
+                stdout=StringIO(),
+                args=args,
+            )
+            stdout = r.stdout.getvalue()
+            if stdout != '':
+                return stdout
+            return None
+
+        if first_in_ceph_log(r'\[ERR\]|\[WRN\]|\[SEC\]',
+                             config['log_whitelist']) is not None:
+            log.warning('Found errors (ERR|WRN|SEC) in cluster log')
+            ctx.summary['success'] = False
+            # use the most severe problem as the failure reason
+            if 'failure_reason' not in ctx.summary:
+                for pattern in [r'\[SEC\]', r'\[ERR\]', r'\[WRN\]']:
+                    match = first_in_ceph_log(pattern, config['log_whitelist'])
+                    if match is not None:
+                        ctx.summary['failure_reason'] = \
+                            '"{match}" in cluster log'.format(
+                                match=match.rstrip('\n'),
+                            )
+                        break
+
+        for remote, dirs in devs_to_clean.iteritems():
+            for dir_ in dirs:
+                log.info('Unmounting %s on %s' % (dir_, remote))
+                try:
+                    remote.run(
+                        args=[
+                            'sync',
+                            run.Raw('&&'),
+                            'sudo',
+                            'umount',
+                            '-f',
+                            dir_
+                        ]
+                    )
+                except Exception:
+                    remote.run(args=[
+                        'sudo',
+                        run.Raw('PATH=/usr/sbin:$PATH'),
+                        'lsof',
+                        run.Raw(';'),
+                        'ps', 'auxf',
+                    ])
+                    raise
+
+        if config.get('tmpfs_journal'):
+            log.info('tmpfs journal enabled - unmounting tmpfs at /mnt')
+            for remote, roles_for_host in osds.remotes.iteritems():
+                remote.run(
+                    args=['sudo', 'umount', '-f', '/mnt'],
+                    check_status=False,
+                )
+
+        if ctx.archive is not None and \
+                not (ctx.config.get('archive-on-error') and ctx.summary['success']):
+
+            # archive mon data, too
+            log.info('Archiving mon data...')
+            path = os.path.join(ctx.archive, 'data')
+            try:
+                os.makedirs(path)
+            except OSError as e:
+                if e.errno == errno.EEXIST:
+                    pass
+                else:
+                    raise
+            for remote, roles in mons.remotes.iteritems():
+                for role in roles:
+                    is_mon = teuthology.is_type('mon', cluster_name)
+                    if is_mon(role):
+                        _, _, id_ = teuthology.split_role(role)
+                        mon_dir = '/var/lib/ceph/mon/' + \
+                                  '{0}-{1}'.format(cluster_name, id_)
+                        teuthology.pull_directory_tarball(
+                            remote,
+                            mon_dir,
+                            path + '/' + role + '.tgz')
+
+        log.info('Cleaning ceph cluster...')
+        run.wait(
+            ctx.cluster.run(
+                args=[
+                    'sudo',
+                    'rm',
+                    '-rf',
+                    '--',
+                    conf_path,
+                    keyring_path,
+                    data_dir,
+                    monmap_path,
+                    run.Raw('{tdir}/../*.pid'.format(tdir=testdir)),
+                ],
+                wait=False,
+            ),
+        )
+
+
+def osd_scrub_pgs(ctx, config):
+    """
+    Scrub pgs when we exit.
+
+    First make sure all pgs are active and clean.
+    Next scrub all osds.
+    Then periodically check until all pgs have scrub time stamps that
+    indicate the last scrub completed.  Time out if no progress is made
+    after ``retries`` consecutive checks, ``delays`` seconds apart.
+    """
+    retries = 40
+    delays = 20
+    cluster_name = config['cluster']
+    manager = ctx.managers[cluster_name]
+    all_clean = False
+    for _ in range(0, retries):
+        stats = manager.get_pg_stats()
+        bad = [stat['pgid'] for stat in stats if 'active+clean' not in stat['state']]
+        if not bad:
+            all_clean = True
+            break
+        log.info(
+            "Waiting for all PGs to be active and clean, waiting on %s" % bad)
+        time.sleep(delays)
+    if not all_clean:
+        raise RuntimeError("Scrubbing terminated -- not all pgs were active and clean.")
+    check_time_now = time.localtime()
+    time.sleep(1)
+    all_roles = teuthology.all_roles(ctx.cluster)
+    for role in teuthology.cluster_roles_of_type(all_roles, 'osd', cluster_name):
+        log.info("Scrubbing {osd}".format(osd=role))
+        _, _, id_ = teuthology.split_role(role)
+        # allow this to fail; in certain cases the OSD might not be up
+        # at this point.  we will catch all pgs below.
+        try:
+            manager.raw_cluster_cmd('osd', 'deep-scrub', id_)
+        except run.CommandFailedError:
+            pass
+    prev_good = 0
+    gap_cnt = 0
+    loop = True
+    while loop:
+        stats = manager.get_pg_stats()
+        timez = [(stat['pgid'], stat['last_scrub_stamp']) for stat in stats]
+        loop = False
+        thiscnt = 0
+        for (pgid, tmval) in timez:
+            pgtm = time.strptime(tmval[0:tmval.find('.')], '%Y-%m-%d %H:%M:%S')
+            if pgtm > check_time_now:
+                thiscnt += 1
+            else:
+                log.info('pgid %s last_scrub_stamp %s %s <= %s', pgid, tmval, pgtm, check_time_now)
+                loop = True
+        if thiscnt > prev_good:
+            prev_good = thiscnt
+            gap_cnt = 0
+        else:
+            gap_cnt += 1
+            if gap_cnt % 6 == 0:
+                for (pgid, tmval) in timez:
+                    # re-request scrub every so often in case the earlier
+                    # request was missed.  do not do it every time because
+                    # the scrub may be in progress or not reported yet and
+                    # we will starve progress.
+                    manager.raw_cluster_cmd('pg', 'deep-scrub', pgid)
+            if gap_cnt > retries:
+                raise RuntimeError('Exiting scrub checking -- not all pgs scrubbed.')
+        if loop:
+            log.info('Still waiting for all pgs to be scrubbed.')
+            time.sleep(delays)
+
+
+@contextlib.contextmanager
+def run_daemon(ctx, config, type_):
+    """
+    Run daemons for a role type.  Handle the startup and termination of a daemon.
+    On startup -- set coverage, cpu_profile, and valgrind values for all remotes.
+    On cleanup -- stop all existing daemons of this type.
+
+    :param ctx: Context
+    :param config: Configuration
+    :param type_: Role type
+    """
+    cluster_name = config['cluster']
+    log.info('Starting %s daemons in cluster %s...', type_, cluster_name)
+    testdir = teuthology.get_testdir(ctx)
+    daemons = ctx.cluster.only(teuthology.is_type(type_, cluster_name))
+
+    # check whether any daemons of this type are configured
+    if daemons is None:
+        return
+    coverage_dir = '{tdir}/archive/coverage'.format(tdir=testdir)
+
+    daemon_signal = 'kill'
+    if config.get('coverage') or config.get('valgrind') is not None:
+        daemon_signal = 'term'
+
+    # create osds in order.  (this only matters for pre-luminous, which might
+    # be hammer, which doesn't take an id_ argument to legacy 'osd create').
+    osd_uuids = {}
+    for remote, roles_for_host in daemons.remotes.iteritems():
+        is_type_ = teuthology.is_type(type_, cluster_name)
+        for role in roles_for_host:
+            if not is_type_(role):
+                continue
+            _, _, id_ = teuthology.split_role(role)
+
+            if type_ == 'osd':
+                datadir = '/var/lib/ceph/osd/{cluster}-{id}'.format(
+                    cluster=cluster_name, id=id_)
+                osd_uuid = teuthology.get_file(
+                    remote=remote,
+                    path=datadir + '/fsid',
+                    sudo=True,
+                ).strip()
+                osd_uuids[id_] = osd_uuid
+    for osd_id in range(len(osd_uuids)):
+        id_ = str(osd_id)
+        osd_uuid = osd_uuids.get(id_)
+        try:
+            remote.run(
+                args=[
+                    'sudo', 'ceph', '--cluster', cluster_name,
+                    'osd', 'new', osd_uuid, id_,
+                ]
+            )
+        except run.CommandFailedError:
+            # fallback to pre-luminous (hammer or jewel)
+            remote.run(
+                args=[
+                    'sudo', 'ceph', '--cluster', cluster_name,
+                    'osd', 'create', osd_uuid,
+                ]
+            )
+            if config.get('add_osds_to_crush'):
+                remote.run(
+                    args=[
+                        'sudo', 'ceph', '--cluster', cluster_name,
+                        'osd', 'crush', 'create-or-move', 'osd.' + id_,
+                        '1.0', 'host=localhost', 'root=default',
+                    ]
+                )
+
+    for remote, roles_for_host in daemons.remotes.iteritems():
+        is_type_ = teuthology.is_type(type_, cluster_name)
+        for role in roles_for_host:
+            if not is_type_(role):
+                continue
+            _, _, id_ = teuthology.split_role(role)
+
+            run_cmd = [
+                'sudo',
+                'adjust-ulimits',
+                'ceph-coverage',
+                coverage_dir,
+                'daemon-helper',
+                daemon_signal,
+            ]
+            run_cmd_tail = [
+                'ceph-%s' % (type_),
+                '-f',
+                '--cluster', cluster_name,
+                '-i', id_]
+
+            if type_ in config.get('cpu_profile', []):
+                profile_path = '/var/log/ceph/profiling-logger/%s.prof' % (role)
+                run_cmd.extend(['env', 'CPUPROFILE=%s' % profile_path])
+
+            if config.get('valgrind') is not None:
+                valgrind_args = None
+                if type_ in config['valgrind']:
+                    valgrind_args = config['valgrind'][type_]
+                if role in config['valgrind']:
+                    valgrind_args = config['valgrind'][role]
+                run_cmd = teuthology.get_valgrind_args(testdir, role,
+                                                       run_cmd,
+                                                       valgrind_args)
+
+            run_cmd.extend(run_cmd_tail)
+
+            # always register mgr; don't necessarily start
+            ctx.daemons.register_daemon(
+                remote, type_, id_,
+                cluster=cluster_name,
+                args=run_cmd,
+                logger=log.getChild(role),
+                stdin=run.PIPE,
+                wait=False
+            )
+            if type_ != 'mgr' or not config.get('skip_mgr_daemons', False):
+                ctx.daemons.get_daemon(type_, id_, cluster_name).restart()
+
+    try:
+        yield
+    finally:
+        teuthology.stop_daemons_of_type(ctx, type_, cluster_name)
+
+
+def healthy(ctx, config):
+    """
+    Wait for all OSDs to be up, and for ceph health to report HEALTH_OK.
+
+    :param ctx: Context
+    :param config: Configuration
+    """
+    config = config if isinstance(config, dict) else dict()
+    cluster_name = config.get('cluster', 'ceph')
+    log.info('Waiting until %s daemons up and pgs clean...', cluster_name)
+    manager = ctx.managers[cluster_name]
+    try:
+        manager.wait_for_mgr_available(timeout=30)
+    except (run.CommandFailedError, AssertionError) as e:
+        log.info('ignoring mgr wait error, probably testing upgrade: %s', e)
+
+    firstmon = teuthology.get_first_mon(ctx, config, cluster_name)
+    (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+    teuthology.wait_until_osds_up(
+        ctx,
+        cluster=ctx.cluster,
+        remote=mon0_remote,
+        ceph_cluster=cluster_name,
+    )
+
+    try:
+        manager.flush_all_pg_stats()
+    except Exception as e:
+        log.info('ignoring flush pg stats error, probably testing upgrade: %s', e)
+    manager.wait_for_clean()
+
+    log.info('Waiting until ceph cluster %s is healthy...', cluster_name)
+    teuthology.wait_until_healthy(
+        ctx,
+        remote=mon0_remote,
+        ceph_cluster=cluster_name,
+    )
+
+    if ctx.cluster.only(teuthology.is_type('mds', cluster_name)).remotes:
+        # Some MDSs exist, wait for them to be healthy
+        ceph_fs = Filesystem(ctx) # TODO: make Filesystem cluster-aware
+        ceph_fs.wait_for_daemons(timeout=300)
+
+
+def wait_for_osds_up(ctx, config):
+    """
+    Wait for all OSDs to come up.
+
+    :param ctx: Context
+    :param config: Configuration
+    """
+    log.info('Waiting until ceph osds are all up...')
+    cluster_name = config.get('cluster', 'ceph')
+    firstmon = teuthology.get_first_mon(ctx, config, cluster_name)
+    (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+    teuthology.wait_until_osds_up(
+        ctx,
+        cluster=ctx.cluster,
+        remote=mon0_remote
+    )
+
+
+def wait_for_mon_quorum(ctx, config):
+    """
+    Check remote ceph status until the specified monitors all join the quorum.
+
+    :param ctx: Context
+    :param config: Configuration
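+
+    For example::
+
+        tasks:
+        - ceph.wait_for_mon_quorum: [a, b]
+
+    or::
+
+        tasks:
+        - ceph.wait_for_mon_quorum:
+            daemons: [a, b]
+            cluster: backup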
+    """
+    if isinstance(config, dict):
+        mons = config['daemons']
+        cluster_name = config.get('cluster', 'ceph')
+    else:
+        assert isinstance(config, list)
+        mons = config
+        cluster_name = 'ceph'
+    firstmon = teuthology.get_first_mon(ctx, config, cluster_name)
+    (remote,) = ctx.cluster.only(firstmon).remotes.keys()
+    with contextutil.safe_while(sleep=10, tries=60,
+                                action='wait for monitor quorum') as proceed:
+        while proceed():
+            r = remote.run(
+                args=[
+                    'sudo',
+                    'ceph',
+                    'quorum_status',
+                ],
+                stdout=StringIO(),
+                logger=log.getChild('quorum_status'),
+            )
+            j = json.loads(r.stdout.getvalue())
+            q = j.get('quorum_names', [])
+            log.debug('Quorum: %s', q)
+            if sorted(q) == sorted(mons):
+                break
+
+
+def created_pool(ctx, config):
+    """
+    Add new pools to the dictionary of pools that the ceph-manager
+    knows about.
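+
+    The config is expected to be a list of pool names, e.g.::
+
+        tasks:
+        - ceph.created_pool: [mypool]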
+    """
+    for new_pool in config:
+        if new_pool not in ctx.managers['ceph'].pools:
+            ctx.managers['ceph'].pools[new_pool] = ctx.managers['ceph'].get_pool_property(
+                new_pool, 'pg_num')
+
+
+@contextlib.contextmanager
+def restart(ctx, config):
+    """
+    Restart ceph daemons
+
+    For example::
+
+      tasks:
+      - ceph.restart: [all]
+
+    For example::
+
+      tasks:
+      - ceph.restart: [osd.0, mon.1, mds.*]
+
+    or::
+
+      tasks:
+      - ceph.restart:
+          daemons: [osd.0, mon.1]
+          wait-for-healthy: false
+          wait-for-osds-up: true
+
+    :param ctx: Context
+    :param config: Configuration
+    """
+    if config is None:
+        config = {}
+    elif isinstance(config, list):
+        config = {'daemons': config}
+
+    daemons = ctx.daemons.resolve_role_list(config.get('daemons', None), CEPH_ROLE_TYPES, True)
+    clusters = set()
+    for role in daemons:
+        cluster, type_, id_ = teuthology.split_role(role)
+        ctx.daemons.get_daemon(type_, id_, cluster).restart()
+        clusters.add(cluster)
+
+    manager = ctx.managers['ceph']
+    for dmon in daemons:
+        if '.' in dmon:
+            dm_parts = dmon.split('.')
+            if dm_parts[1].isdigit():
+                if dm_parts[0] == 'osd':
+                    manager.mark_down_osd(int(dm_parts[1]))
+
+    if config.get('wait-for-healthy', True):
+        for cluster in clusters:
+            healthy(ctx=ctx, config=dict(cluster=cluster))
+    if config.get('wait-for-osds-up', False):
+        for cluster in clusters:
+            wait_for_osds_up(ctx=ctx, config=dict(cluster=cluster))
+    yield
+
+
+@contextlib.contextmanager
+def stop(ctx, config):
+    """
+    Stop ceph daemons
+
+    For example::
+      tasks:
+      - ceph.stop: [mds.*]
+
+      tasks:
+      - ceph.stop: [osd.0, osd.2]
+
+      tasks:
+      - ceph.stop:
+          daemons: [osd.0, osd.2]
+
+    """
+    if config is None:
+        config = {}
+    elif isinstance(config, list):
+        config = {'daemons': config}
+
+    daemons = ctx.daemons.resolve_role_list(config.get('daemons', None), CEPH_ROLE_TYPES, True)
+    for role in daemons:
+        cluster, type_, id_ = teuthology.split_role(role)
+        ctx.daemons.get_daemon(type_, id_, cluster).stop()
+
+    yield
+
+
+@contextlib.contextmanager
+def wait_for_failure(ctx, config):
+    """
+    Wait for a failure of a ceph daemon
+
+    For example::
+      tasks:
+      - ceph.wait_for_failure: [mds.*]
+
+      tasks:
+      - ceph.wait_for_failure: [osd.0, osd.2]
+
+      tasks:
+      - ceph.wait_for_failure:
+          daemons: [osd.0, osd.2]
+
+    """
+    if config is None:
+        config = {}
+    elif isinstance(config, list):
+        config = {'daemons': config}
+
+    daemons = ctx.daemons.resolve_role_list(config.get('daemons', None), CEPH_ROLE_TYPES, True)
+    for role in daemons:
+        cluster, type_, id_ = teuthology.split_role(role)
+        try:
+            ctx.daemons.get_daemon(type_, id_, cluster).wait()
+        except Exception:
+            log.info('Saw expected daemon failure.  Continuing.')
+        else:
+            raise RuntimeError('daemon %s did not fail' % role)
+
+    yield
+
+
+def validate_config(ctx, config):
+    """
+    Perform some simple validation on task configuration.
+    Raises exceptions.ConfigError if an error is found.
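+
+    For example, the following roles would be rejected, because one host
+    has osds from two clusters::
+
+        roles:
+        - [mon.a, osd.0, backup.osd.1]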
+    """
+    # check for osds from multiple clusters on the same host
+    for remote, roles_for_host in ctx.cluster.remotes.items():
+        last_cluster = None
+        last_role = None
+        for role in roles_for_host:
+            role_cluster, role_type, _ = teuthology.split_role(role)
+            if role_type != 'osd':
+                continue
+            if last_cluster and last_cluster != role_cluster:
+                msg = "Host should not have osds (%s and %s) from multiple clusters" % (
+                    last_role, role)
+                raise exceptions.ConfigError(msg)
+            last_cluster = role_cluster
+            last_role = role
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Set up and tear down a Ceph cluster.
+
+    For example::
+
+        tasks:
+        - ceph:
+        - interactive:
+
+    You can also specify what branch to run::
+
+        tasks:
+        - ceph:
+            branch: foo
+
+    Or a tag::
+
+        tasks:
+        - ceph:
+            tag: v0.42.13
+
+    Or a sha1::
+
+        tasks:
+        - ceph:
+            sha1: 1376a5ab0c89780eab39ffbbe436f6a6092314ed
+
+    Or a local source dir::
+
+        tasks:
+        - ceph:
+            path: /home/sage/ceph
+
+    To capture code coverage data, use::
+
+        tasks:
+        - ceph:
+            coverage: true
+
+    To use btrfs, ext4, or xfs on the target's scratch disks, use::
+
+        tasks:
+        - ceph:
+            fs: xfs
+            mkfs_options: [-b,size=65536,-l,logdev=/dev/sdc1]
+            mount_options: [nobarrier, inode64]
+
+    Note, this will cause the task to check the /scratch_devs file on each node
+    for available devices.  If no such file is found, /dev/sdb will be used.
+
+    To run some daemons under valgrind, include their names
+    and the tool/args to use in a valgrind section::
+
+        tasks:
+        - ceph:
+          valgrind:
+            mds.1: --tool=memcheck
+            osd.1: [--tool=memcheck, --leak-check=no]
+
+    Those nodes which are using memcheck or valgrind will get
+    checked for bad results.
+
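+    To gather daemon CPU profiles (via the ``CPUPROFILE`` environment
+    variable used by the gperftools CPU profiler, with output written
+    under /var/log/ceph/profiling-logger), list the daemon types to
+    profile::
+
+        tasks:
+        - ceph:
+            cpu_profile: [osd, mds]
+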
+    To adjust or modify config options, use::
+
+        tasks:
+        - ceph:
+            conf:
+              section:
+                key: value
+
+    For example::
+
+        tasks:
+        - ceph:
+            conf:
+              mds.0:
+                some option: value
+                other key: other value
+              client.0:
+                debug client: 10
+                debug ms: 1
+
+    By default, the cluster log is checked for errors and warnings,
+    and the run marked failed if any appear. You can ignore log
+    entries by giving a list of egrep compatible regexes, e.g.::
+
+        tasks:
+        - ceph:
+            log-whitelist: ['foo.*bar', 'bad message']
+
+    To run multiple ceph clusters, use multiple ceph tasks, and roles
+    with a cluster name prefix, e.g. cluster1.client.0. Roles with no
+    cluster use the default cluster name, 'ceph'. OSDs from separate
+    clusters must be on separate hosts. Clients and non-osd daemons
+    from multiple clusters may be colocated. For each cluster, add an
+    instance of the ceph task with the cluster name specified, e.g.::
+
+        roles:
+        - [mon.a, osd.0, osd.1]
+        - [backup.mon.a, backup.osd.0, backup.osd.1]
+        - [client.0, backup.client.0]
+        tasks:
+        - ceph:
+            cluster: ceph
+        - ceph:
+            cluster: backup
+
+    :param ctx: Context
+    :param config: Configuration
+
+    """
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        "task ceph only supports a dictionary for configuration"
+
+    overrides = ctx.config.get('overrides', {})
+    teuthology.deep_merge(config, overrides.get('ceph', {}))
+
+    first_ceph_cluster = False
+    if not hasattr(ctx, 'daemons'):
+        first_ceph_cluster = True
+        ctx.daemons = DaemonGroup()
+
+    testdir = teuthology.get_testdir(ctx)
+    if config.get('coverage'):
+        coverage_dir = '{tdir}/archive/coverage'.format(tdir=testdir)
+        log.info('Creating coverage directory...')
+        run.wait(
+            ctx.cluster.run(
+                args=[
+                    'install', '-d', '-m0755', '--',
+                    coverage_dir,
+                ],
+                wait=False,
+            )
+        )
+
+    if 'cluster' not in config:
+        config['cluster'] = 'ceph'
+
+    validate_config(ctx, config)
+
+    subtasks = []
+    if first_ceph_cluster:
+        # these tasks handle general log setup and parsing on all hosts,
+        # so they should only be run once
+        subtasks = [
+            lambda: ceph_log(ctx=ctx, config=None),
+            lambda: valgrind_post(ctx=ctx, config=config),
+        ]
+
+    subtasks += [
+        lambda: cluster(ctx=ctx, config=dict(
+            conf=config.get('conf', {}),
+            fs=config.get('fs', 'xfs'),
+            mkfs_options=config.get('mkfs_options', None),
+            mount_options=config.get('mount_options', None),
+            block_journal=config.get('block_journal', None),
+            tmpfs_journal=config.get('tmpfs_journal', None),
+            skip_mgr_daemons=config.get('skip_mgr_daemons', False),
+            log_whitelist=config.get('log-whitelist', []),
+            cpu_profile=set(config.get('cpu_profile', [])),
+            cluster=config['cluster'],
+        )),
+        lambda: run_daemon(ctx=ctx, config=config, type_='mon'),
+        lambda: run_daemon(ctx=ctx, config=config, type_='mgr'),
+        lambda: crush_setup(ctx=ctx, config=config),
+        lambda: run_daemon(ctx=ctx, config=config, type_='osd'),
+        lambda: create_rbd_pool(ctx=ctx, config=config),
+        lambda: cephfs_setup(ctx=ctx, config=config),
+        lambda: run_daemon(ctx=ctx, config=config, type_='mds'),
+    ]
+
+    with contextutil.nested(*subtasks):
+        first_mon = teuthology.get_first_mon(ctx, config, config['cluster'])
+        (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+        if not hasattr(ctx, 'managers'):
+            ctx.managers = {}
+        ctx.managers[config['cluster']] = CephManager(
+            mon,
+            ctx=ctx,
+            logger=log.getChild('ceph_manager.' + config['cluster']),
+            cluster=config['cluster'],
+        )
+
+        try:
+            if config.get('wait-for-healthy', True):
+                healthy(ctx=ctx, config=dict(cluster=config['cluster']))
+
+            yield
+        finally:
+            if config.get('wait-for-scrub', True):
+                osd_scrub_pgs(ctx, config)
+
+            # stop logging health to clog during shutdown, or else we generate
+            # a bunch of scary messages unrelated to our actual run.
+            firstmon = teuthology.get_first_mon(ctx, config, config['cluster'])
+            (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+            mon0_remote.run(
+                args=[
+                    'sudo',
+                    'ceph',
+                    '--cluster', config['cluster'],
+                    'tell',
+                    'mon.*',
+                    'injectargs',
+                    '--',
+                    '--no-mon-health-to-clog',
+                ]
+            )