Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / pybind / test_rgwfs.py
1 # vim: expandtab smarttab shiftwidth=4 softtabstop=4
2 from nose.tools import assert_raises, assert_equal, with_setup
3 import rgw as librgwfs
4
# Shared fixtures, all unset until the setup hooks run:
#   rgwfs            - the LibRGWFS instance (assigned in setup_module)
#   root_handler     - handle returned by rgwfs.mount() (assigned in setup_module)
#   root_dir_handler - handle of the b"bucket" directory (assigned in setup_test)
rgwfs = root_handler = root_dir_handler = None
8
9
def setup_module():
    """Module-level setup hook: build the LibRGWFS instance and mount it.

    The instance is stored in the module-level ``rgwfs`` and the handle
    returned by ``mount()`` in ``root_handler`` so the tests can share them.
    """
    global rgwfs, root_handler
    # NOTE(review): the three constructor arguments look like credentials
    # (uid / key / secret) -- confirm against the binding's signature.
    rgwfs = librgwfs.LibRGWFS("testid", "", "")
    root_handler = rgwfs.mount()
15
16
def teardown_module():
    """Module-level teardown hook: shut down the shared LibRGWFS instance."""
    # The global is only read here, so no ``global`` declaration is required.
    rgwfs.shutdown()
20
21
def setup_test():
    """Per-test fixture: open (or create) the ``bucket`` directory and empty it.

    The directory handle is left in the module-level ``root_dir_handler``.
    """
    global root_dir_handler

    try:
        root_dir_handler = rgwfs.opendir(root_handler, b"bucket", 0)
    except Exception:
        # Any opendir failure falls back to creating the directory.
        root_dir_handler = rgwfs.mkdir(root_handler, b"bucket", 0)

    leftovers = []

    def remember(name, offset):
        leftovers.append(name)

    # Gather every entry name first; the unlink pass below runs only after
    # readdir has returned.
    rgwfs.readdir(root_dir_handler, remember, 0, 0)
    for stale in leftovers:
        rgwfs.unlink(root_dir_handler, stale, 0)
37
38
@with_setup(setup_test)
def test_version():
    # Smoke test: passes as long as version() returns without raising; the
    # returned value is not inspected.
    rgwfs.version()
42
43
@with_setup(setup_test)
def test_fstat():
    # fstat() must yield a 13-field result both for the bucket directory
    # handle and for a freshly created file handle.
    #
    # assert_equal is used instead of a bare ``assert`` so the check is not
    # stripped under ``python -O`` and a failure reports both values.
    assert_equal(len(rgwfs.fstat(root_dir_handler)), 13)
    file_handler = rgwfs.create(root_dir_handler, b'file-1', 0)
    try:
        assert_equal(len(rgwfs.fstat(file_handler)), 13)
    finally:
        # Release the handle even when fstat() raises or the check fails.
        rgwfs.close(file_handler)
52
53
@with_setup(setup_test)
def test_statfs():
    # statfs() must yield an 11-field result.
    #
    # assert_equal is used instead of a bare ``assert`` so the check is not
    # stripped under ``python -O`` and a failure reports both values.
    assert_equal(len(rgwfs.statfs()), 11)
58
59
@with_setup(setup_test)
def test_fsync():
    # Write two consecutive 4-byte chunks and fsync after each one, using
    # flag 0 for the first sync and flag 1 for the second.
    steps = (
        (0, b"asdf", 0),
        (4, b"qwer", 1),
    )
    handle = rgwfs.create(root_dir_handler, b'file-1', 0)
    for position, payload, sync_flag in steps:
        rgwfs.write(handle, position, payload)
        rgwfs.fsync(handle, sync_flag)
    rgwfs.close(handle)
68
69
@with_setup(setup_test)
def test_directory():
    # Create a sub-directory under the bucket, release its handle, then
    # remove it again by name.
    dirname = b"temp-directory"
    created = rgwfs.mkdir(root_dir_handler, dirname, 0)
    rgwfs.close(created)
    rgwfs.unlink(root_dir_handler, dirname)
75
76
@with_setup(setup_test)
def test_walk_dir():
    # Create three sub-directories, list the bucket, and check that every
    # listed entry is one of the directories created here.
    expected = [b"dir-1", b"dir-2", b"dir-3"]
    open_dirs = [rgwfs.mkdir(root_dir_handler, dirname, 0)
                 for dirname in expected]
    listing = []

    def record(name, offset):
        listing.append((name, offset))

    # The unpacking doubles as a check that readdir returns a 2-tuple; the
    # values themselves are not used.
    _offset, _eof = rgwfs.readdir(root_dir_handler, record, 0)

    for handle in open_dirs:
        rgwfs.close(handle)

    for entry, _ in listing:
        assert entry in expected
        rgwfs.unlink(root_dir_handler, entry)
97
98
@with_setup(setup_test)
def test_rename():
    # Create a file, rename it within the same directory, then confirm the
    # new name can be opened and stat'ed before removing it.
    old_name, new_name = b"a", b"b"

    created = rgwfs.create(root_dir_handler, old_name, 0)
    rgwfs.close(created)

    rgwfs.rename(root_dir_handler, old_name, root_dir_handler, new_name)

    renamed = rgwfs.open(root_dir_handler, new_name, 0)
    rgwfs.fstat(renamed)
    rgwfs.close(renamed)

    rgwfs.unlink(root_dir_handler, new_name)
108
109
@with_setup(setup_test)
def test_open():
    # Exercise open/create/read/write on a single file:
    #   * opening a missing file raises ObjectNotFound,
    #   * data written through a created handle can be read back,
    #   * rewriting through a re-opened handle replaces the contents.
    #
    # Every handle is released in a ``finally`` clause so that a failing
    # read, write or assertion does not leak it into later tests.

    def store(handle, offset, data):
        # Write ``data`` at ``offset`` through ``handle`` and always close it.
        try:
            rgwfs.write(handle, offset, data)
        finally:
            rgwfs.close(handle)

    def read_back(offset, length):
        # Open file-1, read ``length`` bytes at ``offset`` and always close.
        handle = rgwfs.open(root_dir_handler, b'file-1', 0)
        try:
            return rgwfs.read(handle, offset, length)
        finally:
            rgwfs.close(handle)

    # The lookup is attempted twice; both attempts must fail while the file
    # does not exist.
    assert_raises(librgwfs.ObjectNotFound, rgwfs.open,
                  root_dir_handler, b'file-1', 0)
    assert_raises(librgwfs.ObjectNotFound, rgwfs.open,
                  root_dir_handler, b'file-1', 0)

    store(rgwfs.create(root_dir_handler, b'file-1', 0), 0, b"asdf")
    assert_equal(read_back(0, 4), b"asdf")

    store(rgwfs.open(root_dir_handler, b'file-1', 0), 0, b"aaaazxcv")
    assert_equal(read_back(4, 4), b"zxcv")
    assert_equal(read_back(0, 4), b"aaaa")

    rgwfs.unlink(root_dir_handler, b"file-1")
132
133
@with_setup(setup_test)
def test_mount_unmount():
    # Run the directory test, drop both handles and unmount, then mount
    # again and re-run the open test against the re-opened bucket.
    global root_handler, root_dir_handler

    test_directory()

    # Close the bucket handle before the root handle, then unmount.
    rgwfs.close(root_dir_handler)
    rgwfs.close(root_handler)
    rgwfs.unmount()

    # Re-establish both module-level handles so test_open (and any test
    # that runs afterwards) sees valid ones.
    root_handler = rgwfs.mount()
    root_dir_handler = rgwfs.opendir(root_handler, b"bucket", 0)

    test_open()
143     root_dir_handler = rgwfs.opendir(root_handler, b"bucket", 0)
144     test_open()