initial code repo
[stor4nfv.git] / src / ceph / qa / workunits / ceph-disk / ceph-disk-test.py
diff --git a/src/ceph/qa/workunits/ceph-disk/ceph-disk-test.py b/src/ceph/qa/workunits/ceph-disk/ceph-disk-test.py
new file mode 100644 (file)
index 0000000..637fa90
--- /dev/null
@@ -0,0 +1,777 @@
+#
+# Copyright (C) 2015, 2016 Red Hat <contact@redhat.com>
+#
+# Author: Loic Dachary <loic@dachary.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Library Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Library Public License for more details.
+#
+# When debugging these tests (must be root), here are a few useful commands:
+#
+#  export PATH=.:..:$PATH
+#  ceph-disk.sh # run once to prepare the environment as it would be by teuthology
+#  ln -sf /home/ubuntu/ceph/src/ceph-disk/ceph_disk/main.py $(which ceph-disk)
+#  ln -sf /home/ubuntu/ceph/udev/95-ceph-osd.rules /lib/udev/rules.d/95-ceph-osd.rules
+#  ln -sf /home/ubuntu/ceph/systemd/ceph-disk@.service /usr/lib/systemd/system/ceph-disk@.service
+#  ceph-disk.conf will be silently ignored if it is a symbolic link or a hard link /var/log/upstart for logs
+#  cp /home/ubuntu/ceph/src/upstart/ceph-disk.conf /etc/init/ceph-disk.conf
+#  id=3 ; ceph-disk deactivate --deactivate-by-id $id ; ceph-disk destroy --purge --zap --destroy-by-id $id
+#  py.test -s -v -k test_activate_dmcrypt_luks ceph-disk-test.py
+#
+#  CentOS 7
+#    udevadm monitor --property & tail -f /var/log/messages
+#    udev rules messages are logged in /var/log/messages
+#    systemctl stop ceph-osd@2
+#    systemctl start ceph-osd@2
+#
+#  udevadm monitor --property & tail -f /var/log/syslog /var/log/upstart/*  # on Ubuntu 14.04
+#  udevadm test --action=add /block/vdb/vdb1 # verify the udev rule is run as expected
+#  udevadm control --reload # when changing the udev rules
+#  sudo /usr/sbin/ceph-disk -v trigger /dev/vdb1 # activates if vdb1 is data
+#
+#  integration tests coverage
+#  pip install coverage
+#  perl -pi -e 's|"ceph-disk |"coverage run --source=/usr/sbin/ceph-disk --append /usr/sbin/ceph-disk |' ceph-disk-test.py
+#  rm -f .coverage ; py.test -s -v ceph-disk-test.py
+#  coverage report --show-missing
+#
+import argparse
+import json
+import logging
+import configobj
+import os
+import pytest
+import re
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+LOG = logging.getLogger('CephDisk')
+
+
+class CephDisk:
+
+    def __init__(self):
+        self.conf = configobj.ConfigObj('/etc/ceph/ceph.conf')
+
+    def save_conf(self):
+        self.conf.write(open('/etc/ceph/ceph.conf', 'wb'))
+
+    @staticmethod
+    def helper(command):
+        command = "ceph-helpers-root.sh " + command
+        return CephDisk.sh(command)
+
+    @staticmethod
+    def sh(command):
+        LOG.debug(":sh: " + command)
+        proc = subprocess.Popen(
+            args=command,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            shell=True,
+            bufsize=1)
+        output, _ = proc.communicate()
+        if proc.poll():
+            LOG.warning(output.decode('utf-8'))
+            raise subprocess.CalledProcessError(
+                returncode=proc.returncode,
+                cmd=command,
+                output=output,
+            )
+        lines = []
+        for line in output.decode('utf-8').split('\n'):
+            if 'dangerous and experimental' in line:
+                LOG.debug('SKIP dangerous and experimental')
+                continue
+            lines.append(line)
+            LOG.debug(line.strip().encode('ascii', 'ignore'))
+        return "".join(lines)
+
+    def unused_disks(self, pattern='[vs]d.'):
+        names = [x for x in os.listdir("/sys/block") if re.match(pattern, x)]
+        if not names:
+            return []
+        disks = json.loads(
+            self.sh("ceph-disk list --format json " + " ".join(names)))
+        unused = []
+        for disk in disks:
+            if 'partitions' not in disk:
+                unused.append(disk['path'])
+        return unused
+
+    def ensure_sd(self):
+        LOG.debug(self.unused_disks('sd.'))
+        if self.unused_disks('sd.'):
+            return
+        modprobe = "modprobe scsi_debug vpd_use_hostno=0 add_host=1 dev_size_mb=300 ; udevadm settle"
+        try:
+            self.sh(modprobe)
+        except:
+            self.helper("install linux-image-extra-3.13.0-61-generic")
+            self.sh(modprobe)
+
+    def unload_scsi_debug(self):
+        self.sh("rmmod scsi_debug || true")
+
+    def get_lockbox(self):
+        disks = json.loads(self.sh("ceph-disk list --format json"))
+        for disk in disks:
+            if 'partitions' in disk:
+                for partition in disk['partitions']:
+                    if partition.get('type') == 'lockbox':
+                        return partition
+        raise Exception("no lockbox found " + str(disks))
+
+    def get_osd_partition(self, uuid):
+        disks = json.loads(self.sh("ceph-disk list --format json"))
+        for disk in disks:
+            if 'partitions' in disk:
+                for partition in disk['partitions']:
+                    if partition.get('uuid') == uuid:
+                        return partition
+        raise Exception("uuid = " + uuid + " not found in " + str(disks))
+
+    def get_journal_partition(self, uuid):
+        return self.get_space_partition('journal', uuid)
+
+    def get_block_partition(self, uuid):
+        return self.get_space_partition('block', uuid)
+
+    def get_blockdb_partition(self, uuid):
+        return self.get_space_partition('block.db', uuid)
+
+    def get_blockwal_partition(self, uuid):
+        return self.get_space_partition('block.wal', uuid)
+
+    def get_space_partition(self, name, uuid):
+        data_partition = self.get_osd_partition(uuid)
+        space_dev = data_partition[name + '_dev']
+        disks = json.loads(self.sh("ceph-disk list --format json"))
+        for disk in disks:
+            if 'partitions' in disk:
+                for partition in disk['partitions']:
+                    if partition['path'] == space_dev:
+                        if name + '_for' in partition:
+                            assert partition[
+                                name + '_for'] == data_partition['path']
+                        return partition
+        raise Exception(
+            name + " for uuid = " + uuid + " not found in " + str(disks))
+
+    def destroy_osd(self, uuid):
+        id = self.sh("ceph osd create " + uuid).strip()
+        self.sh("""
+        set -xe
+        ceph-disk --verbose deactivate --deactivate-by-id {id}
+        ceph-disk --verbose destroy --purge --destroy-by-id {id} --zap
+        """.format(id=id))
+
+    def deactivate_osd(self, uuid):
+        id = self.sh("ceph osd create " + uuid).strip()
+        self.sh("""
+        set -xe
+        ceph-disk --verbose deactivate --once --deactivate-by-id {id}
+        """.format(id=id))
+
+    @staticmethod
+    def osd_up_predicate(osds, uuid):
+        for osd in osds:
+            if osd['uuid'] == uuid and 'up' in osd['state']:
+                return True
+        return False
+
+    @staticmethod
+    def wait_for_osd_up(uuid):
+        CephDisk.wait_for_osd(uuid, CephDisk.osd_up_predicate, 'up')
+
+    @staticmethod
+    def osd_down_predicate(osds, uuid):
+        found = False
+        for osd in osds:
+            if osd['uuid'] == uuid:
+                found = True
+                if 'down' in osd['state'] or ['exists'] == osd['state']:
+                    return True
+        return not found
+
+    @staticmethod
+    def wait_for_osd_down(uuid):
+        CephDisk.wait_for_osd(uuid, CephDisk.osd_down_predicate, 'down')
+
+    @staticmethod
+    def wait_for_osd(uuid, predicate, info):
+        LOG.info("wait_for_osd " + info + " " + uuid)
+        for delay in (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024):
+            dump = json.loads(CephDisk.sh("ceph osd dump -f json"))
+            if predicate(dump['osds'], uuid):
+                return True
+            time.sleep(delay)
+        raise Exception('timeout waiting for osd ' + uuid + ' to be ' + info)
+
+    def check_osd_status(self, uuid, space_name=None):
+        data_partition = self.get_osd_partition(uuid)
+        assert data_partition['type'] == 'data'
+        assert data_partition['state'] == 'active'
+        if space_name is not None:
+            space_partition = self.get_space_partition(space_name, uuid)
+            assert space_partition
+
+
+class TestCephDisk(object):
+
+    def setup_class(self):
+        logging.basicConfig(level=logging.DEBUG)
+        c = CephDisk()
+        if c.sh("lsb_release -si").strip() == 'CentOS':
+            c.helper("install multipath-tools device-mapper-multipath")
+        c.conf['global']['pid file'] = '/var/run/ceph/$cluster-$name.pid'
+        #
+        # Avoid json parsing interference
+        #
+        c.conf['global']['debug monc'] = 0
+        #
+        # objecstore
+        #
+        c.conf['global']['osd journal size'] = 100
+        #
+        # bluestore
+        #
+        c.conf['global']['bluestore fsck on mount'] = 'true'
+        c.save_conf()
+
+    def setup(self):
+        c = CephDisk()
+        for key in ('osd objectstore', 'osd dmcrypt type'):
+            if key in c.conf['global']:
+                del c.conf['global'][key]
+        c.save_conf()
+
+    def test_deactivate_reactivate_osd(self):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + disk)
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + disk)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
+        assert len(device['partitions']) == 2
+        c.check_osd_status(osd_uuid, 'journal')
+        data_partition = c.get_osd_partition(osd_uuid)
+        c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
+        c.wait_for_osd_down(osd_uuid)
+        c.sh("ceph-disk --verbose activate " + data_partition['path'] + " --reactivate")
+        # check again
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
+        assert len(device['partitions']) == 2
+        c.check_osd_status(osd_uuid, 'journal')
+        c.helper("pool_read_write")
+        c.destroy_osd(osd_uuid)
+
+    def test_destroy_osd_by_id(self):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid + " " + disk)
+        c.wait_for_osd_up(osd_uuid)
+        c.check_osd_status(osd_uuid)
+        c.destroy_osd(osd_uuid)
+
+    def test_destroy_osd_by_dev_path(self):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid + " " + disk)
+        c.wait_for_osd_up(osd_uuid)
+        partition = c.get_osd_partition(osd_uuid)
+        assert partition['type'] == 'data'
+        assert partition['state'] == 'active'
+        c.sh("ceph-disk --verbose deactivate " + partition['path'])
+        c.wait_for_osd_down(osd_uuid)
+        c.sh("ceph-disk --verbose destroy --purge " + partition['path'] + " --zap")
+
+    def test_deactivate_reactivate_dmcrypt_plain(self):
+        c = CephDisk()
+        c.conf['global']['osd dmcrypt type'] = 'plain'
+        c.save_conf()
+        osd_uuid = self.activate_dmcrypt('ceph-disk-no-lockbox')
+        data_partition = c.get_osd_partition(osd_uuid)
+        c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
+        c.wait_for_osd_down(osd_uuid)
+        c.sh("ceph-disk --verbose activate-journal " + data_partition['journal_dev'] +
+             " --reactivate" + " --dmcrypt")
+        c.wait_for_osd_up(osd_uuid)
+        c.check_osd_status(osd_uuid, 'journal')
+        c.destroy_osd(osd_uuid)
+        c.save_conf()
+
+    def test_deactivate_reactivate_dmcrypt_luks(self):
+        c = CephDisk()
+        osd_uuid = self.activate_dmcrypt('ceph-disk')
+        data_partition = c.get_osd_partition(osd_uuid)
+        lockbox_partition = c.get_lockbox()
+        c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
+        c.wait_for_osd_down(osd_uuid)
+        c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
+        c.sh("ceph-disk --verbose activate-journal " + data_partition['journal_dev'] +
+             " --reactivate" + " --dmcrypt")
+        c.wait_for_osd_up(osd_uuid)
+        c.check_osd_status(osd_uuid, 'journal')
+        c.destroy_osd(osd_uuid)
+
+    def test_activate_dmcrypt_plain_no_lockbox(self):
+        c = CephDisk()
+        c.conf['global']['osd dmcrypt type'] = 'plain'
+        c.save_conf()
+        osd_uuid = self.activate_dmcrypt('ceph-disk-no-lockbox')
+        c.destroy_osd(osd_uuid)
+        c.save_conf()
+
+    def test_activate_dmcrypt_luks_no_lockbox(self):
+        c = CephDisk()
+        osd_uuid = self.activate_dmcrypt('ceph-disk-no-lockbox')
+        c.destroy_osd(osd_uuid)
+
+    def test_activate_dmcrypt_luks_with_lockbox(self):
+        c = CephDisk()
+        osd_uuid = self.activate_dmcrypt('ceph-disk')
+        c.destroy_osd(osd_uuid)
+
+    def test_activate_lockbox(self):
+        c = CephDisk()
+        osd_uuid = self.activate_dmcrypt('ceph-disk')
+        lockbox = c.get_lockbox()
+        assert lockbox['state'] == 'active'
+        c.sh("umount " + lockbox['path'])
+        lockbox = c.get_lockbox()
+        assert lockbox['state'] == 'prepared'
+        c.sh("ceph-disk --verbose trigger " + lockbox['path'])
+        success = False
+        for delay in (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024):
+            lockbox = c.get_lockbox()
+            if lockbox['state'] == 'active':
+                success = True
+                break
+            time.sleep(delay)
+        if not success:
+            raise Exception('timeout waiting for lockbox ' + lockbox['path'])
+        c.destroy_osd(osd_uuid)
+
+    def activate_dmcrypt(self, ceph_disk):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        journal_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + disk)
+        c.sh(ceph_disk + " --verbose prepare --filestore " +
+             " --osd-uuid " + osd_uuid +
+             " --journal-uuid " + journal_uuid +
+             " --dmcrypt " +
+             " " + disk)
+        c.wait_for_osd_up(osd_uuid)
+        c.check_osd_status(osd_uuid, 'journal')
+        return osd_uuid
+
+    def test_trigger_dmcrypt_journal_lockbox(self):
+        c = CephDisk()
+        osd_uuid = self.activate_dmcrypt('ceph-disk')
+        data_partition = c.get_osd_partition(osd_uuid)
+        lockbox_partition = c.get_lockbox()
+        c.deactivate_osd(osd_uuid)
+        c.wait_for_osd_down(osd_uuid)
+        with pytest.raises(subprocess.CalledProcessError):
+            # fails because the lockbox is not mounted yet
+            c.sh("ceph-disk --verbose trigger --sync " + data_partition['journal_dev'])
+        c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
+        c.wait_for_osd_up(osd_uuid)
+        c.destroy_osd(osd_uuid)
+
+    def test_trigger_dmcrypt_data_lockbox(self):
+        c = CephDisk()
+        osd_uuid = self.activate_dmcrypt('ceph-disk')
+        data_partition = c.get_osd_partition(osd_uuid)
+        lockbox_partition = c.get_lockbox()
+        c.deactivate_osd(osd_uuid)
+        c.wait_for_osd_down(osd_uuid)
+        with pytest.raises(subprocess.CalledProcessError):
+            # fails because the lockbox is not mounted yet
+            c.sh("ceph-disk --verbose trigger --sync " + data_partition['path'])
+        c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
+        c.wait_for_osd_up(osd_uuid)
+        c.destroy_osd(osd_uuid)
+
+    def test_trigger_dmcrypt_lockbox(self):
+        c = CephDisk()
+        osd_uuid = self.activate_dmcrypt('ceph-disk')
+        data_partition = c.get_osd_partition(osd_uuid)
+        lockbox_partition = c.get_lockbox()
+        c.deactivate_osd(osd_uuid)
+        c.wait_for_osd_down(osd_uuid)
+        c.sh("ceph-disk --verbose trigger --sync " + lockbox_partition['path'])
+        c.wait_for_osd_up(osd_uuid)
+        c.destroy_osd(osd_uuid)
+
+    def test_activate_no_journal(self):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + disk)
+        c.conf['global']['osd objectstore'] = 'memstore'
+        c.save_conf()
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + disk)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
+        assert len(device['partitions']) == 1
+        partition = device['partitions'][0]
+        assert partition['type'] == 'data'
+        assert partition['state'] == 'active'
+        assert 'journal_dev' not in partition
+        c.helper("pool_read_write")
+        c.destroy_osd(osd_uuid)
+        c.save_conf()
+
+    def test_activate_with_journal_dev_no_symlink(self):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + disk)
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + disk)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
+        assert len(device['partitions']) == 2
+        c.check_osd_status(osd_uuid, 'journal')
+        c.helper("pool_read_write")
+        c.destroy_osd(osd_uuid)
+
+    def test_activate_bluestore(self):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + disk)
+        c.conf['global']['osd objectstore'] = 'bluestore'
+        c.save_conf()
+        c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " + osd_uuid +
+             " " + disk)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
+        assert len(device['partitions']) == 2
+        c.check_osd_status(osd_uuid, 'block')
+        c.helper("pool_read_write")
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + disk)
+
+    def test_activate_bluestore_seperated_block_db_wal(self):
+        c = CephDisk()
+        disk1 = c.unused_disks()[0]
+        disk2 = c.unused_disks()[1]
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + disk1 + " " + disk2)
+        c.conf['global']['osd objectstore'] = 'bluestore'
+        c.save_conf()
+        c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " + osd_uuid +
+             " " + disk1 + " --block.db " + disk2 + " --block.wal " + disk2)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(c.sh("ceph-disk list --format json " + disk1))[0]
+        assert len(device['partitions']) == 2
+        device = json.loads(c.sh("ceph-disk list --format json " + disk2))[0]
+        assert len(device['partitions']) == 2
+        c.check_osd_status(osd_uuid, 'block')
+        c.check_osd_status(osd_uuid, 'block.wal')
+        c.check_osd_status(osd_uuid, 'block.db')
+        c.helper("pool_read_write")
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + disk1 + " " + disk2)
+
+    def test_activate_bluestore_reuse_db_wal_partition(self):
+        c = CephDisk()
+        disks = c.unused_disks()
+        block_disk = disks[0]
+        db_wal_disk = disks[1]
+        #
+        # Create an OSD with two disks (one for block, 
+        # the other for block.db and block.wal ) and then destroy osd.
+        #
+        osd_uuid1 = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + block_disk + " " + db_wal_disk)
+        c.conf['global']['osd objectstore'] = 'bluestore'
+        c.save_conf()
+        c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " + 
+             osd_uuid1 + " " + block_disk + " --block.db " + db_wal_disk + 
+             " --block.wal " + db_wal_disk)
+        c.wait_for_osd_up(osd_uuid1)
+        blockdb_partition = c.get_blockdb_partition(osd_uuid1)
+        blockdb_path = blockdb_partition['path']
+        blockwal_partition = c.get_blockwal_partition(osd_uuid1)
+        blockwal_path = blockwal_partition['path']
+        c.destroy_osd(osd_uuid1)
+        c.sh("ceph-disk --verbose zap " + block_disk)
+        #
+        # Create another OSD with the block.db and block.wal partition 
+        # of the previous OSD
+        #
+        osd_uuid2 = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose prepare --bluestore --osd-uuid " + 
+             osd_uuid2 + " " + block_disk + " --block.db " + blockdb_path + 
+             " --block.wal " + blockwal_path)
+        c.wait_for_osd_up(osd_uuid2)
+        device = json.loads(c.sh("ceph-disk list --format json " + block_disk))[0]
+        assert len(device['partitions']) == 2
+        device = json.loads(c.sh("ceph-disk list --format json " + db_wal_disk))[0]
+        assert len(device['partitions']) == 2
+        c.check_osd_status(osd_uuid2, 'block')
+        c.check_osd_status(osd_uuid2, 'block.wal')
+        c.check_osd_status(osd_uuid2, 'block.db')
+        blockdb_partition = c.get_blockdb_partition(osd_uuid2)
+        blockwal_partition = c.get_blockwal_partition(osd_uuid2)
+        #
+        # Verify the previous OSD partition has been reused
+        #
+        assert blockdb_partition['path'] == blockdb_path
+        assert blockwal_partition['path'] == blockwal_path
+        c.destroy_osd(osd_uuid2)
+        c.sh("ceph-disk --verbose zap " + block_disk + " " + db_wal_disk)
+
+    def test_activate_with_journal_dev_is_symlink(self):
+        c = CephDisk()
+        disk = c.unused_disks()[0]
+        osd_uuid = str(uuid.uuid1())
+        tempdir = tempfile.mkdtemp()
+        symlink = os.path.join(tempdir, 'osd')
+        os.symlink(disk, symlink)
+        c.sh("ceph-disk --verbose zap " + symlink)
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + symlink)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(c.sh("ceph-disk list --format json " + symlink))[0]
+        assert len(device['partitions']) == 2
+        data_partition = c.get_osd_partition(osd_uuid)
+        assert data_partition['type'] == 'data'
+        assert data_partition['state'] == 'active'
+        journal_partition = c.get_journal_partition(osd_uuid)
+        assert journal_partition
+        c.helper("pool_read_write")
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + symlink)
+        os.unlink(symlink)
+        os.rmdir(tempdir)
+
+    def test_activate_journal_file(self):
+        c = CephDisk()
+        disks = c.unused_disks()
+        data_disk = disks[0]
+        #
+        # /var/lib/ceph/osd is required otherwise it may violate
+        # restrictions enforced by systemd regarding the directories
+        # which ceph-osd is allowed to read/write
+        #
+        tempdir = tempfile.mkdtemp(dir='/var/lib/ceph/osd')
+        c.sh("chown ceph:ceph " + tempdir + " || true")
+        journal_file = os.path.join(tempdir, 'journal')
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + data_disk + " " + journal_file)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(
+            c.sh("ceph-disk list --format json " + data_disk))[0]
+        assert len(device['partitions']) == 1
+        partition = device['partitions'][0]
+        assert journal_file == os.readlink(
+            os.path.join(partition['mount'], 'journal'))
+        c.check_osd_status(osd_uuid)
+        c.helper("pool_read_write 1")  # 1 == pool size
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + data_disk)
+        os.unlink(journal_file)
+        os.rmdir(tempdir)
+
+    def test_activate_separated_journal(self):
+        c = CephDisk()
+        disks = c.unused_disks()
+        data_disk = disks[0]
+        journal_disk = disks[1]
+        osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
+        c.helper("pool_read_write 1")  # 1 == pool size
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + data_disk + " " + journal_disk)
+
+    def test_activate_separated_journal_dev_is_symlink(self):
+        c = CephDisk()
+        disks = c.unused_disks()
+        data_disk = disks[0]
+        journal_disk = disks[1]
+        tempdir = tempfile.mkdtemp()
+        data_symlink = os.path.join(tempdir, 'osd')
+        os.symlink(data_disk, data_symlink)
+        journal_symlink = os.path.join(tempdir, 'journal')
+        os.symlink(journal_disk, journal_symlink)
+        osd_uuid = self.activate_separated_journal(
+            data_symlink, journal_symlink)
+        c.helper("pool_read_write 1")  # 1 == pool size
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + data_symlink + " " + journal_symlink)
+        os.unlink(data_symlink)
+        os.unlink(journal_symlink)
+        os.rmdir(tempdir)
+
+    def activate_separated_journal(self, data_disk, journal_disk):
+        c = CephDisk()
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + data_disk + " " + journal_disk)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(
+            c.sh("ceph-disk list --format json " + data_disk))[0]
+        assert len(device['partitions']) == 1
+        c.check_osd_status(osd_uuid, 'journal')
+        return osd_uuid
+
+    #
+    # Create an OSD and get a journal partition from a disk that
+    # already contains a journal partition which is in use. Updates of
+    # the kernel partition table may behave differently when a
+    # partition is in use. See http://tracker.ceph.com/issues/7334 for
+    # more information.
+    #
+    def test_activate_two_separated_journal(self):
+        c = CephDisk()
+        disks = c.unused_disks()
+        data_disk = disks[0]
+        other_data_disk = disks[1]
+        journal_disk = disks[2]
+        osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
+        other_osd_uuid = self.activate_separated_journal(
+            other_data_disk, journal_disk)
+        #
+        # read/write can only succeed if the two osds are up because
+        # the pool needs two OSD
+        #
+        c.helper("pool_read_write 2")  # 2 == pool size
+        c.destroy_osd(osd_uuid)
+        c.destroy_osd(other_osd_uuid)
+        c.sh("ceph-disk --verbose zap " + data_disk + " " +
+             journal_disk + " " + other_data_disk)
+
+    #
+    # Create an OSD and reuse an existing journal partition
+    #
+    def test_activate_reuse_journal(self):
+        c = CephDisk()
+        disks = c.unused_disks()
+        data_disk = disks[0]
+        journal_disk = disks[1]
+        #
+        # Create an OSD with a separated journal and destroy it.
+        #
+        osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
+        journal_partition = c.get_journal_partition(osd_uuid)
+        journal_path = journal_partition['path']
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + data_disk)
+        osd_uuid = str(uuid.uuid1())
+        #
+        # Create another OSD with the journal partition of the previous OSD
+        #
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + data_disk + " " + journal_path)
+        c.helper("pool_read_write 1")  # 1 == pool size
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(
+            c.sh("ceph-disk list --format json " + data_disk))[0]
+        assert len(device['partitions']) == 1
+        c.check_osd_status(osd_uuid)
+        journal_partition = c.get_journal_partition(osd_uuid)
+        #
+        # Verify the previous OSD partition has been reused
+        #
+        assert journal_partition['path'] == journal_path
+        c.destroy_osd(osd_uuid)
+        c.sh("ceph-disk --verbose zap " + data_disk + " " + journal_disk)
+
+    def test_activate_multipath(self):
+        c = CephDisk()
+        if c.sh("lsb_release -si").strip() != 'CentOS':
+            pytest.skip(
+                "see issue https://bugs.launchpad.net/ubuntu/+source/multipath-tools/+bug/1488688")
+        c.ensure_sd()
+        #
+        # Figure out the name of the multipath device
+        #
+        disk = c.unused_disks('sd.')[0]
+        c.sh("mpathconf --enable || true")
+        c.sh("multipath " + disk)
+        holders = os.listdir(
+            "/sys/block/" + os.path.basename(disk) + "/holders")
+        assert 1 == len(holders)
+        name = open("/sys/block/" + holders[0] + "/dm/name").read()
+        multipath = "/dev/mapper/" + name
+        #
+        # Prepare the multipath device
+        #
+        osd_uuid = str(uuid.uuid1())
+        c.sh("ceph-disk --verbose zap " + multipath)
+        c.sh("ceph-disk --verbose prepare --filestore --osd-uuid " + osd_uuid +
+             " " + multipath)
+        c.wait_for_osd_up(osd_uuid)
+        device = json.loads(
+            c.sh("ceph-disk list --format json " + multipath))[0]
+        assert len(device['partitions']) == 2
+        data_partition = c.get_osd_partition(osd_uuid)
+        assert data_partition['type'] == 'data'
+        assert data_partition['state'] == 'active'
+        journal_partition = c.get_journal_partition(osd_uuid)
+        assert journal_partition
+        c.helper("pool_read_write")
+        c.destroy_osd(osd_uuid)
+        c.sh("udevadm settle")
+        c.sh("multipath -F")
+        c.unload_scsi_debug()
+
+
+class CephDiskTest(CephDisk):
+
+    def main(self, argv):
+        parser = argparse.ArgumentParser(
+            'ceph-disk-test',
+        )
+        parser.add_argument(
+            '-v', '--verbose',
+            action='store_true', default=None,
+            help='be more verbose',
+        )
+        parser.add_argument(
+            '--destroy-osd',
+            help='stop, umount and destroy',
+        )
+        args = parser.parse_args(argv)
+
+        if args.verbose:
+            logging.basicConfig(level=logging.DEBUG)
+
+        if args.destroy_osd:
+            dump = json.loads(CephDisk.sh("ceph osd dump -f json"))
+            osd_uuid = None
+            for osd in dump['osds']:
+                if str(osd['osd']) == args.destroy_osd:
+                    osd_uuid = osd['uuid']
+            if osd_uuid:
+                self.destroy_osd(osd_uuid)
+            else:
+                raise Exception("cannot find OSD " + args.destroy_osd +
+                                " ceph osd dump -f json")
+            return
+
+if __name__ == '__main__':
+    sys.exit(CephDiskTest().main(sys.argv[1:]))