1 # vim: expandtab smarttab shiftwidth=4 softtabstop=4
2 from nose.tools import assert_raises, assert_equal, with_setup
3 import cephfs as libcephfs
11 cephfs = libcephfs.LibCephFS(conffile='')
14 def teardown_module():
19 d = cephfs.opendir(b"/")
20 dent = cephfs.readdir(d)
22 if (dent.d_name not in [b".", b".."]):
24 cephfs.rmdir(b"/" + dent.d_name)
26 cephfs.unlink(b"/" + dent.d_name)
28 dent = cephfs.readdir(d)
34 @with_setup(setup_test)
36 fsid = cephfs.conf_get("fsid")
39 @with_setup(setup_test)
43 @with_setup(setup_test)
45 fd = cephfs.open(b'file-1', 'w', 0o755)
46 stat = cephfs.fstat(fd)
47 assert(len(stat) == 13)
50 @with_setup(setup_test)
52 stat = cephfs.statfs(b'/')
53 assert(len(stat) == 11)
55 @with_setup(setup_test)
57 stat = cephfs.sync_fs()
59 @with_setup(setup_test)
61 fd = cephfs.open(b'file-1', 'w', 0o755)
62 cephfs.write(fd, b"asdf", 0)
63 stat = cephfs.fsync(fd, 0)
64 cephfs.write(fd, b"qwer", 0)
65 stat = cephfs.fsync(fd, 1)
67 # sync on a non-existent fd (assume fd 12345 does not exist)
68 assert_raises(libcephfs.Error, cephfs.fsync, 12345, 0)
70 @with_setup(setup_test)
72 cephfs.mkdir(b"/temp-directory", 0o755)
73 cephfs.mkdirs(b"/temp-directory/foo/bar", 0o755)
74 cephfs.chdir(b"/temp-directory")
75 assert_equal(cephfs.getcwd(), b"/temp-directory")
76 cephfs.rmdir(b"/temp-directory/foo/bar")
77 cephfs.rmdir(b"/temp-directory/foo")
78 cephfs.rmdir(b"/temp-directory")
79 assert_raises(libcephfs.ObjectNotFound, cephfs.chdir, b"/temp-directory")
81 @with_setup(setup_test)
84 dirs = [b"dir-1", b"dir-2", b"dir-3"]
86 cephfs.mkdir(i, 0o755)
87 handler = cephfs.opendir(b"/")
88 d = cephfs.readdir(handler)
91 assert(d.d_name in dirs)
93 d = cephfs.readdir(handler)
94 assert(len(dirs) == 0)
95 dirs = [b"/dir-1", b"/dir-2", b"/dir-3"]
98 cephfs.closedir(handler)
100 @with_setup(setup_test)
102 assert_raises(libcephfs.OperationNotSupported, cephfs.setxattr, "/", "key", b"value", 0)
103 cephfs.setxattr("/", "user.key", b"value", 0)
104 assert_equal(b"value", cephfs.getxattr("/", "user.key"))
106 cephfs.setxattr("/", "user.big", b"x" * 300, 0)
108 # Default size is 255, get ERANGE
109 assert_raises(libcephfs.OutOfRange, cephfs.getxattr, "/", "user.big")
111 # Pass explicit size, and we'll get the value
112 assert_equal(300, len(cephfs.getxattr("/", "user.big", 300)))
115 @with_setup(setup_test)
117 cephfs.mkdir(b"/a", 0o755)
118 cephfs.mkdir(b"/a/b", 0o755)
119 cephfs.rename(b"/a", b"/b")
121 cephfs.rmdir(b"/b/b")
124 @with_setup(setup_test)
126 assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r')
127 assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r+')
128 fd = cephfs.open(b'file-1', 'w', 0o755)
129 cephfs.write(fd, b"asdf", 0)
131 fd = cephfs.open(b'file-1', 'r', 0o755)
132 assert_equal(cephfs.read(fd, 0, 4), b"asdf")
134 fd = cephfs.open(b'file-1', 'r+', 0o755)
135 cephfs.write(fd, b"zxcv", 4)
136 assert_equal(cephfs.read(fd, 4, 8), b"zxcv")
138 fd = cephfs.open(b'file-1', 'w+', 0o755)
139 assert_equal(cephfs.read(fd, 0, 4), b"")
140 cephfs.write(fd, b"zxcv", 4)
141 assert_equal(cephfs.read(fd, 4, 8), b"zxcv")
143 fd = cephfs.open(b'file-1', os.O_RDWR, 0o755)
144 cephfs.write(fd, b"asdf", 0)
145 assert_equal(cephfs.read(fd, 0, 4), b"asdf")
147 assert_raises(libcephfs.OperationNotSupported, cephfs.open, b'file-1', 'a')
148 cephfs.unlink(b'file-1')
150 @with_setup(setup_test)
152 fd = cephfs.open(b'file-1', 'w', 0o755)
153 cephfs.write(fd, b"1111", 0)
155 cephfs.link(b'file-1', b'file-2')
156 fd = cephfs.open(b'file-2', 'r', 0o755)
157 assert_equal(cephfs.read(fd, 0, 4), b"1111")
159 fd = cephfs.open(b'file-2', 'r+', 0o755)
160 cephfs.write(fd, b"2222", 4)
162 fd = cephfs.open(b'file-1', 'r', 0o755)
163 assert_equal(cephfs.read(fd, 0, 8), b"11112222")
165 cephfs.unlink(b'file-2')
167 @with_setup(setup_test)
169 fd = cephfs.open(b'file-1', 'w', 0o755)
170 cephfs.write(fd, b"1111", 0)
172 cephfs.symlink(b'file-1', b'file-2')
173 fd = cephfs.open(b'file-2', 'r', 0o755)
174 assert_equal(cephfs.read(fd, 0, 4), b"1111")
176 fd = cephfs.open(b'file-2', 'r+', 0o755)
177 cephfs.write(fd, b"2222", 4)
179 fd = cephfs.open(b'file-1', 'r', 0o755)
180 assert_equal(cephfs.read(fd, 0, 8), b"11112222")
182 cephfs.unlink(b'file-2')
184 @with_setup(setup_test)
186 fd = cephfs.open(b'/file-1', 'w', 0o755)
187 cephfs.write(fd, b"1111", 0)
189 cephfs.symlink(b'/file-1', b'/file-2')
190 d = cephfs.readlink(b"/file-2",100)
191 assert_equal(d, b"/file-1")
192 cephfs.unlink(b'/file-2')
193 cephfs.unlink(b'/file-1')
@with_setup(setup_test)
def test_delete_cwd():
    # Checks what getcwd() reports after the current working directory
    # has been removed.
    removed_dir = b"/temp-directory"

    # Precondition asserted here: the working directory is the root.
    assert_equal(b"/", cephfs.getcwd())

    # Create a directory, change into it, then remove it while it is
    # still the current working directory.
    cephfs.mkdir(removed_dir, 0o755)
    cephfs.chdir(removed_dir)
    cephfs.rmdir(removed_dir)

    # getcwd gives you something stale here: it remembers the path string
    # even when things are unlinked. It is up to the caller to find out
    # whether the directory really still exists.
    assert_equal(removed_dir, cephfs.getcwd())
208 @with_setup(setup_test)
210 fd = cephfs.open(b'file-1', 'w', 0o755)
212 cephfs.flock(fd, fcntl.LOCK_EX, 123);
213 fd2 = cephfs.open(b'file-1', 'w', 0o755)
215 assert_raises(libcephfs.WouldBlock, cephfs.flock, fd2,
216 fcntl.LOCK_EX | fcntl.LOCK_NB, 456);
221 @with_setup(setup_test)
222 def test_mount_unmount():