Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / workunits / fs / misc / direct_io.py
1 #!/usr/bin/python
2
3 import json
4 import mmap
5 import os
6 import subprocess
7
8
9 def get_data_pool():
10     cmd = ['ceph', 'fs', 'ls', '--format=json-pretty']
11     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
12     out = proc.communicate()[0]
13     return json.loads(out)[0]['data_pools'][0]
14
15
16 def main():
17     fd = os.open("testfile", os.O_RDWR | os.O_CREAT | os.O_TRUNC | os.O_DIRECT, 0o644)
18
19     ino = os.fstat(fd).st_ino
20     obj_name = "{ino:x}.00000000".format(ino=ino)
21     pool_name = get_data_pool()
22
23     buf = mmap.mmap(-1, 1)
24     buf.write('1')
25     os.write(fd, buf)
26
27     proc = subprocess.Popen(['rados', '-p', pool_name, 'get', obj_name, 'tmpfile'])
28     proc.wait()
29
30     with open('tmpfile', 'r') as tmpf:
31         out = tmpf.read()
32         if out != '1':
33             raise RuntimeError("data were not written to object store directly")
34
35     with open('tmpfile', 'w') as tmpf:
36         tmpf.write('2')
37
38     proc = subprocess.Popen(['rados', '-p', pool_name, 'put', obj_name, 'tmpfile'])
39     proc.wait()
40
41     os.lseek(fd, 0, os.SEEK_SET)
42     out = os.read(fd, 1)
43     if out != '2':
44         raise RuntimeError("data were not directly read from object store")
45
46     os.close(fd)
47     print('ok')
48
49
50 main()