Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / ceph-volume / ceph_volume / tests / util / test_system.py
1 import os
2 import pwd
3 import getpass
4 import pytest
5 from textwrap import dedent
6 from ceph_volume.util import system
7
8
9 class TestMkdirP(object):
10
11     def test_existing_dir_does_not_raise_w_chown(self, monkeypatch, tmpdir):
12         user = pwd.getpwnam(getpass.getuser())
13         uid, gid = user[2], user[3]
14         monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
15         path = str(tmpdir)
16         system.mkdir_p(path)
17         assert os.path.isdir(path)
18
19     def test_new_dir_w_chown(self, monkeypatch, tmpdir):
20         user = pwd.getpwnam(getpass.getuser())
21         uid, gid = user[2], user[3]
22         monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
23         path = os.path.join(str(tmpdir), 'new')
24         system.mkdir_p(path)
25         assert os.path.isdir(path)
26
27     def test_existing_dir_does_not_raise_no_chown(self, tmpdir):
28         path = str(tmpdir)
29         system.mkdir_p(path, chown=False)
30         assert os.path.isdir(path)
31
32     def test_new_dir_no_chown(self, tmpdir):
33         path = os.path.join(str(tmpdir), 'new')
34         system.mkdir_p(path, chown=False)
35         assert os.path.isdir(path)
36
37
38 @pytest.fixture
39 def fake_proc(tmpdir, monkeypatch):
40     PROCDIR = str(tmpdir)
41     proc_path = os.path.join(PROCDIR, 'mounts')
42     with open(proc_path, 'w') as f:
43         f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
44             rootfs / rootfs rw 0 0
45             sysfs /sys sysfs rw,seclabel,nosuid,nodev,noexec,relatime 0 0
46             proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0
47             devtmpfs /dev devtmpfs rw,seclabel,nosuid,size=238292k,nr_inodes=59573,mode=755 0 0
48             securityfs /sys/kernel/security securityfs rw,nosuid,nodev,noexec,relatime 0 0
49             tmpfs /dev/shm tmpfs rw,seclabel,nosuid,nodev 0 0
50             devpts /dev/pts devpts rw,seclabel,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000 0 0
51             tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0
52             tmpfs /sys/fs/cgroup tmpfs ro,seclabel,nosuid,nodev,noexec,mode=755 0 0
53             cgroup /sys/fs/cgroup/systemd cgroup rw,nosuid,nodev,noexec,relatime,xattr,release_agent=/usr/lib/systemd/systemd-cgroups-agent,name=systemd 0 0
54             cgroup /sys/fs/cgroup/freezer cgroup rw,nosuid,nodev,noexec,relatime,freezer 0 0
55             configfs /sys/kernel/config configfs rw,relatime 0 0
56             /dev/mapper/VolGroup00-LogVol00 / xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
57             selinuxfs /sys/fs/selinux selinuxfs rw,relatime 0 0
58             debugfs /sys/kernel/debug debugfs rw,relatime 0 0
59             hugetlbfs /dev/hugepages hugetlbfs rw,seclabel,relatime 0 0
60             mqueue /dev/mqueue mqueue rw,seclabel,relatime 0 0
61             sunrpc /far/lib/nfs/rpc_pipefs rpc_pipefs rw,relatime 0 0
62             /dev/sde4 /two/field/path
63             nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
64             /dev/sde2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
65             tmpfs /far/lib/ceph/osd/ceph-5 tmpfs rw,seclabel,relatime 0 0
66             tmpfs /far/lib/ceph/osd/ceph-7 tmpfs rw,seclabel,relatime 0 0
67             /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,seclabel,noatime,attr2,inode64,noquota 0 0
68             tmpfs /run/user/1000 tmpfs rw,seclabel,nosuid,nodev,relatime,size=50040k,mode=700,uid=1000,gid=1000 0 0
69             /dev/sdc2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
70             tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0"""))
71     monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
72     monkeypatch.setattr(os.path, 'exists', lambda x: True)
73
74
75 class TestPathIsMounted(object):
76
77     def test_is_mounted(self, fake_proc):
78         assert system.path_is_mounted('/boot') is True
79
80     def test_is_not_mounted(self, fake_proc):
81         assert system.path_is_mounted('/far/fib/feph') is False
82
83     def test_is_not_mounted_at_destination(self, fake_proc):
84         assert system.path_is_mounted('/boot', destination='/dev/sda1') is False
85
86     def test_is_mounted_at_destination(self, fake_proc):
87         assert system.path_is_mounted('/boot', destination='/dev/sdc2') is True
88
89
90 class TestDeviceIsMounted(object):
91
92     def test_is_mounted(self, fake_proc):
93         assert system.device_is_mounted('/dev/sda1') is True
94
95     def test_path_is_not_device(self, fake_proc):
96         assert system.device_is_mounted('/far/lib/ceph/osd/ceph-7') is False
97
98     def test_is_not_mounted_at_destination(self, fake_proc):
99         assert system.device_is_mounted('/dev/sda1', destination='/far/lib/ceph/osd/test-1') is False
100
101     def test_is_mounted_at_destination(self, fake_proc):
102         assert system.device_is_mounted('/dev/sda1', destination='/far/lib/ceph/osd/ceph-7') is False
103
104
105 class TestGetMounts(object):
106
107     def test_not_mounted(self, tmpdir, monkeypatch):
108         PROCDIR = str(tmpdir)
109         proc_path = os.path.join(PROCDIR, 'mounts')
110         with open(proc_path, 'w') as f:
111             f.write('')
112         monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
113         assert system.get_mounts() == {}
114
115     def test_is_mounted_(self, fake_proc):
116         result = system.get_mounts()
117         assert result['/dev/sdc2'] == ['/boot']
118
119     def test_ignores_two_fields(self, fake_proc):
120         result = system.get_mounts()
121         assert result.get('/dev/sde4') is None
122
123     def test_tmpfs_is_reported(self, fake_proc):
124         result = system.get_mounts()
125         assert result['tmpfs'][0] == '/dev/shm'
126
127     def test_non_skip_devs_arent_reported(self, fake_proc):
128         result = system.get_mounts()
129         assert result.get('cgroup') is None
130
131     def test_multiple_mounts_are_appended(self, fake_proc):
132         result = system.get_mounts()
133         assert len(result['tmpfs']) == 7
134
135     def test_nonexistent_devices_are_skipped(self, tmpdir, monkeypatch):
136         PROCDIR = str(tmpdir)
137         proc_path = os.path.join(PROCDIR, 'mounts')
138         with open(proc_path, 'w') as f:
139             f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
140                     /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,attr2,inode64,noquota 0 0
141                     /dev/sda2 /far/lib/ceph/osd/ceph-1 xfs rw,attr2,inode64,noquota 0 0"""))
142         monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
143         monkeypatch.setattr(os.path, 'exists', lambda x: False if x == '/dev/sda1' else True)
144         result = system.get_mounts()
145         assert result.get('/dev/sda1') is None
146
147
148 class TestIsBinary(object):
149
150     def test_is_binary(self, tmpfile):
151         binary_path = tmpfile(contents='asd\n\nlkjh\x00')
152         assert system.is_binary(binary_path)
153
154     def test_is_not_binary(self, tmpfile):
155         binary_path = tmpfile(contents='asd\n\nlkjh0')
156         assert system.is_binary(binary_path) is False