Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / pybind / test_cephfs.py
1 # vim: expandtab smarttab shiftwidth=4 softtabstop=4
2 from nose.tools import assert_raises, assert_equal, with_setup
3 import cephfs as libcephfs
4 import fcntl
5 import os
6
7 cephfs = None
8
9 def setup_module():
10     global cephfs
11     cephfs = libcephfs.LibCephFS(conffile='')
12     cephfs.mount()
13
14 def teardown_module():
15     global cephfs
16     cephfs.shutdown()
17
18 def setup_test():
19     d = cephfs.opendir(b"/")
20     dent = cephfs.readdir(d)
21     while dent:
22         if (dent.d_name not in [b".", b".."]):
23             if dent.is_dir():
24                 cephfs.rmdir(b"/" + dent.d_name)
25             else:
26                 cephfs.unlink(b"/" + dent.d_name)
27
28         dent = cephfs.readdir(d)
29
30     cephfs.closedir(d)
31
32     cephfs.chdir(b"/")
33
34 @with_setup(setup_test)
35 def test_conf_get():
36     fsid = cephfs.conf_get("fsid")
37     assert(len(fsid) > 0)
38
39 @with_setup(setup_test)
40 def test_version():
41     cephfs.version()
42
43 @with_setup(setup_test)
44 def test_fstat():
45     fd = cephfs.open(b'file-1', 'w', 0o755)
46     stat = cephfs.fstat(fd)
47     assert(len(stat) == 13)
48     cephfs.close(fd)
49
50 @with_setup(setup_test)
51 def test_statfs():
52     stat = cephfs.statfs(b'/')
53     assert(len(stat) == 11)
54
55 @with_setup(setup_test)
56 def test_syncfs():
57     stat = cephfs.sync_fs()
58
59 @with_setup(setup_test)
60 def test_fsync():
61     fd = cephfs.open(b'file-1', 'w', 0o755)
62     cephfs.write(fd, b"asdf", 0)
63     stat = cephfs.fsync(fd, 0)
64     cephfs.write(fd, b"qwer", 0)
65     stat = cephfs.fsync(fd, 1)
66     cephfs.close(fd)
67     #sync on non-existing fd (assume fd 12345 is not exists)
68     assert_raises(libcephfs.Error, cephfs.fsync, 12345, 0)
69
70 @with_setup(setup_test)
71 def test_directory():
72     cephfs.mkdir(b"/temp-directory", 0o755)
73     cephfs.mkdirs(b"/temp-directory/foo/bar", 0o755)
74     cephfs.chdir(b"/temp-directory")
75     assert_equal(cephfs.getcwd(), b"/temp-directory")
76     cephfs.rmdir(b"/temp-directory/foo/bar")
77     cephfs.rmdir(b"/temp-directory/foo")
78     cephfs.rmdir(b"/temp-directory")
79     assert_raises(libcephfs.ObjectNotFound, cephfs.chdir, b"/temp-directory")
80
81 @with_setup(setup_test)
82 def test_walk_dir():
83     cephfs.chdir(b"/")
84     dirs = [b"dir-1", b"dir-2", b"dir-3"]
85     for i in dirs:
86         cephfs.mkdir(i, 0o755)
87     handler = cephfs.opendir(b"/")
88     d = cephfs.readdir(handler)
89     dirs += [b".", b".."]
90     while d:
91         assert(d.d_name in dirs)
92         dirs.remove(d.d_name)
93         d = cephfs.readdir(handler)
94     assert(len(dirs) == 0)
95     dirs = [b"/dir-1", b"/dir-2", b"/dir-3"]
96     for i in dirs:
97         cephfs.rmdir(i)
98     cephfs.closedir(handler)
99
100 @with_setup(setup_test)
101 def test_xattr():
102     assert_raises(libcephfs.OperationNotSupported, cephfs.setxattr, "/", "key", b"value", 0)
103     cephfs.setxattr("/", "user.key", b"value", 0)
104     assert_equal(b"value", cephfs.getxattr("/", "user.key"))
105
106     cephfs.setxattr("/", "user.big", b"x" * 300, 0)
107
108     # Default size is 255, get ERANGE
109     assert_raises(libcephfs.OutOfRange, cephfs.getxattr, "/", "user.big")
110
111     # Pass explicit size, and we'll get the value
112     assert_equal(300, len(cephfs.getxattr("/", "user.big", 300)))
113
114
115 @with_setup(setup_test)
116 def test_rename():
117     cephfs.mkdir(b"/a", 0o755)
118     cephfs.mkdir(b"/a/b", 0o755)
119     cephfs.rename(b"/a", b"/b")
120     cephfs.stat(b"/b/b")
121     cephfs.rmdir(b"/b/b")
122     cephfs.rmdir(b"/b")
123
124 @with_setup(setup_test)
125 def test_open():
126     assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r')
127     assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r+')
128     fd = cephfs.open(b'file-1', 'w', 0o755)
129     cephfs.write(fd, b"asdf", 0)
130     cephfs.close(fd)
131     fd = cephfs.open(b'file-1', 'r', 0o755)
132     assert_equal(cephfs.read(fd, 0, 4), b"asdf")
133     cephfs.close(fd)
134     fd = cephfs.open(b'file-1', 'r+', 0o755)
135     cephfs.write(fd, b"zxcv", 4)
136     assert_equal(cephfs.read(fd, 4, 8), b"zxcv")
137     cephfs.close(fd)
138     fd = cephfs.open(b'file-1', 'w+', 0o755)
139     assert_equal(cephfs.read(fd, 0, 4), b"")
140     cephfs.write(fd, b"zxcv", 4)
141     assert_equal(cephfs.read(fd, 4, 8), b"zxcv")
142     cephfs.close(fd)
143     fd = cephfs.open(b'file-1', os.O_RDWR, 0o755)
144     cephfs.write(fd, b"asdf", 0)
145     assert_equal(cephfs.read(fd, 0, 4), b"asdf")
146     cephfs.close(fd)
147     assert_raises(libcephfs.OperationNotSupported, cephfs.open, b'file-1', 'a')
148     cephfs.unlink(b'file-1')
149
150 @with_setup(setup_test)
151 def test_link():
152     fd = cephfs.open(b'file-1', 'w', 0o755)
153     cephfs.write(fd, b"1111", 0)
154     cephfs.close(fd)
155     cephfs.link(b'file-1', b'file-2')
156     fd = cephfs.open(b'file-2', 'r', 0o755)
157     assert_equal(cephfs.read(fd, 0, 4), b"1111")
158     cephfs.close(fd)
159     fd = cephfs.open(b'file-2', 'r+', 0o755)
160     cephfs.write(fd, b"2222", 4)
161     cephfs.close(fd)
162     fd = cephfs.open(b'file-1', 'r', 0o755)
163     assert_equal(cephfs.read(fd, 0, 8), b"11112222")
164     cephfs.close(fd)
165     cephfs.unlink(b'file-2')
166
167 @with_setup(setup_test)
168 def test_symlink():
169     fd = cephfs.open(b'file-1', 'w', 0o755)
170     cephfs.write(fd, b"1111", 0)
171     cephfs.close(fd)
172     cephfs.symlink(b'file-1', b'file-2')
173     fd = cephfs.open(b'file-2', 'r', 0o755)
174     assert_equal(cephfs.read(fd, 0, 4), b"1111")
175     cephfs.close(fd)
176     fd = cephfs.open(b'file-2', 'r+', 0o755)
177     cephfs.write(fd, b"2222", 4)
178     cephfs.close(fd)
179     fd = cephfs.open(b'file-1', 'r', 0o755)
180     assert_equal(cephfs.read(fd, 0, 8), b"11112222")
181     cephfs.close(fd)
182     cephfs.unlink(b'file-2')
183
184 @with_setup(setup_test)
185 def test_readlink():
186     fd = cephfs.open(b'/file-1', 'w', 0o755)
187     cephfs.write(fd, b"1111", 0)
188     cephfs.close(fd)
189     cephfs.symlink(b'/file-1', b'/file-2')
190     d = cephfs.readlink(b"/file-2",100)
191     assert_equal(d, b"/file-1")
192     cephfs.unlink(b'/file-2')
193     cephfs.unlink(b'/file-1')
194
195 @with_setup(setup_test)
196 def test_delete_cwd():
197     assert_equal(b"/", cephfs.getcwd())
198
199     cephfs.mkdir(b"/temp-directory", 0o755)
200     cephfs.chdir(b"/temp-directory")
201     cephfs.rmdir(b"/temp-directory")
202
203     # getcwd gives you something stale here: it remembers the path string
204     # even when things are unlinked.  It's up to the caller to find out
205     # whether it really still exists
206     assert_equal(b"/temp-directory", cephfs.getcwd())
207
208 @with_setup(setup_test)
209 def test_flock():
210     fd = cephfs.open(b'file-1', 'w', 0o755)
211
212     cephfs.flock(fd, fcntl.LOCK_EX, 123);
213     fd2 = cephfs.open(b'file-1', 'w', 0o755)
214
215     assert_raises(libcephfs.WouldBlock, cephfs.flock, fd2,
216                   fcntl.LOCK_EX | fcntl.LOCK_NB, 456);
217     cephfs.close(fd2)
218
219     cephfs.close(fd)
220
221 @with_setup(setup_test)
222 def test_mount_unmount():
223     test_directory()
224     cephfs.unmount()
225     cephfs.mount()
226     test_open()