Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / pybind / test_rados.py
1 from __future__ import print_function
2 from nose import SkipTest
3 from nose.tools import eq_ as eq, ok_ as ok, assert_raises
4 from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
5                    ObjectNotFound, ObjectBusy, requires, opt,
6                    ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
7                    LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
8 import time
9 import threading
10 import json
11 import errno
12 import sys
13
14 # Are we running Python 2.x
15 _python2 = sys.version_info[0] < 3
16
17 def test_rados_init_error():
18     assert_raises(Error, Rados, conffile='', rados_id='admin',
19                   name='client.admin')
20     assert_raises(Error, Rados, conffile='', name='invalid')
21     assert_raises(Error, Rados, conffile='', name='bad.invalid')
22
23 def test_rados_init():
24     with Rados(conffile='', rados_id='admin'):
25         pass
26     with Rados(conffile='', name='client.admin'):
27         pass
28     with Rados(conffile='', name='client.admin'):
29         pass
30     with Rados(conffile='', name='client.admin'):
31         pass
32
33 def test_ioctx_context_manager():
34     with Rados(conffile='', rados_id='admin') as conn:
35         with conn.open_ioctx('rbd') as ioctx:
36             pass
37
38 def test_parse_argv():
39     args = ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
40     r = Rados()
41     eq(args, r.conf_parse_argv(args))
42
43 def test_parse_argv_empty_str():
44     args = ['']
45     r = Rados()
46     eq(args, r.conf_parse_argv(args))
47
48 class TestRequires(object):
49     @requires(('foo', str), ('bar', int), ('baz', int))
50     def _method_plain(self, foo, bar, baz):
51         ok(isinstance(foo, str))
52         ok(isinstance(bar, int))
53         ok(isinstance(baz, int))
54         return (foo, bar, baz)
55
56     def test_method_plain(self):
57         assert_raises(TypeError, self._method_plain, 42, 42, 42)
58         assert_raises(TypeError, self._method_plain, '42', '42', '42')
59         assert_raises(TypeError, self._method_plain, foo='42', bar='42', baz='42')
60         eq(self._method_plain('42', 42, 42), ('42', 42, 42))
61         eq(self._method_plain(foo='42', bar=42, baz=42), ('42', 42, 42))
62
63     @requires(('opt_foo', opt(str)), ('opt_bar', opt(int)), ('baz', int))
64     def _method_with_opt_arg(self, foo, bar, baz):
65         ok(isinstance(foo, str) or foo is None)
66         ok(isinstance(bar, int) or bar is None)
67         ok(isinstance(baz, int))
68         return (foo, bar, baz)
69
70     def test_method_with_opt_args(self):
71         assert_raises(TypeError, self._method_with_opt_arg, 42, 42, 42)
72         assert_raises(TypeError, self._method_with_opt_arg, '42', '42', 42)
73         assert_raises(TypeError, self._method_with_opt_arg, None, None, None)
74         eq(self._method_with_opt_arg(None, 42, 42), (None, 42, 42))
75         eq(self._method_with_opt_arg('42', None, 42), ('42', None, 42))
76         eq(self._method_with_opt_arg(None, None, 42), (None, None, 42))
77
78
79 class TestRadosStateError(object):
80     def _requires_configuring(self, rados):
81         assert_raises(RadosStateError, rados.connect)
82
83     def _requires_configuring_or_connected(self, rados):
84         assert_raises(RadosStateError, rados.conf_read_file)
85         assert_raises(RadosStateError, rados.conf_parse_argv, None)
86         assert_raises(RadosStateError, rados.conf_parse_env)
87         assert_raises(RadosStateError, rados.conf_get, 'opt')
88         assert_raises(RadosStateError, rados.conf_set, 'opt', 'val')
89         assert_raises(RadosStateError, rados.ping_monitor, 0)
90
91     def _requires_connected(self, rados):
92         assert_raises(RadosStateError, rados.pool_exists, 'foo')
93         assert_raises(RadosStateError, rados.pool_lookup, 'foo')
94         assert_raises(RadosStateError, rados.pool_reverse_lookup, 0)
95         assert_raises(RadosStateError, rados.create_pool, 'foo')
96         assert_raises(RadosStateError, rados.get_pool_base_tier, 0)
97         assert_raises(RadosStateError, rados.delete_pool, 'foo')
98         assert_raises(RadosStateError, rados.list_pools)
99         assert_raises(RadosStateError, rados.get_fsid)
100         assert_raises(RadosStateError, rados.open_ioctx, 'foo')
101         assert_raises(RadosStateError, rados.mon_command, '', b'')
102         assert_raises(RadosStateError, rados.osd_command, 0, '', b'')
103         assert_raises(RadosStateError, rados.pg_command, '', '', b'')
104         assert_raises(RadosStateError, rados.wait_for_latest_osdmap)
105         assert_raises(RadosStateError, rados.blacklist_add, '127.0.0.1/123', 0)
106
107     def test_configuring(self):
108         rados = Rados(conffile='')
109         eq('configuring', rados.state)
110         self._requires_connected(rados)
111
112     def test_connected(self):
113         rados = Rados(conffile='')
114         with rados:
115             eq('connected', rados.state)
116             self._requires_configuring(rados)
117
118     def test_shutdown(self):
119         rados = Rados(conffile='')
120         with rados:
121             pass
122         eq('shutdown', rados.state)
123         self._requires_configuring(rados)
124         self._requires_configuring_or_connected(rados)
125         self._requires_connected(rados)
126
127
128 class TestRados(object):
129
130     def setUp(self):
131         self.rados = Rados(conffile='')
132         self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
133         self.rados.conf_parse_env()
134         self.rados.connect()
135
136         # Assume any pre-existing pools are the cluster's defaults
137         self.default_pools = self.rados.list_pools()
138
139     def tearDown(self):
140         self.rados.shutdown()
141
142     def test_ping_monitor(self):
143         assert_raises(ObjectNotFound, self.rados.ping_monitor, 'not_exists_monitor')
144         cmd = {'prefix': 'mon dump', 'format':'json'}
145         ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
146         for mon in json.loads(buf.decode('utf8'))['mons']:
147             while True:
148                 output = self.rados.ping_monitor(mon['name'])
149                 if output is None:
150                     continue
151                 buf = json.loads(output)
152                 if buf.get('health'):
153                     break
154
155     def test_create(self):
156         self.rados.create_pool('foo')
157         self.rados.delete_pool('foo')
158
159     def test_create_utf8(self):
160         if _python2:
161             # Use encoded bytestring
162             poolname = b"\351\273\204"
163         else:
164             poolname = "\u9ec4"
165         self.rados.create_pool(poolname)
166         assert self.rados.pool_exists(u"\u9ec4")
167         self.rados.delete_pool(poolname)
168
169     def test_pool_lookup_utf8(self):
170         if _python2:
171             poolname = u'\u9ec4'
172         else:
173             poolname = '\u9ec4'
174         self.rados.create_pool(poolname)
175         try:
176             poolid = self.rados.pool_lookup(poolname)
177             eq(poolname, self.rados.pool_reverse_lookup(poolid))
178         finally:
179             self.rados.delete_pool(poolname)
180
181     def test_create_auid(self):
182         self.rados.create_pool('foo', 100)
183         assert self.rados.pool_exists('foo')
184         self.rados.delete_pool('foo')
185
186     def test_eexist(self):
187         self.rados.create_pool('foo')
188         assert_raises(ObjectExists, self.rados.create_pool, 'foo')
189         self.rados.delete_pool('foo')
190
191     def list_non_default_pools(self):
192         pools = self.rados.list_pools()
193         for p in self.default_pools:
194             pools.remove(p)
195         return set(pools)
196
197     def test_list_pools(self):
198         eq(set(), self.list_non_default_pools())
199         self.rados.create_pool('foo')
200         eq(set(['foo']), self.list_non_default_pools())
201         self.rados.create_pool('bar')
202         eq(set(['foo', 'bar']), self.list_non_default_pools())
203         self.rados.create_pool('baz')
204         eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools())
205         self.rados.delete_pool('foo')
206         eq(set(['bar', 'baz']), self.list_non_default_pools())
207         self.rados.delete_pool('baz')
208         eq(set(['bar']), self.list_non_default_pools())
209         self.rados.delete_pool('bar')
210         eq(set(), self.list_non_default_pools())
211         self.rados.create_pool('a' * 500)
212         eq(set(['a' * 500]), self.list_non_default_pools())
213         self.rados.delete_pool('a' * 500)
214
215     def test_get_pool_base_tier(self):
216         self.rados.create_pool('foo')
217         try:
218             self.rados.create_pool('foo-cache')
219             try:
220                 pool_id = self.rados.pool_lookup('foo')
221                 tier_pool_id = self.rados.pool_lookup('foo-cache')
222
223                 cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
224                 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
225                 eq(ret, 0)
226
227                 try:
228                     cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "sure":"--yes-i-really-mean-it"}
229                     ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
230                     eq(ret, 0)
231
232                     eq(self.rados.wait_for_latest_osdmap(), 0)
233
234                     eq(pool_id, self.rados.get_pool_base_tier(pool_id))
235                     eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id))
236                 finally:
237                     cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
238                     ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
239                     eq(ret, 0)
240             finally:
241                 self.rados.delete_pool('foo-cache')
242         finally:
243             self.rados.delete_pool('foo')
244
245     def test_get_fsid(self):
246         fsid = self.rados.get_fsid()
247         eq(len(fsid), 36)
248
249     def test_blacklist_add(self):
250         self.rados.blacklist_add("1.2.3.4/123", 1)
251
252     def test_get_cluster_stats(self):
253         stats = self.rados.get_cluster_stats()
254         assert stats['kb'] > 0
255         assert stats['kb_avail'] > 0
256         assert stats['kb_used'] > 0
257         assert stats['num_objects'] >= 0
258
259     def test_monitor_log(self):
260         lock = threading.Condition()
261         def cb(arg, line, who, sec, nsec, seq, level, msg):
262             # NOTE(sileht): the old pyrados API was received the pointer as int
263             # instead of the value of arg
264             eq(arg, "arg")
265             with lock:
266                 lock.notify()
267             return 0
268
269         # NOTE(sileht): force don't save the monitor into local var
270         # to ensure all references are correctly tracked into the lib
271         MonitorLog(self.rados, "debug", cb, "arg")
272         with lock:
273             lock.wait()
274         MonitorLog(self.rados, "debug", None, None)
275         eq(None, self.rados.monitor_callback)
276
277 class TestIoctx(object):
278
279     def setUp(self):
280         self.rados = Rados(conffile='')
281         self.rados.connect()
282         self.rados.create_pool('test_pool')
283         assert self.rados.pool_exists('test_pool')
284         self.ioctx = self.rados.open_ioctx('test_pool')
285
286     def tearDown(self):
287         cmd = {"prefix":"osd unset", "key":"noup"}
288         self.rados.mon_command(json.dumps(cmd), b'')
289         self.ioctx.close()
290         self.rados.delete_pool('test_pool')
291         self.rados.shutdown()
292
293     def test_get_last_version(self):
294         version = self.ioctx.get_last_version()
295         assert version >= 0
296
297     def test_get_stats(self):
298         stats = self.ioctx.get_stats()
299         eq(stats, {'num_objects_unfound': 0,
300                    'num_objects_missing_on_primary': 0,
301                    'num_object_clones': 0,
302                    'num_objects': 0,
303                    'num_object_copies': 0,
304                    'num_bytes': 0,
305                    'num_rd_kb': 0,
306                    'num_wr_kb': 0,
307                    'num_kb': 0,
308                    'num_wr': 0,
309                    'num_objects_degraded': 0,
310                    'num_rd': 0})
311
312     def test_change_auid(self):
313         self.ioctx.change_auid(ANONYMOUS_AUID)
314         self.ioctx.change_auid(ADMIN_AUID)
315
316     def test_write(self):
317         self.ioctx.write('abc', b'abc')
318         eq(self.ioctx.read('abc'), b'abc')
319
320     def test_write_full(self):
321         self.ioctx.write('abc', b'abc')
322         eq(self.ioctx.read('abc'), b'abc')
323         self.ioctx.write_full('abc', b'd')
324         eq(self.ioctx.read('abc'), b'd')
325
326     def test_append(self):
327         self.ioctx.write('abc', b'a')
328         self.ioctx.append('abc', b'b')
329         self.ioctx.append('abc', b'c')
330         eq(self.ioctx.read('abc'), b'abc')
331
332     def test_write_zeros(self):
333         self.ioctx.write('abc', b'a\0b\0c')
334         eq(self.ioctx.read('abc'), b'a\0b\0c')
335
336     def test_trunc(self):
337         self.ioctx.write('abc', b'abc')
338         self.ioctx.trunc('abc', 2)
339         eq(self.ioctx.read('abc'), b'ab')
340         size = self.ioctx.stat('abc')[0]
341         eq(size, 2)
342
343     def test_list_objects_empty(self):
344         eq(list(self.ioctx.list_objects()), [])
345
346     def test_list_objects(self):
347         self.ioctx.write('a', b'')
348         self.ioctx.write('b', b'foo')
349         self.ioctx.write_full('c', b'bar')
350         self.ioctx.append('d', b'jazz')
351         object_names = [obj.key for obj in self.ioctx.list_objects()]
352         eq(sorted(object_names), ['a', 'b', 'c', 'd'])
353
354     def test_list_ns_objects(self):
355         self.ioctx.write('a', b'')
356         self.ioctx.write('b', b'foo')
357         self.ioctx.write_full('c', b'bar')
358         self.ioctx.append('d', b'jazz')
359         self.ioctx.set_namespace("ns1")
360         self.ioctx.write('ns1-a', b'')
361         self.ioctx.write('ns1-b', b'foo')
362         self.ioctx.write_full('ns1-c', b'bar')
363         self.ioctx.append('ns1-d', b'jazz')
364         self.ioctx.append('d', b'jazz')
365         self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
366         object_names = [(obj.nspace, obj.key) for obj in self.ioctx.list_objects()]
367         eq(sorted(object_names), [('', 'a'), ('','b'), ('','c'), ('','d'),\
368                 ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\
369                 ('ns1', 'ns1-c'), ('ns1', 'ns1-d')])
370
371     def test_xattrs(self):
372         xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0')
373         self.ioctx.write('abc', b'')
374         for key, value in xattrs.items():
375             self.ioctx.set_xattr('abc', key, value)
376             eq(self.ioctx.get_xattr('abc', key), value)
377         stored_xattrs = {}
378         for key, value in self.ioctx.get_xattrs('abc'):
379             stored_xattrs[key] = value
380         eq(stored_xattrs, xattrs)
381
382     def test_obj_xattrs(self):
383         xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0')
384         self.ioctx.write('abc', b'')
385         obj = list(self.ioctx.list_objects())[0]
386         for key, value in xattrs.items():
387             obj.set_xattr(key, value)
388             eq(obj.get_xattr(key), value)
389         stored_xattrs = {}
390         for key, value in obj.get_xattrs():
391             stored_xattrs[key] = value
392         eq(stored_xattrs, xattrs)
393
394     def test_create_snap(self):
395         assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo')
396         self.ioctx.create_snap('foo')
397         self.ioctx.remove_snap('foo')
398
399     def test_list_snaps_empty(self):
400         eq(list(self.ioctx.list_snaps()), [])
401
402     def test_list_snaps(self):
403         snaps = ['snap1', 'snap2', 'snap3']
404         for snap in snaps:
405             self.ioctx.create_snap(snap)
406         listed_snaps = [snap.name for snap in self.ioctx.list_snaps()]
407         eq(snaps, listed_snaps)
408
409     def test_lookup_snap(self):
410         self.ioctx.create_snap('foo')
411         snap = self.ioctx.lookup_snap('foo')
412         eq(snap.name, 'foo')
413
414     def test_snap_timestamp(self):
415         self.ioctx.create_snap('foo')
416         snap = self.ioctx.lookup_snap('foo')
417         snap.get_timestamp()
418
419     def test_remove_snap(self):
420         self.ioctx.create_snap('foo')
421         (snap,) = self.ioctx.list_snaps()
422         eq(snap.name, 'foo')
423         self.ioctx.remove_snap('foo')
424         eq(list(self.ioctx.list_snaps()), [])
425
426     def test_snap_rollback(self):
427         self.ioctx.write("insnap", b"contents1")
428         self.ioctx.create_snap("snap1")
429         self.ioctx.remove_object("insnap")
430         self.ioctx.snap_rollback("insnap", "snap1")
431         eq(self.ioctx.read("insnap"), b"contents1")
432         self.ioctx.remove_snap("snap1")
433         self.ioctx.remove_object("insnap")
434
435     def test_snap_read(self):
436         self.ioctx.write("insnap", b"contents1")
437         self.ioctx.create_snap("snap1")
438         self.ioctx.remove_object("insnap")
439         snap = self.ioctx.lookup_snap("snap1")
440         self.ioctx.set_read(snap.snap_id)
441         eq(self.ioctx.read("insnap"), b"contents1")
442         self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
443         self.ioctx.write("inhead", b"contents2")
444         eq(self.ioctx.read("inhead"), b"contents2")
445         self.ioctx.remove_snap("snap1")
446         self.ioctx.remove_object("inhead")
447
448     def test_set_omap(self):
449         keys = ("1", "2", "3", "4")
450         values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
451         with WriteOpCtx(self.ioctx) as write_op:
452             self.ioctx.set_omap(write_op, keys, values)
453             write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
454             self.ioctx.operate_write_op(write_op, "hw")
455         with ReadOpCtx(self.ioctx) as read_op:
456             iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
457             eq(ret, 0)
458             self.ioctx.operate_read_op(read_op, "hw")
459             next(iter)
460             eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
461         with ReadOpCtx(self.ioctx) as read_op:
462             iter, ret = self.ioctx.get_omap_vals(read_op, "2", "", 4)
463             eq(ret, 0)
464             self.ioctx.operate_read_op(read_op, "hw")
465             eq(("3", b"ccc"), next(iter))
466             eq(list(iter), [("4", b"\x04\x04\x04\x04")])
467         with ReadOpCtx(self.ioctx) as read_op:
468             iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4)
469             eq(ret, 0)
470             read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
471             self.ioctx.operate_read_op(read_op, "hw")
472             eq(list(iter), [("2", b"bbb")])
473
474     def test_set_omap_aio(self):
475         lock = threading.Condition()
476         count = [0]
477         def cb(blah):
478             with lock:
479                 count[0] += 1
480                 lock.notify()
481             return 0
482
483         keys = ("1", "2", "3", "4")
484         values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
485         with WriteOpCtx(self.ioctx) as write_op:
486             self.ioctx.set_omap(write_op, keys, values)
487             comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
488             comp.wait_for_complete()
489             comp.wait_for_safe()
490             with lock:
491                 while count[0] < 2:
492                     lock.wait()
493             eq(comp.get_return_value(), 0)
494
495         with ReadOpCtx(self.ioctx) as read_op:
496             iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
497             eq(ret, 0)
498             comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
499             comp.wait_for_complete()
500             comp.wait_for_safe()
501             with lock:
502                 while count[0] < 4:
503                     lock.wait()
504             eq(comp.get_return_value(), 0)
505             next(iter)
506             eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
507
508     def test_write_ops(self):
509         with WriteOpCtx(self.ioctx) as write_op:
510             write_op.new(0)
511             self.ioctx.operate_write_op(write_op, "write_ops")
512             eq(self.ioctx.read('write_ops'), b'')
513             write_op.write_full(b'1')
514             write_op.append(b'2')
515             self.ioctx.operate_write_op(write_op, "write_ops")
516             eq(self.ioctx.read('write_ops'), b'12')
517             write_op.write_full(b'12345')
518             write_op.write(b'x', 2)
519             self.ioctx.operate_write_op(write_op, "write_ops")
520             eq(self.ioctx.read('write_ops'), b'12x45')
521             write_op.write_full(b'12345')
522             write_op.zero(2, 2)
523             self.ioctx.operate_write_op(write_op, "write_ops")
524             eq(self.ioctx.read('write_ops'), b'12\x00\x005')
525             write_op.write_full(b'12345')
526             write_op.truncate(2)
527             self.ioctx.operate_write_op(write_op, "write_ops")
528             eq(self.ioctx.read('write_ops'), b'12')
529             write_op.remove()
530             self.ioctx.operate_write_op(write_op, "write_ops")
531             with assert_raises(ObjectNotFound):
532                 self.ioctx.read('write_ops')
533
534     def test_get_omap_vals_by_keys(self):
535         keys = ("1", "2", "3", "4")
536         values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
537         with WriteOpCtx(self.ioctx) as write_op:
538             self.ioctx.set_omap(write_op, keys, values)
539             self.ioctx.operate_write_op(write_op, "hw")
540         with ReadOpCtx(self.ioctx) as read_op:
541             iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",))
542             eq(ret, 0)
543             self.ioctx.operate_read_op(read_op, "hw")
544             eq(list(iter), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
545         with ReadOpCtx(self.ioctx) as read_op:
546             iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",))
547             eq(ret, 0)
548             with assert_raises(ObjectNotFound):
549                 self.ioctx.operate_read_op(read_op, "no_such")
550
551     def test_get_omap_keys(self):
552         keys = ("1", "2", "3")
553         values = (b"aaa", b"bbb", b"ccc")
554         with WriteOpCtx(self.ioctx) as write_op:
555             self.ioctx.set_omap(write_op, keys, values)
556             self.ioctx.operate_write_op(write_op, "hw")
557         with ReadOpCtx(self.ioctx) as read_op:
558             iter, ret = self.ioctx.get_omap_keys(read_op,"",2)
559             eq(ret, 0)
560             self.ioctx.operate_read_op(read_op, "hw")
561             eq(list(iter), [("1", None), ("2", None)])
562         with ReadOpCtx(self.ioctx) as read_op:
563             iter, ret = self.ioctx.get_omap_keys(read_op,"",2)
564             eq(ret, 0)
565             with assert_raises(ObjectNotFound):
566                 self.ioctx.operate_read_op(read_op, "no_such")
567
568     def test_clear_omap(self):
569         keys = ("1", "2", "3")
570         values = (b"aaa", b"bbb", b"ccc")
571         with WriteOpCtx(self.ioctx) as write_op:
572             self.ioctx.set_omap(write_op, keys, values)
573             self.ioctx.operate_write_op(write_op, "hw")
574         with WriteOpCtx(self.ioctx) as write_op_1:
575             self.ioctx.clear_omap(write_op_1)
576             self.ioctx.operate_write_op(write_op_1, "hw")
577         with ReadOpCtx(self.ioctx) as read_op:
578             iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("1",))
579             eq(ret, 0)
580             self.ioctx.operate_read_op(read_op, "hw")
581             eq(list(iter), [])
582
583     def test_locator(self):
584         self.ioctx.set_locator_key("bar")
585         self.ioctx.write('foo', b'contents1')
586         objects = [i for i in self.ioctx.list_objects()]
587         eq(len(objects), 1)
588         eq(self.ioctx.get_locator_key(), "bar")
589         self.ioctx.set_locator_key("")
590         objects[0].seek(0)
591         objects[0].write(b"contents2")
592         eq(self.ioctx.get_locator_key(), "")
593         self.ioctx.set_locator_key("bar")
594         contents = self.ioctx.read("foo")
595         eq(contents, b"contents2")
596         eq(self.ioctx.get_locator_key(), "bar")
597         objects[0].remove()
598         objects = [i for i in self.ioctx.list_objects()]
599         eq(objects, [])
600         self.ioctx.set_locator_key("")
601
602     def test_aio_write(self):
603         lock = threading.Condition()
604         count = [0]
605         def cb(blah):
606             with lock:
607                 count[0] += 1
608                 lock.notify()
609             return 0
610         comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
611         comp.wait_for_complete()
612         comp.wait_for_safe()
613         with lock:
614             while count[0] < 2:
615                 lock.wait()
616         eq(comp.get_return_value(), 0)
617         contents = self.ioctx.read("foo")
618         eq(contents, b"bar")
619         [i.remove() for i in self.ioctx.list_objects()]
620
621     def test_aio_write_no_comp_ref(self):
622         lock = threading.Condition()
623         count = [0]
624         def cb(blah):
625             with lock:
626                 count[0] += 1
627                 lock.notify()
628             return 0
629         # NOTE(sileht): force don't save the comp into local var
630         # to ensure all references are correctly tracked into the lib
631         self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
632         with lock:
633             while count[0] < 2:
634                 lock.wait()
635         contents = self.ioctx.read("foo")
636         eq(contents, b"bar")
637         [i.remove() for i in self.ioctx.list_objects()]
638
639     def test_aio_append(self):
640         lock = threading.Condition()
641         count = [0]
642         def cb(blah):
643             with lock:
644                 count[0] += 1
645                 lock.notify()
646             return 0
647         comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
648         comp2 = self.ioctx.aio_append("foo", b"baz", cb, cb)
649         comp.wait_for_complete()
650         contents = self.ioctx.read("foo")
651         eq(contents, b"barbaz")
652         with lock:
653             while count[0] < 4:
654                 lock.wait()
655         eq(comp.get_return_value(), 0)
656         eq(comp2.get_return_value(), 0)
657         [i.remove() for i in self.ioctx.list_objects()]
658
659     def test_aio_write_full(self):
660         lock = threading.Condition()
661         count = [0]
662         def cb(blah):
663             with lock:
664                 count[0] += 1
665                 lock.notify()
666             return 0
667         self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb)
668         comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb)
669         comp.wait_for_complete()
670         comp.wait_for_safe()
671         with lock:
672             while count[0] < 2:
673                 lock.wait()
674         eq(comp.get_return_value(), 0)
675         contents = self.ioctx.read("foo")
676         eq(contents, b"bar")
677         [i.remove() for i in self.ioctx.list_objects()]
678
679     def test_aio_stat(self):
680         lock = threading.Condition()
681         count = [0]
682         def cb(_, size, mtime):
683             with lock:
684                 count[0] += 1
685                 lock.notify()
686
687         comp = self.ioctx.aio_stat("foo", cb)
688         comp.wait_for_complete()
689         with lock:
690             while count[0] < 1:
691                 lock.wait()
692         eq(comp.get_return_value(), -2)
693
694         self.ioctx.write("foo", b"bar")
695
696         comp = self.ioctx.aio_stat("foo", cb)
697         comp.wait_for_complete()
698         with lock:
699             while count[0] < 2:
700                 lock.wait()
701         eq(comp.get_return_value(), 0)
702
703         [i.remove() for i in self.ioctx.list_objects()]
704
705     def _take_down_acting_set(self, pool, objectname):
706         # find acting_set for pool:objectname and take it down; used to
707         # verify that async reads don't complete while acting set is missing
708         cmd = {
709             "prefix":"osd map",
710             "pool":pool,
711             "object":objectname,
712             "format":"json",
713         }
714         r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), b'')
715         objmap = json.loads(jsonout.decode("utf-8"))
716         acting_set = objmap['acting']
717         cmd = {"prefix":"osd set", "key":"noup"}
718         r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
719         eq(r, 0)
720         cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]}
721         r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
722         eq(r, 0)
723
724         # wait for OSDs to acknowledge the down
725         eq(self.rados.wait_for_latest_osdmap(), 0)
726
727     def _let_osds_back_up(self):
728         cmd = {"prefix":"osd unset", "key":"noup"}
729         r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
730         eq(r, 0)
731
732     def test_aio_read(self):
733         # this is a list so that the local cb() can modify it
734         retval = [None]
735         lock = threading.Condition()
736         def cb(_, buf):
737             with lock:
738                 retval[0] = buf
739                 lock.notify()
740         payload = b"bar\000frob"
741         self.ioctx.write("foo", payload)
742
743         # test1: use wait_for_complete() and wait for cb by
744         # watching retval[0]
745         self._take_down_acting_set('test_pool', 'foo')
746         comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
747         eq(False, comp.is_complete())
748         time.sleep(3)
749         eq(False, comp.is_complete())
750         with lock:
751             eq(None, retval[0])
752         self._let_osds_back_up()
753         comp.wait_for_complete()
754         loops = 0
755         with lock:
756             while retval[0] is None and loops <= 10:
757                 lock.wait(timeout=5)
758                 loops += 1
759         assert(loops <= 10)
760
761         eq(retval[0], payload)
762         eq(sys.getrefcount(comp), 2)
763
764         # test2: use wait_for_complete_and_cb(), verify retval[0] is
765         # set by the time we regain control
766
767         retval[0] = None
768         self._take_down_acting_set('test_pool', 'foo')
769         comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
770         eq(False, comp.is_complete())
771         time.sleep(3)
772         eq(False, comp.is_complete())
773         with lock:
774             eq(None, retval[0])
775         self._let_osds_back_up()
776
777         comp.wait_for_complete_and_cb()
778         assert(retval[0] is not None)
779         eq(retval[0], payload)
780         eq(sys.getrefcount(comp), 2)
781
782         # test3: error case, use wait_for_complete_and_cb(), verify retval[0] is
783         # set by the time we regain control
784
785         retval[0] = 1
786         self._take_down_acting_set('test_pool', 'bar')
787         comp = self.ioctx.aio_read("bar", len(payload), 0, cb)
788         eq(False, comp.is_complete())
789         time.sleep(3)
790         eq(False, comp.is_complete())
791         with lock:
792             eq(1, retval[0])
793         self._let_osds_back_up()
794
795         comp.wait_for_complete_and_cb()
796         eq(None, retval[0])
797         assert(comp.get_return_value() < 0)
798         eq(sys.getrefcount(comp), 2)
799
800         [i.remove() for i in self.ioctx.list_objects()]
801
802     def test_lock(self):
803         self.ioctx.lock_exclusive("foo", "lock", "locker", "desc_lock",
804                                   10000, 0)
805         assert_raises(ObjectExists,
806                       self.ioctx.lock_exclusive,
807                       "foo", "lock", "locker", "desc_lock", 10000, 0)
808         self.ioctx.unlock("foo", "lock", "locker")
809         assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker")
810
811         self.ioctx.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
812                                10000, 0)
813         self.ioctx.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
814                                10000, 0)
815         assert_raises(ObjectBusy,
816                       self.ioctx.lock_exclusive,
817                       "foo", "lock", "locker3", "desc_lock", 10000, 0)
818         self.ioctx.unlock("foo", "lock", "locker1")
819         self.ioctx.unlock("foo", "lock", "locker2")
820         assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker1")
821         assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker2")
822
823     def test_execute(self):
824         self.ioctx.write("foo", b"") # ensure object exists
825
826         ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"")
827         eq(buf, b"Hello, world!")
828
829         ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose")
830         eq(buf, b"Hello, nose!")
831
832     def test_aio_execute(self):
833         count = [0]
834         retval = [None]
835         lock = threading.Condition()
836         def cb(_, buf):
837             with lock:
838                 if retval[0] is None:
839                     retval[0] = buf
840                 count[0] += 1
841                 lock.notify()
842         self.ioctx.write("foo", b"") # ensure object exists
843
844         comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb)
845         comp.wait_for_complete()
846         with lock:
847             while count[0] < 2:
848                 lock.wait()
849         eq(comp.get_return_value(), 13)
850         eq(retval[0], b"Hello, world!")
851
852         retval[0] = None
853         comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb)
854         comp.wait_for_complete()
855         with lock:
856             while count[0] < 4:
857                 lock.wait()
858         eq(comp.get_return_value(), 12)
859         eq(retval[0], b"Hello, nose!")
860
861         [i.remove() for i in self.ioctx.list_objects()]
862
863     def test_applications(self):
864         cmd = {"prefix":"osd dump", "format":"json"}
865         ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'')
866         eq(ret, 0)
867         assert len(buf) > 0
868         release = json.loads(buf.decode("utf-8")).get("require_osd_release",
869                                                       None)
870         if not release or release[0] < 'l':
871             raise SkipTest
872
873         eq([], self.ioctx.application_list())
874
875         self.ioctx.application_enable("app1")
876         assert_raises(Error, self.ioctx.application_enable, "app2")
877         self.ioctx.application_enable("app2", True)
878
879         assert_raises(Error, self.ioctx.application_metadata_list, "dne")
880         eq([], self.ioctx.application_metadata_list("app1"))
881
882         assert_raises(Error, self.ioctx.application_metadata_set, "dne", "key",
883                       "key")
884         self.ioctx.application_metadata_set("app1", "key1", "val1")
885         self.ioctx.application_metadata_set("app1", "key2", "val2")
886         self.ioctx.application_metadata_set("app2", "key1", "val1")
887
888         eq([("key1", "val1"), ("key2", "val2")],
889            self.ioctx.application_metadata_list("app1"))
890
891         self.ioctx.application_metadata_remove("app1", "key1")
892         eq([("key2", "val2")], self.ioctx.application_metadata_list("app1"))
893
894 class TestObject(object):
895
896     def setUp(self):
897         self.rados = Rados(conffile='')
898         self.rados.connect()
899         self.rados.create_pool('test_pool')
900         assert self.rados.pool_exists('test_pool')
901         self.ioctx = self.rados.open_ioctx('test_pool')
902         self.ioctx.write('foo', b'bar')
903         self.object = Object(self.ioctx, 'foo')
904
905     def tearDown(self):
906         self.ioctx.close()
907         self.ioctx = None
908         self.rados.delete_pool('test_pool')
909         self.rados.shutdown()
910         self.rados = None
911
912     def test_read(self):
913         eq(self.object.read(3), b'bar')
914         eq(self.object.read(100), b'')
915
916     def test_seek(self):
917         self.object.write(b'blah')
918         self.object.seek(0)
919         eq(self.object.read(4), b'blah')
920         self.object.seek(1)
921         eq(self.object.read(3), b'lah')
922
923     def test_write(self):
924         self.object.write(b'barbaz')
925         self.object.seek(0)
926         eq(self.object.read(3), b'bar')
927         eq(self.object.read(3), b'baz')
928
929 class TestCommand(object):
930
931     def setUp(self):
932         self.rados = Rados(conffile='')
933         self.rados.connect()
934
935     def tearDown(self):
936         self.rados.shutdown()
937
938     def test_monmap_dump(self):
939
940         # check for success and some plain output with epoch in it
941         cmd = {"prefix":"mon dump"}
942         ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
943         eq(ret, 0)
944         assert len(buf) > 0
945         assert(b'epoch' in buf)
946
947         # JSON, and grab current epoch
948         cmd['format'] = 'json'
949         ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
950         eq(ret, 0)
951         assert len(buf) > 0
952         d = json.loads(buf.decode("utf-8"))
953         assert('epoch' in d)
954         epoch = d['epoch']
955
956         # assume epoch + 1000 does not exist; test for ENOENT
957         cmd['epoch'] = epoch + 1000
958         ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
959         eq(ret, -errno.ENOENT)
960         eq(len(buf), 0)
961         del cmd['epoch']
962
963         # send to specific target by name
964         target = d['mons'][0]['name']
965         print(target)
966         ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30,
967                                                 target=target)
968         eq(ret, 0)
969         assert len(buf) > 0
970         d = json.loads(buf.decode("utf-8"))
971         assert('epoch' in d)
972
973         # and by rank
974         target = d['mons'][0]['rank']
975         print(target)
976         ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30,
977                                                 target=target)
978         eq(ret, 0)
979         assert len(buf) > 0
980         d = json.loads(buf.decode("utf-8"))
981         assert('epoch' in d)
982
983     def test_osd_bench(self):
984         cmd = dict(prefix='bench', size=4096, count=8192)
985         ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',
986                                                timeout=30)
987         eq(ret, 0)
988         assert len(err) > 0
989         out = json.loads(err)
990         eq(out['blocksize'], cmd['size'])
991         eq(out['bytes_written'], cmd['count'])
992
993     def test_ceph_osd_pool_create_utf8(self):
994         if _python2:
995             # Use encoded bytestring
996             poolname = b"\351\273\205"
997         else:
998             poolname = "\u9ec5"
999
1000         cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname}
1001         ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
1002         eq(ret, 0)
1003         assert len(out) > 0
1004         eq(u"pool '\u9ec5' created", out)