+++ /dev/null
-# vim: expandtab smarttab shiftwidth=4 softtabstop=4
-from nose.tools import assert_raises, assert_equal, with_setup
-import rgw as librgwfs
-
-rgwfs = None
-root_handler = None
-root_dir_handler = None
-
-
-def setup_module():
- global rgwfs
- global root_handler
- rgwfs = librgwfs.LibRGWFS("testid", "", "")
- root_handler = rgwfs.mount()
-
-
-def teardown_module():
- global rgwfs
- rgwfs.shutdown()
-
-
-def setup_test():
- global root_dir_handler
-
- names = []
-
- try:
- root_dir_handler = rgwfs.opendir(root_handler, b"bucket", 0)
- except Exception:
- root_dir_handler = rgwfs.mkdir(root_handler, b"bucket", 0)
-
- def cb(name, offset):
- names.append(name)
- rgwfs.readdir(root_dir_handler, cb, 0, 0)
- for name in names:
- rgwfs.unlink(root_dir_handler, name, 0)
-
-
-@with_setup(setup_test)
-def test_version():
- rgwfs.version()
-
-
-@with_setup(setup_test)
-def test_fstat():
- stat = rgwfs.fstat(root_dir_handler)
- assert(len(stat) == 13)
- file_handler = rgwfs.create(root_dir_handler, b'file-1', 0)
- stat = rgwfs.fstat(file_handler)
- assert(len(stat) == 13)
- rgwfs.close(file_handler)
-
-
-@with_setup(setup_test)
-def test_statfs():
- stat = rgwfs.statfs()
- assert(len(stat) == 11)
-
-
-@with_setup(setup_test)
-def test_fsync():
- fd = rgwfs.create(root_dir_handler, b'file-1', 0)
- rgwfs.write(fd, 0, b"asdf")
- rgwfs.fsync(fd, 0)
- rgwfs.write(fd, 4, b"qwer")
- rgwfs.fsync(fd, 1)
- rgwfs.close(fd)
-
-
-@with_setup(setup_test)
-def test_directory():
- dir_handler = rgwfs.mkdir(root_dir_handler, b"temp-directory", 0)
- rgwfs.close(dir_handler)
- rgwfs.unlink(root_dir_handler, b"temp-directory")
-
-
-@with_setup(setup_test)
-def test_walk_dir():
- dirs = [b"dir-1", b"dir-2", b"dir-3"]
- handles = []
- for i in dirs:
- d = rgwfs.mkdir(root_dir_handler, i, 0)
- handles.append(d)
- entries = []
-
- def cb(name, offset):
- entries.append((name, offset))
-
- offset, eof = rgwfs.readdir(root_dir_handler, cb, 0)
-
- for i in handles:
- rgwfs.close(i)
-
- for name, _ in entries:
- assert(name in dirs)
- rgwfs.unlink(root_dir_handler, name)
-
-
-@with_setup(setup_test)
-def test_rename():
- file_handler = rgwfs.create(root_dir_handler, b"a", 0)
- rgwfs.close(file_handler)
- rgwfs.rename(root_dir_handler, b"a", root_dir_handler, b"b")
- file_handler = rgwfs.open(root_dir_handler, b"b", 0)
- rgwfs.fstat(file_handler)
- rgwfs.close(file_handler)
- rgwfs.unlink(root_dir_handler, b"b")
-
-
-@with_setup(setup_test)
-def test_open():
- assert_raises(librgwfs.ObjectNotFound, rgwfs.open,
- root_dir_handler, b'file-1', 0)
- assert_raises(librgwfs.ObjectNotFound, rgwfs.open,
- root_dir_handler, b'file-1', 0)
- fd = rgwfs.create(root_dir_handler, b'file-1', 0)
- rgwfs.write(fd, 0, b"asdf")
- rgwfs.close(fd)
- fd = rgwfs.open(root_dir_handler, b'file-1', 0)
- assert_equal(rgwfs.read(fd, 0, 4), b"asdf")
- rgwfs.close(fd)
- fd = rgwfs.open(root_dir_handler, b'file-1', 0)
- rgwfs.write(fd, 0, b"aaaazxcv")
- rgwfs.close(fd)
- fd = rgwfs.open(root_dir_handler, b'file-1', 0)
- assert_equal(rgwfs.read(fd, 4, 4), b"zxcv")
- rgwfs.close(fd)
- fd = rgwfs.open(root_dir_handler, b'file-1', 0)
- assert_equal(rgwfs.read(fd, 0, 4), b"aaaa")
- rgwfs.close(fd)
- rgwfs.unlink(root_dir_handler, b"file-1")
-
-
-@with_setup(setup_test)
-def test_mount_unmount():
- global root_handler
- global root_dir_handler
- test_directory()
- rgwfs.close(root_dir_handler)
- rgwfs.close(root_handler)
- rgwfs.unmount()
- root_handler = rgwfs.mount()
- root_dir_handler = rgwfs.opendir(root_handler, b"bucket", 0)
- test_open()