initial code repo
[stor4nfv.git] / src / ceph / src / ceph-disk / tests / test_main.py
diff --git a/src/ceph/src/ceph-disk/tests/test_main.py b/src/ceph/src/ceph-disk/tests/test_main.py
new file mode 100644 (file)
index 0000000..57e4af2
--- /dev/null
@@ -0,0 +1,1322 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2015, 2016 Red Hat <contact@redhat.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Library Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Library Public License for more details.
+#
+from mock import patch, DEFAULT
+import os
+import platform
+import io
+import shutil
+import subprocess
+import tempfile
+import unittest
+from ceph_disk import main
+
+try:
+    import builtins
+except:
+    import __builtin__ as builtins
+
+
+def fail_to_mount(dev, fstype, options):
+    raise main.MountError(dev + " mount fail")
+
+
+class TestCephDisk(object):
+
+    def setup_class(self):
+        main.setup_logging(verbose=True, log_stdout=False)
+
+    def test_main_list_json(self, capsys):
+        if platform.system() == "FreeBSD":
+            return
+
+        data = tempfile.mkdtemp()
+        main.setup_statedir(data)
+        args = main.parse_args(['list', '--format', 'json'])
+        with patch.multiple(
+                main,
+                list_devices=lambda: {}):
+            main.main_list(args)
+            out, err = capsys.readouterr()
+            assert '{}\n' == out
+        shutil.rmtree(data)
+
+    def test_main_list_plain(self, capsys):
+        if platform.system() == "FreeBSD":
+            return
+
+        data = tempfile.mkdtemp()
+        main.setup_statedir(data)
+        args = main.parse_args(['list'])
+        with patch.multiple(
+                main,
+                list_devices=lambda: {}):
+            main.main_list(args)
+            out, err = capsys.readouterr()
+            assert '' == out
+        shutil.rmtree(data)
+
+    def test_list_format_more_osd_info_plain(self):
+        dev = {
+            'ceph_fsid': 'UUID',
+            'cluster': 'ceph',
+            'whoami': '1234',
+            'journal_dev': '/dev/Xda2',
+        }
+        out = main.list_format_more_osd_info_plain(dev)
+        assert dev['cluster'] in " ".join(out)
+        assert dev['journal_dev'] in " ".join(out)
+        assert dev['whoami'] in " ".join(out)
+
+        dev = {
+            'ceph_fsid': 'UUID',
+            'whoami': '1234',
+            'journal_dev': '/dev/Xda2',
+        }
+        out = main.list_format_more_osd_info_plain(dev)
+        assert 'unknown cluster' in " ".join(out)
+
+    def test_list_format_plain(self):
+        payload = [{
+            'path': '/dev/Xda',
+            'ptype': 'unknown',
+            'type': 'other',
+            'mount': '/somewhere',
+        }]
+        out = main.list_format_plain(payload)
+        assert payload[0]['path'] in out
+        assert payload[0]['type'] in out
+        assert payload[0]['mount'] in out
+
+        payload = [{
+            'path': '/dev/Xda1',
+            'ptype': 'unknown',
+            'type': 'swap',
+        }]
+        out = main.list_format_plain(payload)
+        assert payload[0]['path'] in out
+        assert payload[0]['type'] in out
+
+        payload = [{
+            'path': '/dev/Xda',
+            'partitions': [
+                {
+                    'dmcrypt': {},
+                    'ptype': 'whatever',
+                    'is_partition': True,
+                    'fs_type': 'ext4',
+                    'path': '/dev/Xda1',
+                    'mounted': '/somewhere',
+                    'type': 'other',
+                }
+            ],
+        }]
+        out = main.list_format_plain(payload)
+        assert payload[0]['path'] in out
+        assert payload[0]['partitions'][0]['path'] in out
+
+    def test_list_format_dev_plain(dev):
+        #
+        # data
+        #
+        dev = {
+            'path': '/dev/Xda1',
+            'ptype': main.PTYPE['regular']['osd']['ready'],
+            'state': 'prepared',
+            'whoami': '1234',
+        }
+        out = main.list_format_dev_plain(dev)
+        assert 'data' in out
+        assert dev['whoami'] in out
+        assert dev['state'] in out
+        #
+        # journal
+        #
+        dev = {
+            'path': '/dev/Xda2',
+            'ptype': main.PTYPE['regular']['journal']['ready'],
+            'journal_for': '/dev/Xda1',
+        }
+        out = main.list_format_dev_plain(dev)
+        assert 'journal' in out
+        assert dev['journal_for'] in out
+
+        #
+        # dmcrypt data
+        #
+        ptype2type = {
+            main.PTYPE['plain']['osd']['ready']: 'plain',
+            main.PTYPE['luks']['osd']['ready']: 'luks',
+        }
+        for (ptype, type) in ptype2type.items():
+            for holders in ((), ("dm_0",), ("dm_0", "dm_1")):
+                dev = {
+                    'dmcrypt': {
+                        'holders': holders,
+                        'type': type,
+                    },
+                    'path': '/dev/Xda1',
+                    'ptype': ptype,
+                    'state': 'prepared',
+                    'whoami': '1234',
+                }
+                out = main.list_format_dev_plain(dev)
+                assert 'data' in out
+                assert 'dmcrypt' in out
+                assert type in out
+                if len(holders) == 1:
+                    assert dev['whoami'] in out
+                for holder in holders:
+                    assert holder in out
+
+        #
+        # dmcrypt journal
+        #
+        ptype2type = {
+            main.PTYPE['plain']['journal']['ready']: 'plain',
+            main.PTYPE['luks']['journal']['ready']: 'luks',
+        }
+        for (ptype, type) in ptype2type.items():
+            for holders in ((), ("dm_0",)):
+                dev = {
+                    'path': '/dev/Xda2',
+                    'ptype': ptype,
+                    'journal_for': '/dev/Xda1',
+                    'dmcrypt': {
+                        'holders': holders,
+                        'type': type,
+                    },
+                }
+                out = main.list_format_dev_plain(dev)
+                assert 'journal' in out
+                assert 'dmcrypt' in out
+                assert type in out
+                assert dev['journal_for'] in out
+                if len(holders) == 1:
+                    assert holders[0] in out
+
+    def test_list_dev_osd(self):
+        dev = "Xda"
+        mount_path = '/mount/path'
+        fs_type = 'ext4'
+        cluster = 'ceph'
+        uuid_map = {}
+
+        def more_osd_info(path, uuid_map, desc):
+            desc['cluster'] = cluster
+        #
+        # mounted therefore active
+        #
+        with patch.multiple(
+                main,
+                is_mounted=lambda dev: mount_path,
+                get_dev_fs=lambda dev: fs_type,
+                more_osd_info=more_osd_info
+        ):
+            desc = {}
+            main.list_dev_osd(dev, uuid_map, desc)
+            assert {'cluster': 'ceph',
+                    'fs_type': 'ext4',
+                    'mount': '/mount/path',
+                    'state': 'active'} == desc
+        #
+        # not mounted and cannot mount: unprepared
+        #
+        mount_path = None
+        with patch.multiple(
+                main,
+                is_mounted=lambda dev: mount_path,
+                get_dev_fs=lambda dev: fs_type,
+                mount=fail_to_mount,
+                more_osd_info=more_osd_info
+        ):
+            desc = {}
+            main.list_dev_osd(dev, uuid_map, desc)
+            assert {'fs_type': 'ext4',
+                    'mount': mount_path,
+                    'state': 'unprepared'} == desc
+        #
+        # not mounted and magic found: prepared
+        #
+
+        def get_oneliner(path, what):
+            if what == 'magic':
+                return main.CEPH_OSD_ONDISK_MAGIC
+            else:
+                raise Exception('unknown ' + what)
+        with patch.multiple(
+                main,
+                is_mounted=lambda dev: mount_path,
+                get_dev_fs=lambda dev: fs_type,
+                mount=DEFAULT,
+                unmount=DEFAULT,
+                get_oneliner=get_oneliner,
+                more_osd_info=more_osd_info
+        ):
+            desc = {}
+            main.list_dev_osd(dev, uuid_map, desc)
+            assert {'cluster': 'ceph',
+                    'fs_type': 'ext4',
+                    'mount': mount_path,
+                    'magic': main.CEPH_OSD_ONDISK_MAGIC,
+                    'state': 'prepared'} == desc
+
+    def test_list_all_partitions(self):
+        if platform.system() == "FreeBSD":
+            return
+
+        disk = "Xda"
+        partition = "Xda1"
+
+        with patch(
+                'ceph_disk.main.os',
+                listdir=lambda path: [disk],
+        ), patch.multiple(
+            main,
+            list_partitions=lambda dev: [partition],
+        ):
+                assert {disk: [partition]} == main.list_all_partitions()
+
+    def test_list_data(self):
+        #
+        # a data partition that fails to mount is silently
+        # ignored
+        #
+        if platform.system() == "FreeBSD":
+            return
+
+        partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+        disk = "Xda"
+        partition = "Xda1"
+        fs_type = "ext4"
+
+        def get_partition_type(dev):
+            return main.PTYPE['regular']['osd']['ready']
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: [partition]},
+                get_partition_uuid=lambda dev: partition_uuid,
+                get_partition_type=get_partition_type,
+                get_dev_fs=lambda dev: fs_type,
+                mount=fail_to_mount,
+                unmount=DEFAULT,
+                is_partition=lambda dev: True,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'partitions': [{
+                           'dmcrypt': {},
+                           'fs_type': fs_type,
+                           'is_partition': True,
+                           'mount': None,
+                           'path': '/dev/' + partition,
+                           'ptype': main.PTYPE['regular']['osd']['ready'],
+                           'state': 'unprepared',
+                           'type': 'data',
+                           'uuid': partition_uuid,
+                       }]}]
+            assert expect == main.list_devices()
+
+    def test_list_dmcrypt_data(self):
+        if platform.system() == "FreeBSD":
+            return
+
+        partition_type2type = {
+            main.PTYPE['plain']['osd']['ready']: 'plain',
+            main.PTYPE['luks']['osd']['ready']: 'LUKS',
+        }
+        for (partition_type, type) in partition_type2type.items():
+            #
+            # dmcrypt data partition with one holder
+            #
+            partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+            disk = "Xda"
+            partition = "Xda1"
+            holders = ["dm-dummy"]
+            with patch.multiple(
+                    main,
+                    is_held=lambda dev: holders,
+                    list_all_partitions=lambda: {disk: [partition]},
+                    get_partition_uuid=lambda dev: partition_uuid,
+                    get_partition_type=lambda dev: partition_type,
+                    is_partition=lambda dev: True,
+            ):
+                expect = [{'path': '/dev/' + disk,
+                           'partitions': [{
+                               'dmcrypt': {
+                                   'holders': holders,
+                                   'type': type,
+                               },
+                               'fs_type': None,
+                               'is_partition': True,
+                               'mount': None,
+                               'path': '/dev/' + partition,
+                               'ptype': partition_type,
+                               'state': 'unprepared',
+                               'type': 'data',
+                               'uuid': partition_uuid,
+                           }]}]
+                assert expect == main.list_devices()
+            #
+            # dmcrypt data partition with two holders
+            #
+            partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+            disk = "Xda"
+            partition = "Xda1"
+            holders = ["dm-dummy", "dm-dummy1"]
+            with patch.multiple(
+                    main,
+                    is_held=lambda dev: holders,
+                    list_all_partitions=lambda: {disk: [partition]},
+                    get_partition_uuid=lambda dev: partition_uuid,
+                    get_partition_type=lambda dev: partition_type,
+                    is_partition=lambda dev: True,
+            ):
+                expect = [{'path': '/dev/' + disk,
+                           'partitions': [{
+                               'dmcrypt': {
+                                   'holders': holders,
+                                   'type': type,
+                               },
+                               'is_partition': True,
+                               'path': '/dev/' + partition,
+                               'ptype': partition_type,
+                               'type': 'data',
+                               'uuid': partition_uuid,
+                           }]}]
+                assert expect == main.list_devices()
+
+    def test_list_multipath(self):
+        #
+        # multipath data partition
+        #
+        if platform.system() == "FreeBSD":
+            return
+
+        partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+        disk = "Xda"
+        partition = "Xda1"
+
+        def get_partition_type(dev):
+            return main.PTYPE['mpath']['osd']['ready']
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: [partition]},
+                get_partition_uuid=lambda dev: partition_uuid,
+                get_partition_type=get_partition_type,
+                is_partition=lambda dev: True,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'partitions': [{
+                           'dmcrypt': {},
+                           'fs_type': None,
+                           'is_partition': True,
+                           'mount': None,
+                           'multipath': True,
+                           'path': '/dev/' + partition,
+                           'ptype': main.PTYPE['mpath']['osd']['ready'],
+                           'state': 'unprepared',
+                           'type': 'data',
+                           'uuid': partition_uuid,
+                       }]}]
+            assert expect == main.list_devices()
+        #
+        # multipath journal partition
+        #
+        journal_partition_uuid = "2cc40457-259e-4542-b029-785c7cc37871"
+
+        def get_partition_type(dev):
+            return main.PTYPE['mpath']['journal']['ready']
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: [partition]},
+                get_partition_uuid=lambda dev: journal_partition_uuid,
+                get_partition_type=get_partition_type,
+                is_partition=lambda dev: True,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'partitions': [{
+                           'dmcrypt': {},
+                           'is_partition': True,
+                           'multipath': True,
+                           'path': '/dev/' + partition,
+                           'ptype': main.PTYPE['mpath']['journal']['ready'],
+                           'type': 'journal',
+                           'uuid': journal_partition_uuid,
+                       }]}]
+            assert expect == main.list_devices()
+
+    def test_list_default(self):
+        self.list(main.PTYPE['plain']['osd']['ready'],
+                  main.PTYPE['plain']['journal']['ready'])
+        self.list(main.PTYPE['luks']['osd']['ready'],
+                  main.PTYPE['luks']['journal']['ready'])
+        self.list(main.PTYPE['regular']['osd']['ready'],
+                  main.PTYPE['regular']['journal']['ready'])
+
+    def test_list_bluestore(self):
+        if platform.system() == "FreeBSD":
+            return
+
+        self.list(main.PTYPE['plain']['osd']['ready'],
+                  main.PTYPE['plain']['block']['ready'])
+        self.list(main.PTYPE['luks']['osd']['ready'],
+                  main.PTYPE['luks']['block']['ready'])
+        self.list(main.PTYPE['regular']['osd']['ready'],
+                  main.PTYPE['regular']['block']['ready'])
+
+    def list(self, data_ptype, space_ptype):
+        #
+        # a single disk has a data partition and a journal
+        # partition and the osd is active
+        #
+        name = main.Ptype.space_ptype_to_name(space_ptype)
+        data_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+        disk = "Xda"
+        data = "Xda1"
+        data_holder = "dm-dummy"
+        space = "Xda2"
+        space_holder = "dm-dummy"
+        mount_path = '/mount/path'
+        fs_type = 'ext4'
+        space_uuid = "7ad5e65a-0ca5-40e4-a896-62a74ca61c55"
+        ceph_fsid = "60a2ef70-d99b-4b9b-a83c-8a86e5e60091"
+        osd_id = '1234'
+
+        def get_oneliner(path, what):
+            if '_uuid' in what:
+                if what == name + '_uuid':
+                    return space_uuid
+                else:
+                    return None
+            elif what == 'ceph_fsid':
+                return ceph_fsid
+            elif what == 'whoami':
+                return osd_id
+            else:
+                raise Exception('unknown ' + what)
+
+        def get_partition_uuid(dev):
+            if dev == '/dev/' + data:
+                return data_uuid
+            elif dev == '/dev/' + space:
+                return space_uuid
+            else:
+                raise Exception('unknown ' + dev)
+
+        def get_partition_type(dev):
+            if (dev == '/dev/' + data or
+                    dev == '/dev/' + data_holder):
+                return data_ptype
+            elif (dev == '/dev/' + space or
+                    dev == '/dev/' + space_holder):
+                return space_ptype
+            else:
+                raise Exception('unknown ' + dev)
+        cluster = 'ceph'
+        if data_ptype == main.PTYPE['regular']['osd']['ready']:
+            data_dmcrypt = {}
+        elif data_ptype == main.PTYPE['plain']['osd']['ready']:
+            data_dmcrypt = {
+                'type': 'plain',
+                'holders': [data_holder],
+            }
+        elif data_ptype == main.PTYPE['luks']['osd']['ready']:
+            data_dmcrypt = {
+                'type': 'LUKS',
+                'holders': [data_holder],
+            }
+        else:
+            raise Exception('unknown ' + data_ptype)
+
+        if space_ptype == main.PTYPE['regular'][name]['ready']:
+            space_dmcrypt = {}
+        elif space_ptype == main.PTYPE['plain'][name]['ready']:
+            space_dmcrypt = {
+                'type': 'plain',
+                'holders': [space_holder],
+            }
+        elif space_ptype == main.PTYPE['luks'][name]['ready']:
+            space_dmcrypt = {
+                'type': 'LUKS',
+                'holders': [space_holder],
+            }
+        else:
+            raise Exception('unknown ' + space_ptype)
+
+        if data_dmcrypt:
+            def is_held(dev):
+                if dev == '/dev/' + data:
+                    return [data_holder]
+                elif dev == '/dev/' + space:
+                    return [space_holder]
+                else:
+                    raise Exception('unknown ' + dev)
+        else:
+            def is_held(dev):
+                return []
+
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: [data, space]},
+                get_dev_fs=lambda dev: fs_type,
+                is_mounted=lambda dev: mount_path,
+                get_partition_uuid=get_partition_uuid,
+                get_partition_type=get_partition_type,
+                find_cluster_by_uuid=lambda ceph_fsid: cluster,
+                is_partition=lambda dev: True,
+                mount=DEFAULT,
+                unmount=DEFAULT,
+                get_oneliner=get_oneliner,
+                is_held=is_held,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'partitions': [{
+                           'ceph_fsid': ceph_fsid,
+                           'cluster': cluster,
+                           'dmcrypt': data_dmcrypt,
+                           'fs_type': fs_type,
+                           'is_partition': True,
+                           name + '_dev': '/dev/' + space,
+                           name + '_uuid': space_uuid,
+                           'mount': mount_path,
+                           'path': '/dev/' + data,
+                           'ptype': data_ptype,
+                           'state': 'active',
+                           'type': 'data',
+                           'whoami': osd_id,
+                           'uuid': data_uuid,
+                       }, {
+                           'dmcrypt': space_dmcrypt,
+                           'is_partition': True,
+                           name + '_for': '/dev/' + data,
+                           'path': '/dev/' + space,
+                           'ptype': space_ptype,
+                           'type': name,
+                           'uuid': space_uuid,
+                       }]}]
+            assert expect == main.list_devices()
+
+    def test_list_other(self):
+        #
+        # not swap, unknown fs type, not mounted, with uuid
+        #
+        if platform.system() == "FreeBSD":
+            return
+
+        partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+        partition_type = "e51adfb9-e9fd-4718-9fc1-7a0cb03ea3f4"
+        disk = "Xda"
+        partition = "Xda1"
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: [partition]},
+                get_partition_uuid=lambda dev: partition_uuid,
+                get_partition_type=lambda dev: partition_type,
+                is_partition=lambda dev: True,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'partitions': [{'dmcrypt': {},
+                                       'is_partition': True,
+                                       'path': '/dev/' + partition,
+                                       'ptype': partition_type,
+                                       'type': 'other',
+                                       'uuid': partition_uuid}]}]
+            assert expect == main.list_devices()
+        #
+        # not swap, mounted, ext4 fs type, with uuid
+        #
+        partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+        partition_type = "e51adfb9-e9fd-4718-9fc1-7a0cb03ea3f4"
+        disk = "Xda"
+        partition = "Xda1"
+        mount_path = '/mount/path'
+        fs_type = 'ext4'
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: [partition]},
+                get_dev_fs=lambda dev: fs_type,
+                is_mounted=lambda dev: mount_path,
+                get_partition_uuid=lambda dev: partition_uuid,
+                get_partition_type=lambda dev: partition_type,
+                is_partition=lambda dev: True,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'partitions': [{
+                           'dmcrypt': {},
+                           'is_partition': True,
+                           'mount': mount_path,
+                           'fs_type': fs_type,
+                           'path': '/dev/' + partition,
+                           'ptype': partition_type,
+                           'type': 'other',
+                           'uuid': partition_uuid,
+                       }]}]
+            assert expect == main.list_devices()
+
+        #
+        # swap, with uuid
+        #
+        partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+        partition_type = "e51adfb9-e9fd-4718-9fc1-7a0cb03ea3f4"
+        disk = "Xda"
+        partition = "Xda1"
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: [partition]},
+                is_swap=lambda dev: True,
+                get_partition_uuid=lambda dev: partition_uuid,
+                get_partition_type=lambda dev: partition_type,
+                is_partition=lambda dev: True,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'partitions': [{'dmcrypt': {},
+                                       'is_partition': True,
+                                       'path': '/dev/' + partition,
+                                       'ptype': partition_type,
+                                       'type': 'swap',
+                                       'uuid': partition_uuid}]}]
+            assert expect == main.list_devices()
+
+        #
+        # whole disk
+        #
+        partition_uuid = "56244cf5-83ef-4984-888a-2d8b8e0e04b2"
+        disk = "Xda"
+        partition = "Xda1"
+        with patch.multiple(
+                main,
+                list_all_partitions=lambda: {disk: []},
+                is_partition=lambda dev: False,
+        ):
+            expect = [{'path': '/dev/' + disk,
+                       'dmcrypt': {},
+                       'is_partition': False,
+                       'ptype': 'unknown',
+                       'type': 'other'}]
+            assert expect == main.list_devices()
+
+
+class TestCephDiskDeactivateAndDestroy(unittest.TestCase):
+
+    def setup_class(self):
+        main.setup_logging(verbose=True, log_stdout=False)
+
+    @patch('{0}.open'.format(builtins.__name__))
+    def test_main_deactivate(self, mock_open):
+        data = tempfile.mkdtemp()
+        main.setup_statedir(data)
+        DMCRYPT_LUKS_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-35865ceff05d'
+        part_uuid = '0ce28a16-6d5d-11e5-aec3-fa163e5c167b'
+        disk = 'sdX'
+        #
+        # Can not find match device by osd-id
+        #
+        args = main.parse_args(['deactivate',
+                                '--cluster', 'ceph',
+                                '--deactivate-by-id', '5566'])
+        fake_device = [{'path': '/dev/' + disk,
+                        'partitions': [{
+                            'path': '/dev/sdX1',
+                            'whoami': '-1',
+                        }]}]
+        with patch.multiple(
+                main,
+                list_devices=lambda: fake_device,
+        ):
+            self.assertRaises(Exception, main.main_deactivate, args)
+
+        #
+        # find match device by osd-id, status: OSD_STATUS_IN_DOWN
+        # with --mark-out option
+        #
+        args = main.parse_args(['deactivate',
+                                '--cluster', 'ceph',
+                                '--deactivate-by-id', '5566',
+                                '--mark-out'])
+        fake_device = [{'path': '/dev/' + disk,
+                        'partitions': [{
+                            'ptype': DMCRYPT_LUKS_OSD_UUID,
+                            'path': '/dev/sdX1',
+                            'whoami': '5566',
+                            'mount': '/var/lib/ceph/osd/ceph-5566/',
+                            'uuid': part_uuid,
+                        }]}]
+        with patch.multiple(
+                main,
+                list_devices=lambda: fake_device,
+                _check_osd_status=lambda cluster, osd_id: 2,
+                _mark_osd_out=lambda cluster, osd_id: True
+        ):
+            main.main_deactivate(args)
+
+        #
+        # find match device by device partition, status: OSD_STATUS_IN_DOWN
+        #
+        args = main.parse_args(['deactivate',
+                                '--cluster', 'ceph',
+                                '/dev/sdX1'])
+        fake_device = [{'path': '/dev/' + disk,
+                        'partitions': [{
+                            'ptype': DMCRYPT_LUKS_OSD_UUID,
+                            'path': '/dev/sdX1',
+                            'whoami': '5566',
+                            'mount': '/var/lib/ceph/osd/ceph-5566/',
+                            'uuid': part_uuid,
+                        }]}]
+        with patch.multiple(
+                main,
+                list_devices=lambda: fake_device,
+                _check_osd_status=lambda cluster, osd_id: 0,
+        ):
+            main.main_deactivate(args)
+
+        #
+        # find match device by device partition, status: OSD_STATUS_IN_UP
+        # with --mark-out option
+        #
+        args = main.parse_args(['deactivate',
+                                '--cluster', 'ceph',
+                                '/dev/sdX1',
+                                '--mark-out'])
+        fake_device = [{'path': '/dev/' + disk,
+                        'partitions': [{
+                            'ptype': DMCRYPT_LUKS_OSD_UUID,
+                            'path': '/dev/sdX1',
+                            'whoami': '5566',
+                            'mount': '/var/lib/ceph/osd/ceph-5566/',
+                            'uuid': part_uuid,
+                        }]}]
+
+        # mock the file open.
+        file_opened = io.StringIO()
+        file_opened.write(u'deactive')
+        mock_open.return_value = file_opened
+
+        with patch.multiple(
+                main,
+                mock_open,
+                list_devices=lambda: fake_device,
+                _check_osd_status=lambda cluster, osd_id: 3,
+                _mark_osd_out=lambda cluster, osd_id: True,
+                stop_daemon=lambda cluster, osd_id: True,
+                _remove_osd_directory_files=lambda path, cluster: True,
+                path_set_context=lambda path: True,
+                unmount=lambda path, do_rm: True,
+                dmcrypt_unmap=lambda part_uuid: True,
+        ):
+            main.main_deactivate(args)
+
+        #
+        # find match device by osd-id, status: OSD_STATUS_OUT_UP
+        #
+        args = main.parse_args(['deactivate',
+                                '--cluster', 'ceph',
+                                '--deactivate-by-id', '5566'])
+        fake_device = [{'path': '/dev/' + disk,
+                        'partitions': [{
+                            'ptype': DMCRYPT_LUKS_OSD_UUID,
+                            'path': '/dev/sdX1',
+                            'whoami': '5566',
+                            'mount': '/var/lib/ceph/osd/ceph-5566/',
+                            'uuid': part_uuid,
+                        }]}]
+
+        # mock the file open.
+        file_opened = io.StringIO()
+        file_opened.write(u'deactive')
+        mock_open.return_value = file_opened
+
+        with patch.multiple(
+                main,
+                mock_open,
+                list_devices=lambda: fake_device,
+                _check_osd_status=lambda cluster, osd_id: 1,
+                _mark_osd_out=lambda cluster, osd_id: True,
+                stop_daemon=lambda cluster, osd_id: True,
+                _remove_osd_directory_files=lambda path, cluster: True,
+                path_set_context=lambda path: True,
+                unmount=lambda path, do_rm: True,
+                dmcrypt_unmap=lambda part_uuid: True,
+        ):
+            main.main_deactivate(args)
+        shutil.rmtree(data)
+
+    def test_mark_out_out(self):
+        def mark_osd_out_fail(osd_id):
+            raise main.Error('Could not find osd.%s, is a vaild/exist osd id?'
+                             % osd_id)
+
+        with patch.multiple(
+                main,
+                command=mark_osd_out_fail,
+        ):
+            self.assertRaises(Exception, main._mark_osd_out, 'ceph', '5566')
+
+    def test_check_osd_status(self):
+        #
+        # command failure
+        #
+        with patch.multiple(
+                main,
+                command=raise_command_error,
+        ):
+            self.assertRaises(Exception, main._check_osd_status,
+                              'ceph', '5566')
+
+        #
+        # osd not found
+        #
+
+        fake_data = ('{"osds":[{"osd":0,"up":1,"in":1},'
+                     '{"osd":1,"up":1,"in":1}]}')
+
+        def return_fake_value(cmd):
+            return fake_data, '', 0
+
+        with patch.multiple(
+                main,
+                command=return_fake_value,
+        ):
+            self.assertRaises(Exception, main._check_osd_status,
+                              'ceph', '5566')
+
+        #
+        # successfully
+        #
+
+        fake_data = ('{"osds":[{"osd":0,"up":1,"in":1},'
+                     '{"osd":5566,"up":1,"in":1}]}')
+
+        def return_fake_value(cmd):
+            return fake_data, '', 0
+
+        with patch.multiple(
+                main,
+                command=return_fake_value,
+        ):
+            main._check_osd_status('ceph', '5566')
+
+    def test_stop_daemon(self):
+        STATEDIR = '/var/lib/ceph'
+        cluster = 'ceph'
+        osd_id = '5566'
+
+        def stop_daemon_fail(cmd):
+            raise Exception('ceph osd stop failed')
+
+        #
+        # fail on init type
+        #
+        with patch('os.path.exists', return_value=False):
+            self.assertRaises(Exception, main.stop_daemon, 'ceph', '5566')
+
+        #
+        # upstart failure
+        #
+        fake_path = (STATEDIR + '/osd/{cluster}-{osd_id}/upstart').format(
+            cluster=cluster, osd_id=osd_id)
+
+        def path_exist(check_path):
+            if check_path == fake_path:
+                return True
+            else:
+                False
+
+        patcher = patch('os.path.exists')
+        check_path = patcher.start()
+        check_path.side_effect = path_exist
+        with patch.multiple(
+                main,
+                check_path,
+                command_check_call=stop_daemon_fail,
+        ):
+            self.assertRaises(Exception, main.stop_daemon, 'ceph', '5566')
+
+        #
+        # sysvinit failure
+        #
+        fake_path = (STATEDIR + '/osd/{cluster}-{osd_id}/sysvinit').format(
+            cluster=cluster, osd_id=osd_id)
+
+        def path_exist(check_path):
+            if check_path == fake_path:
+                return True
+            else:
+                return False
+
+        patcher = patch('os.path.exists')
+        check_path = patcher.start()
+        check_path.side_effect = path_exist
+        with patch.multiple(
+                main,
+                check_path,
+                which=lambda name: True,
+                command_check_call=stop_daemon_fail,
+        ):
+            self.assertRaises(Exception, main.stop_daemon, 'ceph', '5566')
+
+        #
+        # systemd failure
+        #
+        fake_path = (STATEDIR + '/osd/{cluster}-{osd_id}/systemd').format(
+            cluster=cluster, osd_id=osd_id)
+
+        def path_exist(check_path):
+            if check_path == fake_path:
+                return True
+            else:
+                False
+
+        def stop_daemon_fail(cmd):
+            if 'stop' in cmd:
+                raise Exception('ceph osd stop failed')
+            else:
+                return True
+
+        patcher = patch('os.path.exists')
+        check_path = patcher.start()
+        check_path.side_effect = path_exist
+        with patch.multiple(
+                main,
+                check_path,
+                command_check_call=stop_daemon_fail,
+        ):
+            self.assertRaises(Exception, main.stop_daemon, 'ceph', '5566')
+
+    def test_remove_osd_directory_files(self):
+        cluster = 'ceph'
+        mounted_path = 'somewhere'
+        fake_path_2 = None
+        fake_path_remove_2 = None
+        fake_path_remove_init = None
+
+        def handle_path_exist(check_path):
+            if check_path == fake_path:
+                return True
+            elif fake_path_2 and check_path == fake_path_2:
+                return True
+            else:
+                return False
+
+        def handle_path_remove(remove_path):
+            if remove_path == fake_path_remove:
+                return True
+            elif fake_path_remove_2 and remove_path == fake_path_remove_2:
+                return True
+            elif (fake_path_remove_init and
+                  remove_path == fake_path_remove_init):
+                return True
+            else:
+                raise OSError
+
+        #
+        # remove ready file failure
+        #
+        fake_path = os.path.join(mounted_path, 'ready')
+        fake_path_remove = os.path.join(mounted_path, 'no_ready')
+
+        patcher_exist = patch('os.path.exists')
+        patcher_remove = patch('os.remove')
+        path_exist = patcher_exist.start()
+        path_remove = patcher_remove.start()
+        path_exist.side_effect = handle_path_exist
+        path_remove.side_effect = handle_path_remove
+        with patch.multiple(
+                main,
+                path_exist,
+                path_remove,
+                get_conf=lambda cluster, **kwargs: True,
+        ):
+            self.assertRaises(Exception, main._remove_osd_directory_files,
+                              'somewhere', cluster)
+
+        #
+        # remove active fil failure
+        #
+        fake_path = os.path.join(mounted_path, 'ready')
+        fake_path_2 = os.path.join(mounted_path, 'active')
+        fake_path_remove = os.path.join(mounted_path, 'ready')
+        fake_path_remove_2 = os.path.join(mounted_path, 'no_active')
+
+        patcher_exist = patch('os.path.exists')
+        patcher_remove = patch('os.remove')
+        path_exist = patcher_exist.start()
+        path_remove = patcher_remove.start()
+        path_exist.side_effect = handle_path_exist
+        path_remove.side_effect = handle_path_remove
+        with patch.multiple(
+                main,
+                path_exist,
+                path_remove,
+                get_conf=lambda cluster, **kwargs: True,
+        ):
+            self.assertRaises(Exception, main._remove_osd_directory_files,
+                              'somewhere', cluster)
+
+        #
+        # conf_val is None and remove init file failure
+        #
+        fake_path = os.path.join(mounted_path, 'ready')
+        fake_path_2 = os.path.join(mounted_path, 'active')
+        fake_path_remove = os.path.join(mounted_path, 'ready')
+        fake_path_remove_2 = os.path.join(mounted_path, 'active')
+        fake_path_remove_init = os.path.join(mounted_path, 'init_failure')
+
+        patcher_exist = patch('os.path.exists')
+        patcher_remove = patch('os.remove')
+        path_exist = patcher_exist.start()
+        path_remove = patcher_remove.start()
+        path_exist.side_effect = handle_path_exist
+        path_remove.side_effect = handle_path_remove
+        with patch.multiple(
+                main,
+                path_exist,
+                path_remove,
+                get_conf=lambda cluster, **kwargs: None,
+                init_get=lambda: 'upstart',
+        ):
+            self.assertRaises(Exception, main._remove_osd_directory_files,
+                              'somewhere', cluster)
+
+        #
+        # already remove `ready`, `active` and remove init file successfully
+        #
+        fake_path = os.path.join(mounted_path, 'no_ready')
+        fake_path_2 = os.path.join(mounted_path, 'no_active')
+        fake_path_remove = os.path.join(mounted_path, 'upstart')
+
+        patcher_exist = patch('os.path.exists')
+        patcher_remove = patch('os.remove')
+        path_exist = patcher_exist.start()
+        path_remove = patcher_remove.start()
+        path_exist.side_effect = handle_path_exist
+        path_remove.side_effect = handle_path_remove
+        with patch.multiple(
+                main,
+                path_exist,
+                path_remove,
+                get_conf=lambda cluster, **kwargs: 'upstart',
+        ):
+            main._remove_osd_directory_files('somewhere', cluster)
+
+    def test_path_set_context(self):
+        path = '/somewhere'
+        with patch.multiple(
+                main,
+                get_ceph_user=lambda **kwargs: 'ceph',
+        ):
+            main.path_set_context(path)
+
+    def test_mount(self):
+        #
+        # None to mount
+        #
+        dev = None
+        fs_type = 'ext4'
+        option = ''
+        self.assertRaises(Exception, main.mount, dev, fs_type, option)
+
+        #
+        # fstype undefine
+        #
+        dev = '/dev/Xda1'
+        fs_type = None
+        option = ''
+        self.assertRaises(Exception, main.mount, dev, fs_type, option)
+
+        #
+        # mount failure
+        #
+        dev = '/dev/Xda1'
+        fstype = 'ext4'
+        options = ''
+        with patch('tempfile.mkdtemp', return_value='/mnt'):
+            self.assertRaises(Exception, main.mount, dev, fstype, options)
+
+        #
+        # mount successfully
+        #
+        def create_temp_directory(*args, **kwargs):
+            return '/mnt'
+
+        dev = '/dev/Xda1'
+        fstype = 'ext4'
+        options = ''
+        patcher = patch('tempfile.mkdtemp')
+        create_tmpdir = patcher.start()
+        create_tmpdir.side_effect = create_temp_directory
+        with patch.multiple(
+                main,
+                create_tmpdir,
+                command_check_call=lambda cmd: True,
+        ):
+            main.mount(dev, fstype, options)
+
+    def test_umount(self):
+        #
+        # umount failure
+        #
+        path = '/somewhere'
+        self.assertRaises(Exception, main.unmount, path)
+
+        #
+        # umount successfully
+        #
+        def remove_directory_successfully(path):
+            return True
+
+        path = '/somewhere'
+        patcher = patch('os.rmdir')
+        rm_directory = patcher.start()
+        rm_directory.side_effect = remove_directory_successfully
+        with patch.multiple(
+                main,
+                rm_directory,
+                command_check_call=lambda cmd: True,
+        ):
+            main.unmount(path)
+
+    def test_main_destroy(self):
+        data = tempfile.mkdtemp()
+        main.setup_statedir(data)
+        OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d'
+        MPATH_OSD_UUID = '4fbd7e29-8ae0-4982-bf9d-5a8d867af560'
+        part_uuid = '0ce28a16-6d5d-11e5-aec3-fa163e5c167b'
+        journal_uuid = "7ad5e65a-0ca5-40e4-a896-62a74ca61c55"
+        mount_5566 = '/var/lib/ceph/osd/ceph-5566/'
+
+        fake_devices_normal = [{'path': '/dev/sdY',
+                                'partitions': [{
+                                    'dmcrypt': {},
+                                    'type': 'osd',
+                                    'ptype': OSD_UUID,
+                                    'path': '/dev/sdY1',
+                                    'whoami': '5566',
+                                    'mount': mount_5566,
+                                    'uuid': part_uuid,
+                                    'journal_uuid': journal_uuid}]},
+                               {'path': '/dev/sdX',
+                                'partitions': [{
+                                    'dmcrypt': {},
+                                    'type': 'osd',
+                                    'ptype': MPATH_OSD_UUID,
+                                    'path': '/dev/sdX1',
+                                    'whoami': '7788',
+                                    'mount': '/var/lib/ceph/osd/ceph-7788/',
+                                    'uuid': part_uuid,
+                                    'journal_uuid': journal_uuid}]}]
+
+        def list_devices_return():
+            return fake_devices_normal
+
+        #
+        # input device is not the device partition
+        #
+        args = main.parse_args(['destroy', '--cluster', 'ceph', '/dev/sdX'])
+        with patch.multiple(
+                main,
+                is_partition=lambda path: False,
+        ):
+            self.assertRaises(Exception, main.main_destroy, args)
+
+        #
+        # skip the redundent devices and not found by dev
+        #
+        args = main.parse_args(['destroy', '--cluster', 'ceph', '/dev/sdZ1'])
+        with patch.multiple(
+                main,
+                is_partition=lambda path: True,
+                list_devices=list_devices_return,
+        ):
+            self.assertRaises(Exception, main.main_destroy, args)
+
+        #
+        # skip the redundent devices and not found by osd-id
+        #
+        args = main.parse_args(['destroy', '--cluster', 'ceph',
+                                '--destroy-by-id', '1234'])
+        with patch.multiple(
+                main,
+                is_partition=lambda path: True,
+                list_devices=list_devices_return,
+        ):
+            self.assertRaises(Exception, main.main_destroy, args)
+
+        #
+        # skip the redundent devices and found by dev
+        #
+        args = main.parse_args(['destroy', '--cluster',
+                                'ceph', '/dev/sdY1', '--zap'])
+        with patch.multiple(
+                main,
+                is_partition=lambda path: True,
+                list_devices=list_devices_return,
+                get_partition_base=lambda dev_path: '/dev/sdY',
+                _check_osd_status=lambda cluster, osd_id: 0,
+                zap=lambda dev: True
+        ):
+            main.main_destroy(args)
+
+        #
+        # skip the redundent devices and found by osd-id
+        # with active status and MPATH_OSD
+        #
+        args = main.parse_args(['destroy', '--cluster', 'ceph',
+                                '--destroy-by-id', '7788'])
+        with patch.multiple(
+                main,
+                is_partition=lambda path: True,
+                list_devices=list_devices_return,
+                get_partition_base_mpath=lambda dev_path: '/dev/sdX',
+                _check_osd_status=lambda cluster, osd_id: 1,
+        ):
+            self.assertRaises(Exception, main.main_destroy, args)
+        shutil.rmtree(data)
+
+    def test_main_fix(self):
+        if platform.system() == "FreeBSD":
+            return
+
+        args = main.parse_args(['fix', '--all', '--selinux', '--permissions'])
+        commands = []
+
+        def _command(x):
+            commands.append(" ".join(x))
+            return ("", "", None)
+
+        class Os(object):
+            F_OK = 0
+
+            @staticmethod
+            def access(x, y):
+                return True
+
+        with patch.multiple(
+            main,
+            command=_command,
+            command_init=lambda x: commands.append(x),
+            command_wait=lambda x: None,
+            os=Os,
+        ):
+            main.main_fix(args)
+            commands = " ".join(commands)
+            assert '/var/lib/ceph' in commands
+            assert 'restorecon' in commands
+            assert 'chown' in commands
+            assert 'find' in commands
+
+
+def raise_command_error(*args):
+    e = subprocess.CalledProcessError('aaa', 'bbb', 'ccc')
+    raise e