X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fpybind%2Ftest_rados.py;fp=src%2Fceph%2Fsrc%2Ftest%2Fpybind%2Ftest_rados.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=88b8d2a92c8c73fbbb34acc1c0cfd170add6f90f;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/test/pybind/test_rados.py b/src/ceph/src/test/pybind/test_rados.py deleted file mode 100644 index 88b8d2a..0000000 --- a/src/ceph/src/test/pybind/test_rados.py +++ /dev/null @@ -1,1004 +0,0 @@ -from __future__ import print_function -from nose import SkipTest -from nose.tools import eq_ as eq, ok_ as ok, assert_raises -from rados import (Rados, Error, RadosStateError, Object, ObjectExists, - ObjectNotFound, ObjectBusy, requires, opt, - ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, - LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog) -import time -import threading -import json -import errno -import sys - -# Are we running Python 2.x -_python2 = sys.version_info[0] < 3 - -def test_rados_init_error(): - assert_raises(Error, Rados, conffile='', rados_id='admin', - name='client.admin') - assert_raises(Error, Rados, conffile='', name='invalid') - assert_raises(Error, Rados, conffile='', name='bad.invalid') - -def test_rados_init(): - with Rados(conffile='', rados_id='admin'): - pass - with Rados(conffile='', name='client.admin'): - pass - with Rados(conffile='', name='client.admin'): - pass - with Rados(conffile='', name='client.admin'): - pass - -def test_ioctx_context_manager(): - with Rados(conffile='', rados_id='admin') as conn: - with conn.open_ioctx('rbd') as ioctx: - pass - -def test_parse_argv(): - args = ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it'] - r = Rados() - eq(args, r.conf_parse_argv(args)) - -def test_parse_argv_empty_str(): - args = [''] - r = Rados() - eq(args, r.conf_parse_argv(args)) - -class TestRequires(object): - @requires(('foo', str), ('bar', int), ('baz', int)) - def _method_plain(self, foo, bar, baz): - ok(isinstance(foo, str)) - ok(isinstance(bar, int)) - ok(isinstance(baz, int)) - return (foo, bar, baz) - - def test_method_plain(self): - assert_raises(TypeError, self._method_plain, 42, 42, 42) - assert_raises(TypeError, self._method_plain, '42', '42', '42') - assert_raises(TypeError, self._method_plain, foo='42', bar='42', baz='42') - eq(self._method_plain('42', 42, 42), ('42', 42, 42)) - eq(self._method_plain(foo='42', bar=42, baz=42), ('42', 42, 42)) - - @requires(('opt_foo', opt(str)), ('opt_bar', opt(int)), ('baz', int)) - def _method_with_opt_arg(self, foo, bar, baz): - ok(isinstance(foo, str) or foo is None) - ok(isinstance(bar, int) or bar is None) - ok(isinstance(baz, int)) - return (foo, bar, baz) - - def test_method_with_opt_args(self): - assert_raises(TypeError, self._method_with_opt_arg, 42, 42, 42) - assert_raises(TypeError, self._method_with_opt_arg, '42', '42', 42) - assert_raises(TypeError, self._method_with_opt_arg, None, None, None) - eq(self._method_with_opt_arg(None, 42, 42), (None, 42, 42)) - eq(self._method_with_opt_arg('42', None, 42), ('42', None, 42)) - eq(self._method_with_opt_arg(None, None, 42), (None, None, 42)) - - -class TestRadosStateError(object): - def _requires_configuring(self, rados): - assert_raises(RadosStateError, rados.connect) - - def _requires_configuring_or_connected(self, rados): - assert_raises(RadosStateError, rados.conf_read_file) - assert_raises(RadosStateError, rados.conf_parse_argv, None) - assert_raises(RadosStateError, rados.conf_parse_env) - assert_raises(RadosStateError, rados.conf_get, 'opt') - assert_raises(RadosStateError, rados.conf_set, 'opt', 'val') - assert_raises(RadosStateError, rados.ping_monitor, 0) - - def _requires_connected(self, rados): - assert_raises(RadosStateError, rados.pool_exists, 'foo') - assert_raises(RadosStateError, rados.pool_lookup, 'foo') - assert_raises(RadosStateError, rados.pool_reverse_lookup, 0) - assert_raises(RadosStateError, rados.create_pool, 'foo') - assert_raises(RadosStateError, rados.get_pool_base_tier, 0) - assert_raises(RadosStateError, rados.delete_pool, 'foo') - assert_raises(RadosStateError, rados.list_pools) - assert_raises(RadosStateError, rados.get_fsid) - assert_raises(RadosStateError, rados.open_ioctx, 'foo') - assert_raises(RadosStateError, rados.mon_command, '', b'') - assert_raises(RadosStateError, rados.osd_command, 0, '', b'') - assert_raises(RadosStateError, rados.pg_command, '', '', b'') - assert_raises(RadosStateError, rados.wait_for_latest_osdmap) - assert_raises(RadosStateError, rados.blacklist_add, '127.0.0.1/123', 0) - - def test_configuring(self): - rados = Rados(conffile='') - eq('configuring', rados.state) - self._requires_connected(rados) - - def test_connected(self): - rados = Rados(conffile='') - with rados: - eq('connected', rados.state) - self._requires_configuring(rados) - - def test_shutdown(self): - rados = Rados(conffile='') - with rados: - pass - eq('shutdown', rados.state) - self._requires_configuring(rados) - self._requires_configuring_or_connected(rados) - self._requires_connected(rados) - - -class TestRados(object): - - def setUp(self): - self.rados = Rados(conffile='') - self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH') - self.rados.conf_parse_env() - self.rados.connect() - - # Assume any pre-existing pools are the cluster's defaults - self.default_pools = self.rados.list_pools() - - def tearDown(self): - self.rados.shutdown() - - def test_ping_monitor(self): - assert_raises(ObjectNotFound, self.rados.ping_monitor, 'not_exists_monitor') - cmd = {'prefix': 'mon dump', 'format':'json'} - ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'') - for mon in json.loads(buf.decode('utf8'))['mons']: - while True: - output = self.rados.ping_monitor(mon['name']) - if output is None: - continue - buf = json.loads(output) - if buf.get('health'): - break - - def test_create(self): - self.rados.create_pool('foo') - self.rados.delete_pool('foo') - - def test_create_utf8(self): - if _python2: - # Use encoded bytestring - poolname = b"\351\273\204" - else: - poolname = "\u9ec4" - self.rados.create_pool(poolname) - assert self.rados.pool_exists(u"\u9ec4") - self.rados.delete_pool(poolname) - - def test_pool_lookup_utf8(self): - if _python2: - poolname = u'\u9ec4' - else: - poolname = '\u9ec4' - self.rados.create_pool(poolname) - try: - poolid = self.rados.pool_lookup(poolname) - eq(poolname, self.rados.pool_reverse_lookup(poolid)) - finally: - self.rados.delete_pool(poolname) - - def test_create_auid(self): - self.rados.create_pool('foo', 100) - assert self.rados.pool_exists('foo') - self.rados.delete_pool('foo') - - def test_eexist(self): - self.rados.create_pool('foo') - assert_raises(ObjectExists, self.rados.create_pool, 'foo') - self.rados.delete_pool('foo') - - def list_non_default_pools(self): - pools = self.rados.list_pools() - for p in self.default_pools: - pools.remove(p) - return set(pools) - - def test_list_pools(self): - eq(set(), self.list_non_default_pools()) - self.rados.create_pool('foo') - eq(set(['foo']), self.list_non_default_pools()) - self.rados.create_pool('bar') - eq(set(['foo', 'bar']), self.list_non_default_pools()) - self.rados.create_pool('baz') - eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools()) - self.rados.delete_pool('foo') - eq(set(['bar', 'baz']), self.list_non_default_pools()) - self.rados.delete_pool('baz') - eq(set(['bar']), self.list_non_default_pools()) - self.rados.delete_pool('bar') - eq(set(), self.list_non_default_pools()) - self.rados.create_pool('a' * 500) - eq(set(['a' * 500]), self.list_non_default_pools()) - self.rados.delete_pool('a' * 500) - - def test_get_pool_base_tier(self): - self.rados.create_pool('foo') - try: - self.rados.create_pool('foo-cache') - try: - pool_id = self.rados.pool_lookup('foo') - tier_pool_id = self.rados.pool_lookup('foo-cache') - - cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""} - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) - eq(ret, 0) - - try: - cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "sure":"--yes-i-really-mean-it"} - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) - eq(ret, 0) - - eq(self.rados.wait_for_latest_osdmap(), 0) - - eq(pool_id, self.rados.get_pool_base_tier(pool_id)) - eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id)) - finally: - cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"} - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) - eq(ret, 0) - finally: - self.rados.delete_pool('foo-cache') - finally: - self.rados.delete_pool('foo') - - def test_get_fsid(self): - fsid = self.rados.get_fsid() - eq(len(fsid), 36) - - def test_blacklist_add(self): - self.rados.blacklist_add("1.2.3.4/123", 1) - - def test_get_cluster_stats(self): - stats = self.rados.get_cluster_stats() - assert stats['kb'] > 0 - assert stats['kb_avail'] > 0 - assert stats['kb_used'] > 0 - assert stats['num_objects'] >= 0 - - def test_monitor_log(self): - lock = threading.Condition() - def cb(arg, line, who, sec, nsec, seq, level, msg): - # NOTE(sileht): the old pyrados API was received the pointer as int - # instead of the value of arg - eq(arg, "arg") - with lock: - lock.notify() - return 0 - - # NOTE(sileht): force don't save the monitor into local var - # to ensure all references are correctly tracked into the lib - MonitorLog(self.rados, "debug", cb, "arg") - with lock: - lock.wait() - MonitorLog(self.rados, "debug", None, None) - eq(None, self.rados.monitor_callback) - -class TestIoctx(object): - - def setUp(self): - self.rados = Rados(conffile='') - self.rados.connect() - self.rados.create_pool('test_pool') - assert self.rados.pool_exists('test_pool') - self.ioctx = self.rados.open_ioctx('test_pool') - - def tearDown(self): - cmd = {"prefix":"osd unset", "key":"noup"} - self.rados.mon_command(json.dumps(cmd), b'') - self.ioctx.close() - self.rados.delete_pool('test_pool') - self.rados.shutdown() - - def test_get_last_version(self): - version = self.ioctx.get_last_version() - assert version >= 0 - - def test_get_stats(self): - stats = self.ioctx.get_stats() - eq(stats, {'num_objects_unfound': 0, - 'num_objects_missing_on_primary': 0, - 'num_object_clones': 0, - 'num_objects': 0, - 'num_object_copies': 0, - 'num_bytes': 0, - 'num_rd_kb': 0, - 'num_wr_kb': 0, - 'num_kb': 0, - 'num_wr': 0, - 'num_objects_degraded': 0, - 'num_rd': 0}) - - def test_change_auid(self): - self.ioctx.change_auid(ANONYMOUS_AUID) - self.ioctx.change_auid(ADMIN_AUID) - - def test_write(self): - self.ioctx.write('abc', b'abc') - eq(self.ioctx.read('abc'), b'abc') - - def test_write_full(self): - self.ioctx.write('abc', b'abc') - eq(self.ioctx.read('abc'), b'abc') - self.ioctx.write_full('abc', b'd') - eq(self.ioctx.read('abc'), b'd') - - def test_append(self): - self.ioctx.write('abc', b'a') - self.ioctx.append('abc', b'b') - self.ioctx.append('abc', b'c') - eq(self.ioctx.read('abc'), b'abc') - - def test_write_zeros(self): - self.ioctx.write('abc', b'a\0b\0c') - eq(self.ioctx.read('abc'), b'a\0b\0c') - - def test_trunc(self): - self.ioctx.write('abc', b'abc') - self.ioctx.trunc('abc', 2) - eq(self.ioctx.read('abc'), b'ab') - size = self.ioctx.stat('abc')[0] - eq(size, 2) - - def test_list_objects_empty(self): - eq(list(self.ioctx.list_objects()), []) - - def test_list_objects(self): - self.ioctx.write('a', b'') - self.ioctx.write('b', b'foo') - self.ioctx.write_full('c', b'bar') - self.ioctx.append('d', b'jazz') - object_names = [obj.key for obj in self.ioctx.list_objects()] - eq(sorted(object_names), ['a', 'b', 'c', 'd']) - - def test_list_ns_objects(self): - self.ioctx.write('a', b'') - self.ioctx.write('b', b'foo') - self.ioctx.write_full('c', b'bar') - self.ioctx.append('d', b'jazz') - self.ioctx.set_namespace("ns1") - self.ioctx.write('ns1-a', b'') - self.ioctx.write('ns1-b', b'foo') - self.ioctx.write_full('ns1-c', b'bar') - self.ioctx.append('ns1-d', b'jazz') - self.ioctx.append('d', b'jazz') - self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES) - object_names = [(obj.nspace, obj.key) for obj in self.ioctx.list_objects()] - eq(sorted(object_names), [('', 'a'), ('','b'), ('','c'), ('','d'),\ - ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\ - ('ns1', 'ns1-c'), ('ns1', 'ns1-d')]) - - def test_xattrs(self): - xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0') - self.ioctx.write('abc', b'') - for key, value in xattrs.items(): - self.ioctx.set_xattr('abc', key, value) - eq(self.ioctx.get_xattr('abc', key), value) - stored_xattrs = {} - for key, value in self.ioctx.get_xattrs('abc'): - stored_xattrs[key] = value - eq(stored_xattrs, xattrs) - - def test_obj_xattrs(self): - xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0') - self.ioctx.write('abc', b'') - obj = list(self.ioctx.list_objects())[0] - for key, value in xattrs.items(): - obj.set_xattr(key, value) - eq(obj.get_xattr(key), value) - stored_xattrs = {} - for key, value in obj.get_xattrs(): - stored_xattrs[key] = value - eq(stored_xattrs, xattrs) - - def test_create_snap(self): - assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo') - self.ioctx.create_snap('foo') - self.ioctx.remove_snap('foo') - - def test_list_snaps_empty(self): - eq(list(self.ioctx.list_snaps()), []) - - def test_list_snaps(self): - snaps = ['snap1', 'snap2', 'snap3'] - for snap in snaps: - self.ioctx.create_snap(snap) - listed_snaps = [snap.name for snap in self.ioctx.list_snaps()] - eq(snaps, listed_snaps) - - def test_lookup_snap(self): - self.ioctx.create_snap('foo') - snap = self.ioctx.lookup_snap('foo') - eq(snap.name, 'foo') - - def test_snap_timestamp(self): - self.ioctx.create_snap('foo') - snap = self.ioctx.lookup_snap('foo') - snap.get_timestamp() - - def test_remove_snap(self): - self.ioctx.create_snap('foo') - (snap,) = self.ioctx.list_snaps() - eq(snap.name, 'foo') - self.ioctx.remove_snap('foo') - eq(list(self.ioctx.list_snaps()), []) - - def test_snap_rollback(self): - self.ioctx.write("insnap", b"contents1") - self.ioctx.create_snap("snap1") - self.ioctx.remove_object("insnap") - self.ioctx.snap_rollback("insnap", "snap1") - eq(self.ioctx.read("insnap"), b"contents1") - self.ioctx.remove_snap("snap1") - self.ioctx.remove_object("insnap") - - def test_snap_read(self): - self.ioctx.write("insnap", b"contents1") - self.ioctx.create_snap("snap1") - self.ioctx.remove_object("insnap") - snap = self.ioctx.lookup_snap("snap1") - self.ioctx.set_read(snap.snap_id) - eq(self.ioctx.read("insnap"), b"contents1") - self.ioctx.set_read(LIBRADOS_SNAP_HEAD) - self.ioctx.write("inhead", b"contents2") - eq(self.ioctx.read("inhead"), b"contents2") - self.ioctx.remove_snap("snap1") - self.ioctx.remove_object("inhead") - - def test_set_omap(self): - keys = ("1", "2", "3", "4") - values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") - with WriteOpCtx(self.ioctx) as write_op: - self.ioctx.set_omap(write_op, keys, values) - write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS) - self.ioctx.operate_write_op(write_op, "hw") - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4) - eq(ret, 0) - self.ioctx.operate_read_op(read_op, "hw") - next(iter) - eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")]) - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_vals(read_op, "2", "", 4) - eq(ret, 0) - self.ioctx.operate_read_op(read_op, "hw") - eq(("3", b"ccc"), next(iter)) - eq(list(iter), [("4", b"\x04\x04\x04\x04")]) - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4) - eq(ret, 0) - read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS) - self.ioctx.operate_read_op(read_op, "hw") - eq(list(iter), [("2", b"bbb")]) - - def test_set_omap_aio(self): - lock = threading.Condition() - count = [0] - def cb(blah): - with lock: - count[0] += 1 - lock.notify() - return 0 - - keys = ("1", "2", "3", "4") - values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") - with WriteOpCtx(self.ioctx) as write_op: - self.ioctx.set_omap(write_op, keys, values) - comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb) - comp.wait_for_complete() - comp.wait_for_safe() - with lock: - while count[0] < 2: - lock.wait() - eq(comp.get_return_value(), 0) - - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4) - eq(ret, 0) - comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb) - comp.wait_for_complete() - comp.wait_for_safe() - with lock: - while count[0] < 4: - lock.wait() - eq(comp.get_return_value(), 0) - next(iter) - eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")]) - - def test_write_ops(self): - with WriteOpCtx(self.ioctx) as write_op: - write_op.new(0) - self.ioctx.operate_write_op(write_op, "write_ops") - eq(self.ioctx.read('write_ops'), b'') - write_op.write_full(b'1') - write_op.append(b'2') - self.ioctx.operate_write_op(write_op, "write_ops") - eq(self.ioctx.read('write_ops'), b'12') - write_op.write_full(b'12345') - write_op.write(b'x', 2) - self.ioctx.operate_write_op(write_op, "write_ops") - eq(self.ioctx.read('write_ops'), b'12x45') - write_op.write_full(b'12345') - write_op.zero(2, 2) - self.ioctx.operate_write_op(write_op, "write_ops") - eq(self.ioctx.read('write_ops'), b'12\x00\x005') - write_op.write_full(b'12345') - write_op.truncate(2) - self.ioctx.operate_write_op(write_op, "write_ops") - eq(self.ioctx.read('write_ops'), b'12') - write_op.remove() - self.ioctx.operate_write_op(write_op, "write_ops") - with assert_raises(ObjectNotFound): - self.ioctx.read('write_ops') - - def test_get_omap_vals_by_keys(self): - keys = ("1", "2", "3", "4") - values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") - with WriteOpCtx(self.ioctx) as write_op: - self.ioctx.set_omap(write_op, keys, values) - self.ioctx.operate_write_op(write_op, "hw") - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",)) - eq(ret, 0) - self.ioctx.operate_read_op(read_op, "hw") - eq(list(iter), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")]) - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",)) - eq(ret, 0) - with assert_raises(ObjectNotFound): - self.ioctx.operate_read_op(read_op, "no_such") - - def test_get_omap_keys(self): - keys = ("1", "2", "3") - values = (b"aaa", b"bbb", b"ccc") - with WriteOpCtx(self.ioctx) as write_op: - self.ioctx.set_omap(write_op, keys, values) - self.ioctx.operate_write_op(write_op, "hw") - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_keys(read_op,"",2) - eq(ret, 0) - self.ioctx.operate_read_op(read_op, "hw") - eq(list(iter), [("1", None), ("2", None)]) - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_keys(read_op,"",2) - eq(ret, 0) - with assert_raises(ObjectNotFound): - self.ioctx.operate_read_op(read_op, "no_such") - - def test_clear_omap(self): - keys = ("1", "2", "3") - values = (b"aaa", b"bbb", b"ccc") - with WriteOpCtx(self.ioctx) as write_op: - self.ioctx.set_omap(write_op, keys, values) - self.ioctx.operate_write_op(write_op, "hw") - with WriteOpCtx(self.ioctx) as write_op_1: - self.ioctx.clear_omap(write_op_1) - self.ioctx.operate_write_op(write_op_1, "hw") - with ReadOpCtx(self.ioctx) as read_op: - iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("1",)) - eq(ret, 0) - self.ioctx.operate_read_op(read_op, "hw") - eq(list(iter), []) - - def test_locator(self): - self.ioctx.set_locator_key("bar") - self.ioctx.write('foo', b'contents1') - objects = [i for i in self.ioctx.list_objects()] - eq(len(objects), 1) - eq(self.ioctx.get_locator_key(), "bar") - self.ioctx.set_locator_key("") - objects[0].seek(0) - objects[0].write(b"contents2") - eq(self.ioctx.get_locator_key(), "") - self.ioctx.set_locator_key("bar") - contents = self.ioctx.read("foo") - eq(contents, b"contents2") - eq(self.ioctx.get_locator_key(), "bar") - objects[0].remove() - objects = [i for i in self.ioctx.list_objects()] - eq(objects, []) - self.ioctx.set_locator_key("") - - def test_aio_write(self): - lock = threading.Condition() - count = [0] - def cb(blah): - with lock: - count[0] += 1 - lock.notify() - return 0 - comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb) - comp.wait_for_complete() - comp.wait_for_safe() - with lock: - while count[0] < 2: - lock.wait() - eq(comp.get_return_value(), 0) - contents = self.ioctx.read("foo") - eq(contents, b"bar") - [i.remove() for i in self.ioctx.list_objects()] - - def test_aio_write_no_comp_ref(self): - lock = threading.Condition() - count = [0] - def cb(blah): - with lock: - count[0] += 1 - lock.notify() - return 0 - # NOTE(sileht): force don't save the comp into local var - # to ensure all references are correctly tracked into the lib - self.ioctx.aio_write("foo", b"bar", 0, cb, cb) - with lock: - while count[0] < 2: - lock.wait() - contents = self.ioctx.read("foo") - eq(contents, b"bar") - [i.remove() for i in self.ioctx.list_objects()] - - def test_aio_append(self): - lock = threading.Condition() - count = [0] - def cb(blah): - with lock: - count[0] += 1 - lock.notify() - return 0 - comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb) - comp2 = self.ioctx.aio_append("foo", b"baz", cb, cb) - comp.wait_for_complete() - contents = self.ioctx.read("foo") - eq(contents, b"barbaz") - with lock: - while count[0] < 4: - lock.wait() - eq(comp.get_return_value(), 0) - eq(comp2.get_return_value(), 0) - [i.remove() for i in self.ioctx.list_objects()] - - def test_aio_write_full(self): - lock = threading.Condition() - count = [0] - def cb(blah): - with lock: - count[0] += 1 - lock.notify() - return 0 - self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb) - comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb) - comp.wait_for_complete() - comp.wait_for_safe() - with lock: - while count[0] < 2: - lock.wait() - eq(comp.get_return_value(), 0) - contents = self.ioctx.read("foo") - eq(contents, b"bar") - [i.remove() for i in self.ioctx.list_objects()] - - def test_aio_stat(self): - lock = threading.Condition() - count = [0] - def cb(_, size, mtime): - with lock: - count[0] += 1 - lock.notify() - - comp = self.ioctx.aio_stat("foo", cb) - comp.wait_for_complete() - with lock: - while count[0] < 1: - lock.wait() - eq(comp.get_return_value(), -2) - - self.ioctx.write("foo", b"bar") - - comp = self.ioctx.aio_stat("foo", cb) - comp.wait_for_complete() - with lock: - while count[0] < 2: - lock.wait() - eq(comp.get_return_value(), 0) - - [i.remove() for i in self.ioctx.list_objects()] - - def _take_down_acting_set(self, pool, objectname): - # find acting_set for pool:objectname and take it down; used to - # verify that async reads don't complete while acting set is missing - cmd = { - "prefix":"osd map", - "pool":pool, - "object":objectname, - "format":"json", - } - r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), b'') - objmap = json.loads(jsonout.decode("utf-8")) - acting_set = objmap['acting'] - cmd = {"prefix":"osd set", "key":"noup"} - r, _, _ = self.rados.mon_command(json.dumps(cmd), b'') - eq(r, 0) - cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]} - r, _, _ = self.rados.mon_command(json.dumps(cmd), b'') - eq(r, 0) - - # wait for OSDs to acknowledge the down - eq(self.rados.wait_for_latest_osdmap(), 0) - - def _let_osds_back_up(self): - cmd = {"prefix":"osd unset", "key":"noup"} - r, _, _ = self.rados.mon_command(json.dumps(cmd), b'') - eq(r, 0) - - def test_aio_read(self): - # this is a list so that the local cb() can modify it - retval = [None] - lock = threading.Condition() - def cb(_, buf): - with lock: - retval[0] = buf - lock.notify() - payload = b"bar\000frob" - self.ioctx.write("foo", payload) - - # test1: use wait_for_complete() and wait for cb by - # watching retval[0] - self._take_down_acting_set('test_pool', 'foo') - comp = self.ioctx.aio_read("foo", len(payload), 0, cb) - eq(False, comp.is_complete()) - time.sleep(3) - eq(False, comp.is_complete()) - with lock: - eq(None, retval[0]) - self._let_osds_back_up() - comp.wait_for_complete() - loops = 0 - with lock: - while retval[0] is None and loops <= 10: - lock.wait(timeout=5) - loops += 1 - assert(loops <= 10) - - eq(retval[0], payload) - eq(sys.getrefcount(comp), 2) - - # test2: use wait_for_complete_and_cb(), verify retval[0] is - # set by the time we regain control - - retval[0] = None - self._take_down_acting_set('test_pool', 'foo') - comp = self.ioctx.aio_read("foo", len(payload), 0, cb) - eq(False, comp.is_complete()) - time.sleep(3) - eq(False, comp.is_complete()) - with lock: - eq(None, retval[0]) - self._let_osds_back_up() - - comp.wait_for_complete_and_cb() - assert(retval[0] is not None) - eq(retval[0], payload) - eq(sys.getrefcount(comp), 2) - - # test3: error case, use wait_for_complete_and_cb(), verify retval[0] is - # set by the time we regain control - - retval[0] = 1 - self._take_down_acting_set('test_pool', 'bar') - comp = self.ioctx.aio_read("bar", len(payload), 0, cb) - eq(False, comp.is_complete()) - time.sleep(3) - eq(False, comp.is_complete()) - with lock: - eq(1, retval[0]) - self._let_osds_back_up() - - comp.wait_for_complete_and_cb() - eq(None, retval[0]) - assert(comp.get_return_value() < 0) - eq(sys.getrefcount(comp), 2) - - [i.remove() for i in self.ioctx.list_objects()] - - def test_lock(self): - self.ioctx.lock_exclusive("foo", "lock", "locker", "desc_lock", - 10000, 0) - assert_raises(ObjectExists, - self.ioctx.lock_exclusive, - "foo", "lock", "locker", "desc_lock", 10000, 0) - self.ioctx.unlock("foo", "lock", "locker") - assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker") - - self.ioctx.lock_shared("foo", "lock", "locker1", "tag", "desc_lock", - 10000, 0) - self.ioctx.lock_shared("foo", "lock", "locker2", "tag", "desc_lock", - 10000, 0) - assert_raises(ObjectBusy, - self.ioctx.lock_exclusive, - "foo", "lock", "locker3", "desc_lock", 10000, 0) - self.ioctx.unlock("foo", "lock", "locker1") - self.ioctx.unlock("foo", "lock", "locker2") - assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker1") - assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker2") - - def test_execute(self): - self.ioctx.write("foo", b"") # ensure object exists - - ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"") - eq(buf, b"Hello, world!") - - ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose") - eq(buf, b"Hello, nose!") - - def test_aio_execute(self): - count = [0] - retval = [None] - lock = threading.Condition() - def cb(_, buf): - with lock: - if retval[0] is None: - retval[0] = buf - count[0] += 1 - lock.notify() - self.ioctx.write("foo", b"") # ensure object exists - - comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb) - comp.wait_for_complete() - with lock: - while count[0] < 2: - lock.wait() - eq(comp.get_return_value(), 13) - eq(retval[0], b"Hello, world!") - - retval[0] = None - comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb) - comp.wait_for_complete() - with lock: - while count[0] < 4: - lock.wait() - eq(comp.get_return_value(), 12) - eq(retval[0], b"Hello, nose!") - - [i.remove() for i in self.ioctx.list_objects()] - - def test_applications(self): - cmd = {"prefix":"osd dump", "format":"json"} - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'') - eq(ret, 0) - assert len(buf) > 0 - release = json.loads(buf.decode("utf-8")).get("require_osd_release", - None) - if not release or release[0] < 'l': - raise SkipTest - - eq([], self.ioctx.application_list()) - - self.ioctx.application_enable("app1") - assert_raises(Error, self.ioctx.application_enable, "app2") - self.ioctx.application_enable("app2", True) - - assert_raises(Error, self.ioctx.application_metadata_list, "dne") - eq([], self.ioctx.application_metadata_list("app1")) - - assert_raises(Error, self.ioctx.application_metadata_set, "dne", "key", - "key") - self.ioctx.application_metadata_set("app1", "key1", "val1") - self.ioctx.application_metadata_set("app1", "key2", "val2") - self.ioctx.application_metadata_set("app2", "key1", "val1") - - eq([("key1", "val1"), ("key2", "val2")], - self.ioctx.application_metadata_list("app1")) - - self.ioctx.application_metadata_remove("app1", "key1") - eq([("key2", "val2")], self.ioctx.application_metadata_list("app1")) - -class TestObject(object): - - def setUp(self): - self.rados = Rados(conffile='') - self.rados.connect() - self.rados.create_pool('test_pool') - assert self.rados.pool_exists('test_pool') - self.ioctx = self.rados.open_ioctx('test_pool') - self.ioctx.write('foo', b'bar') - self.object = Object(self.ioctx, 'foo') - - def tearDown(self): - self.ioctx.close() - self.ioctx = None - self.rados.delete_pool('test_pool') - self.rados.shutdown() - self.rados = None - - def test_read(self): - eq(self.object.read(3), b'bar') - eq(self.object.read(100), b'') - - def test_seek(self): - self.object.write(b'blah') - self.object.seek(0) - eq(self.object.read(4), b'blah') - self.object.seek(1) - eq(self.object.read(3), b'lah') - - def test_write(self): - self.object.write(b'barbaz') - self.object.seek(0) - eq(self.object.read(3), b'bar') - eq(self.object.read(3), b'baz') - -class TestCommand(object): - - def setUp(self): - self.rados = Rados(conffile='') - self.rados.connect() - - def tearDown(self): - self.rados.shutdown() - - def test_monmap_dump(self): - - # check for success and some plain output with epoch in it - cmd = {"prefix":"mon dump"} - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) - eq(ret, 0) - assert len(buf) > 0 - assert(b'epoch' in buf) - - # JSON, and grab current epoch - cmd['format'] = 'json' - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) - eq(ret, 0) - assert len(buf) > 0 - d = json.loads(buf.decode("utf-8")) - assert('epoch' in d) - epoch = d['epoch'] - - # assume epoch + 1000 does not exist; test for ENOENT - cmd['epoch'] = epoch + 1000 - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) - eq(ret, -errno.ENOENT) - eq(len(buf), 0) - del cmd['epoch'] - - # send to specific target by name - target = d['mons'][0]['name'] - print(target) - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30, - target=target) - eq(ret, 0) - assert len(buf) > 0 - d = json.loads(buf.decode("utf-8")) - assert('epoch' in d) - - # and by rank - target = d['mons'][0]['rank'] - print(target) - ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30, - target=target) - eq(ret, 0) - assert len(buf) > 0 - d = json.loads(buf.decode("utf-8")) - assert('epoch' in d) - - def test_osd_bench(self): - cmd = dict(prefix='bench', size=4096, count=8192) - ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'', - timeout=30) - eq(ret, 0) - assert len(err) > 0 - out = json.loads(err) - eq(out['blocksize'], cmd['size']) - eq(out['bytes_written'], cmd['count']) - - def test_ceph_osd_pool_create_utf8(self): - if _python2: - # Use encoded bytestring - poolname = b"\351\273\205" - else: - poolname = "\u9ec5" - - cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname} - ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'') - eq(ret, 0) - assert len(out) > 0 - eq(u"pool '\u9ec5' created", out)