Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21
22 import sys
23 import threading
24 import time
25
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
30
31 # Are we running Python 2.x
32 if sys.version_info[0] < 3:
33     str_type = basestring
34 else:
35     str_type = str
36
37
38 cdef extern from "Python.h":
39     # These are in cpython/string.pxd, but use "object" types instead of
40     # PyObject*, which invokes assumptions in cpython that we need to
41     # legitimately break to implement zero-copy string buffers in Ioctx.read().
42     # This is valid use of the Python API and documented as a special case.
43     PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44     char* PyBytes_AsString(PyObject *string) except NULL
45     int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46     void PyEval_InitThreads()
47
48
49 cdef extern from "time.h":
50     ctypedef long int time_t
51     ctypedef long int suseconds_t
52
53
54 cdef extern from "sys/time.h":
55     cdef struct timeval:
56         time_t tv_sec
57         suseconds_t tv_usec
58
59
60 cdef extern from "rados/rados_types.h" nogil:
61     cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
62
63
64 cdef extern from "rados/librados.h" nogil:
65     enum:
66         _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67         _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68         _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69         _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70         _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71         _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72         _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
73
74
75     enum:
76         _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77         _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78         _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79         _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80         _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81         _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82         _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83         _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84         _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
85
86     cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
87
88     ctypedef void* rados_t
89     ctypedef void* rados_config_t
90     ctypedef void* rados_ioctx_t
91     ctypedef void* rados_xattrs_iter_t
92     ctypedef void* rados_omap_iter_t
93     ctypedef void* rados_list_ctx_t
94     ctypedef uint64_t rados_snap_t
95     ctypedef void *rados_write_op_t
96     ctypedef void *rados_read_op_t
97     ctypedef void *rados_completion_t
98     ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99     ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100                                           uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101     ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102                                           uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
103
104
105     cdef struct rados_cluster_stat_t:
106         uint64_t kb
107         uint64_t kb_used
108         uint64_t kb_avail
109         uint64_t num_objects
110
111     cdef struct rados_pool_stat_t:
112         uint64_t num_bytes
113         uint64_t num_kb
114         uint64_t num_objects
115         uint64_t num_object_clones
116         uint64_t num_object_copies
117         uint64_t num_objects_missing_on_primary
118         uint64_t num_objects_unfound
119         uint64_t num_objects_degraded
120         uint64_t num_rd
121         uint64_t num_rd_kb
122         uint64_t num_wr
123         uint64_t num_wr_kb
124
125     void rados_buffer_free(char *buf)
126
127     void rados_version(int *major, int *minor, int *extra)
128     int rados_create2(rados_t *pcluster, const char *const clustername,
129                       const char * const name, uint64_t flags)
130     int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131     int rados_connect(rados_t cluster)
132     void rados_shutdown(rados_t cluster)
133     int rados_conf_read_file(rados_t cluster, const char *path)
134     int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135     int rados_conf_parse_env(rados_t cluster, const char *var)
136     int rados_conf_set(rados_t cluster, char *option, const char *value)
137     int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
138
139     int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140     int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141     int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142     int rados_pool_create(rados_t cluster, const char *pool_name)
143     int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144     int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145     int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146     int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147     int rados_pool_list(rados_t cluster, char *buf, size_t len)
148     int rados_pool_delete(rados_t cluster, const char *pool_name)
149     int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
150
151     int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152     int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153     int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
154     int rados_application_enable(rados_ioctx_t io, const char *app_name,
155                                  int force)
156     int rados_application_list(rados_ioctx_t io, char *values,
157                              size_t *values_len)
158     int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
159                                        const char *key, char *value,
160                                        size_t *value_len)
161     int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
162                                        const char *key, const char *value)
163     int rados_application_metadata_remove(rados_ioctx_t io,
164                                           const char *app_name, const char *key)
165     int rados_application_metadata_list(rados_ioctx_t io,
166                                         const char *app_name, char *keys,
167                                         size_t *key_len, char *values,
168                                         size_t *value_len)
169     int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
170     int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
171                           const char *inbuf, size_t inbuflen,
172                           char **outbuf, size_t *outbuflen,
173                           char **outs, size_t *outslen)
174     int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
175                           const char *inbuf, size_t inbuflen,
176                           char **outbuf, size_t *outbuflen,
177                           char **outs, size_t *outslen)
178     int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
179                                  const char *inbuf, size_t inbuflen,
180                                  char **outbuf, size_t *outbuflen,
181                                  char **outs, size_t *outslen)
182     int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
183                           const char *inbuf, size_t inbuflen,
184                           char **outbuf, size_t *outbuflen,
185                           char **outs, size_t *outslen)
186     int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
187                          const char *inbuf, size_t inbuflen,
188                          char **outbuf, size_t *outbuflen,
189                          char **outs, size_t *outslen)
190     int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
191     int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
192
193     int rados_wait_for_latest_osdmap(rados_t cluster)
194
195     int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
196     void rados_ioctx_destroy(rados_ioctx_t io)
197     int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
198     void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
199     void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
200
201     uint64_t rados_get_last_version(rados_ioctx_t io)
202     int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
203     int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
204     int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
205     int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
206     int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
207     int rados_remove(rados_ioctx_t io, const char *oid)
208     int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
209     int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
210     int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
211     int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
212     int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
213     int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
214     void rados_getxattrs_end(rados_xattrs_iter_t iter)
215
216     int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
217     int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
218     void rados_nobjects_list_close(rados_list_ctx_t ctx)
219
220     int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
221     int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
222     int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
223     int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
224     int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
225     void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
226     int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
227     int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
228
229     int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
230                              const char * cookie, const char * desc,
231                              timeval * duration, uint8_t flags)
232     int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
233                           const char * cookie, const char * tag, const char * desc,
234                           timeval * duration, uint8_t flags)
235     int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
236
237     rados_write_op_t rados_create_write_op()
238     void rados_release_write_op(rados_write_op_t write_op)
239
240     rados_read_op_t rados_create_read_op()
241     void rados_release_read_op(rados_read_op_t read_op)
242
243     int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
244     void rados_aio_release(rados_completion_t c)
245     int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
246     int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
247     int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
248     int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
249     int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
250     int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
251     int rados_aio_flush(rados_ioctx_t io)
252
253     int rados_aio_get_return_value(rados_completion_t c)
254     int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
255     int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
256     int rados_aio_wait_for_complete(rados_completion_t c)
257     int rados_aio_wait_for_safe(rados_completion_t c)
258     int rados_aio_is_complete(rados_completion_t c)
259     int rados_aio_is_safe(rados_completion_t c)
260
261     int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
262                    const char * in_buf, size_t in_len, char * buf, size_t out_len)
263     int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
264                        const char * in_buf, size_t in_len, char * buf, size_t out_len)
265
266     int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
267     int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
268     void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
269     void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
270     void rados_write_op_omap_clear(rados_write_op_t write_op)
271     void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
272
273     void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
274     void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
275     void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
276     void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
277     void rados_write_op_remove(rados_write_op_t write_op)
278     void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
279     void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
280
281     void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
282     void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
283     void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
284     int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
285     int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
286     void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
287     int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
288     void rados_omap_get_end(rados_omap_iter_t iter)
289
290
291 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
292 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
293 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
294 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
295 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
296 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
297 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
298
299 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
300
301 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
302 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
303 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
304 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
305 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
306 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
307 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
308
309 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
310
311 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
312 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
313
314 ANONYMOUS_AUID = 0xffffffffffffffff
315 ADMIN_AUID = 0
316
317
318 class Error(Exception):
319     """ `Error` class, derived from `Exception` """
320     pass
321
322
323 class InvalidArgumentError(Error):
324     pass
325
326
327 class OSError(Error):
328     """ `OSError` class, derived from `Error` """
329     def __init__(self, message, errno=None):
330         super(OSError, self).__init__(message)
331         self.errno = errno
332
333     def __str__(self):
334         msg = super(OSError, self).__str__()
335         if self.errno is None:
336             return msg
337         return '[errno {0}] {1}'.format(self.errno, msg)
338
339     def __reduce__(self):
340         return (self.__class__, (self.message, self.errno))
341
342 class InterruptedOrTimeoutError(OSError):
343     """ `InterruptedOrTimeoutError` class, derived from `OSError` """
344     pass
345
346
347 class PermissionError(OSError):
348     """ `PermissionError` class, derived from `OSError` """
349     pass
350
351
352 class PermissionDeniedError(OSError):
353     """ deal with EACCES related. """
354     pass
355
356
357 class ObjectNotFound(OSError):
358     """ `ObjectNotFound` class, derived from `OSError` """
359     pass
360
361
362 class NoData(OSError):
363     """ `NoData` class, derived from `OSError` """
364     pass
365
366
367 class ObjectExists(OSError):
368     """ `ObjectExists` class, derived from `OSError` """
369     pass
370
371
372 class ObjectBusy(OSError):
373     """ `ObjectBusy` class, derived from `IOError` """
374     pass
375
376
377 class IOError(OSError):
378     """ `ObjectBusy` class, derived from `OSError` """
379     pass
380
381
382 class NoSpace(OSError):
383     """ `NoSpace` class, derived from `OSError` """
384     pass
385
386
387 class RadosStateError(Error):
388     """ `RadosStateError` class, derived from `Error` """
389     pass
390
391
392 class IoctxStateError(Error):
393     """ `IoctxStateError` class, derived from `Error` """
394     pass
395
396
397 class ObjectStateError(Error):
398     """ `ObjectStateError` class, derived from `Error` """
399     pass
400
401
402 class LogicError(Error):
403     """ `` class, derived from `Error` """
404     pass
405
406
407 class TimedOut(OSError):
408     """ `TimedOut` class, derived from `OSError` """
409     pass
410
411
412 IF UNAME_SYSNAME == "FreeBSD":
413     cdef errno_to_exception = {
414         errno.EPERM     : PermissionError,
415         errno.ENOENT    : ObjectNotFound,
416         errno.EIO       : IOError,
417         errno.ENOSPC    : NoSpace,
418         errno.EEXIST    : ObjectExists,
419         errno.EBUSY     : ObjectBusy,
420         errno.ENOATTR   : NoData,
421         errno.EINTR     : InterruptedOrTimeoutError,
422         errno.ETIMEDOUT : TimedOut,
423         errno.EACCES    : PermissionDeniedError,
424         errno.EINVAL    : InvalidArgumentError,
425     }
426 ELSE:
427     cdef errno_to_exception = {
428         errno.EPERM     : PermissionError,
429         errno.ENOENT    : ObjectNotFound,
430         errno.EIO       : IOError,
431         errno.ENOSPC    : NoSpace,
432         errno.EEXIST    : ObjectExists,
433         errno.EBUSY     : ObjectBusy,
434         errno.ENODATA   : NoData,
435         errno.EINTR     : InterruptedOrTimeoutError,
436         errno.ETIMEDOUT : TimedOut,
437         errno.EACCES    : PermissionDeniedError,
438         errno.EINVAL    : InvalidArgumentError,
439     }
440
441
442 cdef make_ex(ret, msg):
443     """
444     Translate a librados return code into an exception.
445
446     :param ret: the return code
447     :type ret: int
448     :param msg: the error message to use
449     :type msg: str
450     :returns: a subclass of :class:`Error`
451     """
452     ret = abs(ret)
453     if ret in errno_to_exception:
454         return errno_to_exception[ret](msg, errno=ret)
455     else:
456         return OSError(msg, errno=ret)
457
458
459 # helper to specify an optional argument, where in addition to `cls`, `None`
460 # is also acceptable
461 def opt(cls):
462     return (cls, None)
463
464
465 # validate argument types of an instance method
466 # kwargs is an un-ordered dict, so use args instead
467 def requires(*types):
468     def is_type_of(v, t):
469         if t is None:
470             return v is None
471         else:
472             return isinstance(v, t)
473
474     def check_type(val, arg_name, arg_type):
475         if isinstance(arg_type, tuple):
476             if any(is_type_of(val, t) for t in arg_type):
477                 return
478             type_names = ' or '.join('None' if t is None else t.__name__
479                                      for t in arg_type)
480             raise TypeError('%s must be %s' % (arg_name, type_names))
481         else:
482             if is_type_of(val, arg_type):
483                 return
484             assert(arg_type is not None)
485             raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
486
487     def wrapper(f):
488         # FIXME(sileht): this stop with
489         # AttributeError: 'method_descriptor' object has no attribute '__module__'
490         # @wraps(f)
491         def validate_func(*args, **kwargs):
492             # ignore the `self` arg
493             pos_args = zip(args[1:], types)
494             named_args = ((kwargs[name], (name, spec)) for name, spec in types
495                           if name in kwargs)
496             for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
497                 check_type(arg_val, arg_name, arg_type)
498             return f(*args, **kwargs)
499         return validate_func
500     return wrapper
501
502
503 def cstr(val, name, encoding="utf-8", opt=False):
504     """
505     Create a byte string from a Python string
506
507     :param basestring val: Python string
508     :param str name: Name of the string parameter, for exceptions
509     :param str encoding: Encoding to use
510     :param bool opt: If True, None is allowed
511     :rtype: bytes
512     :raises: :class:`InvalidArgument`
513     """
514     if opt and val is None:
515         return None
516     if isinstance(val, bytes):
517         return val
518     elif isinstance(val, unicode):
519         return val.encode(encoding)
520     else:
521         raise TypeError('%s must be a string' % name)
522
523
524 def cstr_list(list_str, name, encoding="utf-8"):
525     return [cstr(s, name) for s in list_str]
526
527
528 def decode_cstr(val, encoding="utf-8"):
529     """
530     Decode a byte string into a Python string.
531
532     :param bytes val: byte string
533     :rtype: unicode or None
534     """
535     if val is None:
536         return None
537
538     return val.decode(encoding)
539
540
541 cdef char* opt_str(s) except? NULL:
542     if s is None:
543         return NULL
544     return s
545
546
547 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
548     cdef void *ret = realloc(ptr, size)
549     if ret == NULL:
550         raise MemoryError("realloc failed")
551     return ret
552
553
554 cdef size_t * to_csize_t_array(list_int):
555     cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
556     if ret == NULL:
557         raise MemoryError("malloc failed")
558     for i in xrange(len(list_int)):
559         ret[i] = <size_t>list_int[i]
560     return ret
561
562
563 cdef char ** to_bytes_array(list_bytes):
564     cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
565     if ret == NULL:
566         raise MemoryError("malloc failed")
567     for i in xrange(len(list_bytes)):
568         ret[i] = <char *>list_bytes[i]
569     return ret
570
571
572
573 cdef int __monitor_callback(void *arg, const char *line, const char *who,
574                              uint64_t sec, uint64_t nsec, uint64_t seq,
575                              const char *level, const char *msg) with gil:
576     cdef object cb_info = <object>arg
577     cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
578     return 0
579
580 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
581                              const char *who,
582                              const char *name,
583                              uint64_t sec, uint64_t nsec, uint64_t seq,
584                              const char *level, const char *msg) with gil:
585     cdef object cb_info = <object>arg
586     cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
587     return 0
588
589
590 class Version(object):
591     """ Version information """
592     def __init__(self, major, minor, extra):
593         self.major = major
594         self.minor = minor
595         self.extra = extra
596
597     def __str__(self):
598         return "%d.%d.%d" % (self.major, self.minor, self.extra)
599
600
601 cdef class Rados(object):
602     """This class wraps librados functions"""
603     # NOTE(sileht): attributes declared in .pyd
604
605     def __init__(self, *args, **kwargs):
606         PyEval_InitThreads()
607         self.__setup(*args, **kwargs)
608
609     @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
610               ('conffile', opt(str_type)))
611     def __setup(self, rados_id=None, name=None, clustername=None,
612                 conf_defaults=None, conffile=None, conf=None, flags=0,
613                 context=None):
614         self.monitor_callback = None
615         self.monitor_callback2 = None
616         self.parsed_args = []
617         self.conf_defaults = conf_defaults
618         self.conffile = conffile
619         self.rados_id = rados_id
620
621         if rados_id and name:
622             raise Error("Rados(): can't supply both rados_id and name")
623         elif rados_id:
624             name = 'client.' + rados_id
625         elif name is None:
626             name = 'client.admin'
627         if clustername is None:
628             clustername = ''
629
630         name = cstr(name, 'name')
631         clustername = cstr(clustername, 'clustername')
632         cdef:
633             char *_name = name
634             char *_clustername = clustername
635             int _flags = flags
636             int ret
637
638         if context:
639             # Unpack void* (aka rados_config_t) from capsule
640             rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
641             with nogil:
642                 ret = rados_create_with_context(&self.cluster, rados_config)
643         else:
644             with nogil:
645                 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
646         if ret != 0:
647             raise Error("rados_initialize failed with error code: %d" % ret)
648
649         self.state = "configuring"
650         # order is important: conf_defaults, then conffile, then conf
651         if conf_defaults:
652             for key, value in conf_defaults.items():
653                 self.conf_set(key, value)
654         if conffile is not None:
655             # read the default conf file when '' is given
656             if conffile == '':
657                 conffile = None
658             self.conf_read_file(conffile)
659         if conf:
660             for key, value in conf.items():
661                 self.conf_set(key, value)
662
663     def require_state(self, *args):
664         """
665         Checks if the Rados object is in a special state
666
667         :raises: RadosStateError
668         """
669         if self.state in args:
670             return
671         raise RadosStateError("You cannot perform that operation on a \
672 Rados object in state %s." % self.state)
673
674     def shutdown(self):
675         """
676         Disconnects from the cluster.  Call this explicitly when a
677         Rados.connect()ed object is no longer used.
678         """
679         if self.state != "shutdown":
680             with nogil:
681                 rados_shutdown(self.cluster)
682             self.state = "shutdown"
683
684     def __enter__(self):
685         self.connect()
686         return self
687
688     def __exit__(self, type_, value, traceback):
689         self.shutdown()
690         return False
691
692     def version(self):
693         """
694         Get the version number of the ``librados`` C library.
695
696         :returns: a tuple of ``(major, minor, extra)`` components of the
697                   librados version
698         """
699         cdef int major = 0
700         cdef int minor = 0
701         cdef int extra = 0
702         with nogil:
703             rados_version(&major, &minor, &extra)
704         return Version(major, minor, extra)
705
706     @requires(('path', opt(str_type)))
707     def conf_read_file(self, path=None):
708         """
709         Configure the cluster handle using a Ceph config file.
710
711         :param path: path to the config file
712         :type path: str
713         """
714         self.require_state("configuring", "connected")
715         path = cstr(path, 'path', opt=True)
716         cdef:
717             char *_path = opt_str(path)
718         with nogil:
719             ret = rados_conf_read_file(self.cluster, _path)
720         if ret != 0:
721             raise make_ex(ret, "error calling conf_read_file")
722
723     def conf_parse_argv(self, args):
724         """
725         Parse known arguments from args, and remove; returned
726         args contain only those unknown to ceph
727         """
728         self.require_state("configuring", "connected")
729         if not args:
730             return
731
732         cargs = cstr_list(args, 'args')
733         cdef:
734             int _argc = len(args)
735             char **_argv = to_bytes_array(cargs)
736             char **_remargv = NULL
737
738         try:
739             _remargv = <char **>malloc(_argc * sizeof(char *))
740             with nogil:
741                 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
742                                                       <const char**>_argv,
743                                                       <const char**>_remargv)
744             if ret:
745                 raise make_ex(ret, "error calling conf_parse_argv_remainder")
746
747             # _remargv was allocated with fixed argc; collapse return
748             # list to eliminate any missing args
749             retargs = [decode_cstr(a) for a in _remargv[:_argc]
750                        if a != NULL]
751             self.parsed_args = args
752             return retargs
753         finally:
754             free(_argv)
755             free(_remargv)
756
757     def conf_parse_env(self, var='CEPH_ARGS'):
758         """
759         Parse known arguments from an environment variable, normally
760         CEPH_ARGS.
761         """
762         self.require_state("configuring", "connected")
763         if not var:
764             return
765
766         var = cstr(var, 'var')
767         cdef:
768             char *_var = var
769         with nogil:
770             ret = rados_conf_parse_env(self.cluster, _var)
771         if ret != 0:
772             raise make_ex(ret, "error calling conf_parse_env")
773
774     @requires(('option', str_type))
775     def conf_get(self, option):
776         """
777         Get the value of a configuration option
778
779         :param option: which option to read
780         :type option: str
781
782         :returns: str - value of the option or None
783         :raises: :class:`TypeError`
784         """
785         self.require_state("configuring", "connected")
786         option = cstr(option, 'option')
787         cdef:
788             char *_option = option
789             size_t length = 20
790             char *ret_buf = NULL
791
792         try:
793             while True:
794                 ret_buf = <char *>realloc_chk(ret_buf, length)
795                 with nogil:
796                     ret = rados_conf_get(self.cluster, _option, ret_buf, length)
797                 if ret == 0:
798                     return decode_cstr(ret_buf)
799                 elif ret == -errno.ENAMETOOLONG:
800                     length = length * 2
801                 elif ret == -errno.ENOENT:
802                     return None
803                 else:
804                     raise make_ex(ret, "error calling conf_get")
805         finally:
806             free(ret_buf)
807
808     @requires(('option', str_type), ('val', str_type))
809     def conf_set(self, option, val):
810         """
811         Set the value of a configuration option
812
813         :param option: which option to set
814         :type option: str
815         :param option: value of the option
816         :type option: str
817
818         :raises: :class:`TypeError`, :class:`ObjectNotFound`
819         """
820         self.require_state("configuring", "connected")
821         option = cstr(option, 'option')
822         val = cstr(val, 'val')
823         cdef:
824             char *_option = option
825             char *_val = val
826
827         with nogil:
828             ret = rados_conf_set(self.cluster, _option, _val)
829         if ret != 0:
830             raise make_ex(ret, "error calling conf_set")
831
832     def ping_monitor(self, mon_id):
833         """
834         Ping a monitor to assess liveness
835
836         May be used as a simply way to assess liveness, or to obtain
837         information about the monitor in a simple way even in the
838         absence of quorum.
839
840         :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
841         :type mon_id: str
842         :returns: the string reply from the monitor
843         """
844
845         self.require_state("configuring", "connected")
846
847         mon_id = cstr(mon_id, 'mon_id')
848         cdef:
849             char *_mon_id = mon_id
850             size_t outstrlen = 0
851             char *outstr
852
853         with nogil:
854             ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
855
856         if ret != 0:
857             raise make_ex(ret, "error calling ping_monitor")
858
859         if outstrlen:
860             my_outstr = outstr[:outstrlen]
861             rados_buffer_free(outstr)
862             return decode_cstr(my_outstr)
863
864     def connect(self, timeout=0):
865         """
866         Connect to the cluster.  Use shutdown() to release resources.
867         """
868         self.require_state("configuring")
869         # NOTE(sileht): timeout was supported by old python API,
870         # but this is not something available in C API, so ignore
871         # for now and remove it later
872         with nogil:
873             ret = rados_connect(self.cluster)
874         if ret != 0:
875             raise make_ex(ret, "error connecting to the cluster")
876         self.state = "connected"
877
878     def get_cluster_stats(self):
879         """
880         Read usage info about the cluster
881
882         This tells you total space, space used, space available, and number
883         of objects. These are not updated immediately when data is written,
884         they are eventually consistent.
885
886         :returns: dict - contains the following keys:
887
888             - ``kb`` (int) - total space
889
890             - ``kb_used`` (int) - space used
891
892             - ``kb_avail`` (int) - free space available
893
894             - ``num_objects`` (int) - number of objects
895
896         """
897         cdef:
898             rados_cluster_stat_t stats
899
900         with nogil:
901             ret = rados_cluster_stat(self.cluster, &stats)
902
903         if ret < 0:
904             raise make_ex(
905                 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
906         return {'kb': stats.kb,
907                 'kb_used': stats.kb_used,
908                 'kb_avail': stats.kb_avail,
909                 'num_objects': stats.num_objects}
910
911     @requires(('pool_name', str_type))
912     def pool_exists(self, pool_name):
913         """
914         Checks if a given pool exists.
915
916         :param pool_name: name of the pool to check
917         :type pool_name: str
918
919         :raises: :class:`TypeError`, :class:`Error`
920         :returns: bool - whether the pool exists, false otherwise.
921         """
922         self.require_state("connected")
923
924         pool_name = cstr(pool_name, 'pool_name')
925         cdef:
926             char *_pool_name = pool_name
927
928         with nogil:
929             ret = rados_pool_lookup(self.cluster, _pool_name)
930         if ret >= 0:
931             return True
932         elif ret == -errno.ENOENT:
933             return False
934         else:
935             raise make_ex(ret, "error looking up pool '%s'" % pool_name)
936
937     @requires(('pool_name', str_type))
938     def pool_lookup(self, pool_name):
939         """
940         Returns a pool's ID based on its name.
941
942         :param pool_name: name of the pool to look up
943         :type pool_name: str
944
945         :raises: :class:`TypeError`, :class:`Error`
946         :returns: int - pool ID, or None if it doesn't exist
947         """
948         self.require_state("connected")
949         pool_name = cstr(pool_name, 'pool_name')
950         cdef:
951             char *_pool_name = pool_name
952
953         with nogil:
954             ret = rados_pool_lookup(self.cluster, _pool_name)
955         if ret >= 0:
956             return int(ret)
957         elif ret == -errno.ENOENT:
958             return None
959         else:
960             raise make_ex(ret, "error looking up pool '%s'" % pool_name)
961
962     @requires(('pool_id', int))
963     def pool_reverse_lookup(self, pool_id):
964         """
965         Returns a pool's name based on its ID.
966
967         :param pool_id: ID of the pool to look up
968         :type pool_id: int
969
970         :raises: :class:`TypeError`, :class:`Error`
971         :returns: string - pool name, or None if it doesn't exist
972         """
973         self.require_state("connected")
974         cdef:
975             int64_t _pool_id = pool_id
976             size_t size = 512
977             char *name = NULL
978
979         try:
980             while True:
981                 name = <char *>realloc_chk(name, size)
982                 with nogil:
983                     ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
984                 if ret >= 0:
985                     break
986                 elif ret != -errno.ERANGE and size <= 4096:
987                     size *= 2
988                 elif ret == -errno.ENOENT:
989                     return None
990                 elif ret < 0:
991                     raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
992
993             return decode_cstr(name)
994
995         finally:
996             free(name)
997
998     @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
999     def create_pool(self, pool_name, auid=None, crush_rule=None):
1000         """
1001         Create a pool:
1002         - with default settings: if auid=None and crush_rule=None
1003         - owned by a specific auid: auid given and crush_rule=None
1004         - with a specific CRUSH rule: if auid=None and crush_rule given
1005         - with a specific CRUSH rule and auid: if auid and crush_rule given
1006
1007         :param pool_name: name of the pool to create
1008         :type pool_name: str
1009         :param auid: the id of the owner of the new pool
1010         :type auid: int
1011         :param crush_rule: rule to use for placement in the new pool
1012         :type crush_rule: int
1013
1014         :raises: :class:`TypeError`, :class:`Error`
1015         """
1016         self.require_state("connected")
1017
1018         pool_name = cstr(pool_name, 'pool_name')
1019         cdef:
1020             char *_pool_name = pool_name
1021             uint8_t _crush_rule
1022             uint64_t _auid
1023
1024         if auid is None and crush_rule is None:
1025             with nogil:
1026                 ret = rados_pool_create(self.cluster, _pool_name)
1027         elif auid is None:
1028             _crush_rule = crush_rule
1029             with nogil:
1030                 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1031         elif crush_rule is None:
1032             _auid = auid
1033             with nogil:
1034                 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
1035         else:
1036             _auid = auid
1037             _crush_rule = crush_rule
1038             with nogil:
1039                 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
1040         if ret < 0:
1041             raise make_ex(ret, "error creating pool '%s'" % pool_name)
1042
1043     @requires(('pool_id', int))
1044     def get_pool_base_tier(self, pool_id):
1045         """
1046         Get base pool
1047
1048         :returns: base pool, or pool_id if tiering is not configured for the pool
1049         """
1050         self.require_state("connected")
1051         cdef:
1052             int64_t base_tier = 0
1053             int64_t _pool_id = pool_id
1054
1055         with nogil:
1056             ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1057         if ret < 0:
1058             raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1059         return int(base_tier)
1060
1061     @requires(('pool_name', str_type))
1062     def delete_pool(self, pool_name):
1063         """
1064         Delete a pool and all data inside it.
1065
1066         The pool is removed from the cluster immediately,
1067         but the actual data is deleted in the background.
1068
1069         :param pool_name: name of the pool to delete
1070         :type pool_name: str
1071
1072         :raises: :class:`TypeError`, :class:`Error`
1073         """
1074         self.require_state("connected")
1075
1076         pool_name = cstr(pool_name, 'pool_name')
1077         cdef:
1078             char *_pool_name = pool_name
1079
1080         with nogil:
1081             ret = rados_pool_delete(self.cluster, _pool_name)
1082         if ret < 0:
1083             raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1084
1085     @requires(('pool_id', int))
1086     def get_inconsistent_pgs(self, pool_id):
1087         """
1088         List inconsistent placement groups in the given pool
1089
1090         :param pool_id: ID of the pool in which PGs are listed
1091         :type pool_id: int
1092         :returns: list - inconsistent placement groups
1093         """
1094         self.require_state("connected")
1095         cdef:
1096             int64_t pool = pool_id
1097             size_t size = 512
1098             char *pgs = NULL
1099
1100         try:
1101             while True:
1102                 pgs = <char *>realloc_chk(pgs, size);
1103                 with nogil:
1104                     ret = rados_inconsistent_pg_list(self.cluster, pool,
1105                                                      pgs, size)
1106                 if ret > <int>size:
1107                     size *= 2
1108                 elif ret >= 0:
1109                     break
1110                 else:
1111                     raise make_ex(ret, "error calling inconsistent_pg_list")
1112             return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1113         finally:
1114             free(pgs)
1115
1116     def list_pools(self):
1117         """
1118         Gets a list of pool names.
1119
1120         :returns: list - of pool names.
1121         """
1122         self.require_state("connected")
1123         cdef:
1124             size_t size = 512
1125             char *c_names = NULL
1126
1127         try:
1128             while True:
1129                 c_names = <char *>realloc_chk(c_names, size)
1130                 with nogil:
1131                     ret = rados_pool_list(self.cluster, c_names, size)
1132                 if ret > <int>size:
1133                     size *= 2
1134                 elif ret >= 0:
1135                     break
1136             return [name for name in decode_cstr(c_names[:ret]).split('\0')
1137                     if name]
1138         finally:
1139             free(c_names)
1140
1141     def get_fsid(self):
1142         """
1143         Get the fsid of the cluster as a hexadecimal string.
1144
1145         :raises: :class:`Error`
1146         :returns: str - cluster fsid
1147         """
1148         self.require_state("connected")
1149         cdef:
1150             char *ret_buf
1151             size_t buf_len = 37
1152             PyObject* ret_s = NULL
1153
1154         ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
1155         try:
1156             ret_buf = PyBytes_AsString(ret_s)
1157             with nogil:
1158                 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1159             if ret < 0:
1160                 raise make_ex(ret, "error getting cluster fsid")
1161             if ret != <int>buf_len:
1162                 _PyBytes_Resize(&ret_s, ret)
1163             return <object>ret_s
1164         finally:
1165             # We DECREF unconditionally: the cast to object above will have
1166             # INCREFed if necessary. This also takes care of exceptions,
1167             # including if _PyString_Resize fails (that will free the string
1168             # itself and set ret_s to NULL, hence XDECREF).
1169             ref.Py_XDECREF(ret_s)
1170
1171     @requires(('ioctx_name', str_type))
1172     def open_ioctx(self, ioctx_name):
1173         """
1174         Create an io context
1175
1176         The io context allows you to perform operations within a particular
1177         pool.
1178
1179         :param ioctx_name: name of the pool
1180         :type ioctx_name: str
1181
1182         :raises: :class:`TypeError`, :class:`Error`
1183         :returns: Ioctx - Rados Ioctx object
1184         """
1185         self.require_state("connected")
1186         ioctx_name = cstr(ioctx_name, 'ioctx_name')
1187         cdef:
1188             rados_ioctx_t ioctx
1189             char *_ioctx_name = ioctx_name
1190         with nogil:
1191             ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1192         if ret < 0:
1193             raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1194         io = Ioctx(ioctx_name)
1195         io.io = ioctx
1196         return io
1197
1198     def mon_command(self, cmd, inbuf, timeout=0, target=None):
1199         """
1200         mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1201         returns (int ret, string outbuf, string outs)
1202         """
1203         # NOTE(sileht): timeout is ignored because C API doesn't provide
1204         # timeout argument, but we keep it for backward compat with old python binding
1205
1206         self.require_state("connected")
1207         cmd = cstr_list(cmd, 'c')
1208
1209         if isinstance(target, int):
1210         # NOTE(sileht): looks weird but test_monmap_dump pass int
1211             target = str(target)
1212
1213         target = cstr(target, 'target', opt=True)
1214         inbuf = cstr(inbuf, 'inbuf')
1215
1216         cdef:
1217             char *_target = opt_str(target)
1218             char **_cmd = to_bytes_array(cmd)
1219             size_t _cmdlen = len(cmd)
1220
1221             char *_inbuf = inbuf
1222             size_t _inbuf_len = len(inbuf)
1223
1224             char *_outbuf
1225             size_t _outbuf_len
1226             char *_outs
1227             size_t _outs_len
1228
1229         try:
1230             if target:
1231                 with nogil:
1232                     ret = rados_mon_command_target(self.cluster, _target,
1233                                                 <const char **>_cmd, _cmdlen,
1234                                                 <const char*>_inbuf, _inbuf_len,
1235                                                 &_outbuf, &_outbuf_len,
1236                                                 &_outs, &_outs_len)
1237             else:
1238                 with nogil:
1239                     ret = rados_mon_command(self.cluster,
1240                                             <const char **>_cmd, _cmdlen,
1241                                             <const char*>_inbuf, _inbuf_len,
1242                                             &_outbuf, &_outbuf_len,
1243                                             &_outs, &_outs_len)
1244
1245             my_outs = decode_cstr(_outs[:_outs_len])
1246             my_outbuf = _outbuf[:_outbuf_len]
1247             if _outs_len:
1248                 rados_buffer_free(_outs)
1249             if _outbuf_len:
1250                 rados_buffer_free(_outbuf)
1251             return (ret, my_outbuf, my_outs)
1252         finally:
1253             free(_cmd)
1254
1255     def osd_command(self, osdid, cmd, inbuf, timeout=0):
1256         """
1257         osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1258         returns (int ret, string outbuf, string outs)
1259         """
1260         # NOTE(sileht): timeout is ignored because C API doesn't provide
1261         # timeout argument, but we keep it for backward compat with old python binding
1262         self.require_state("connected")
1263
1264         cmd = cstr_list(cmd, 'cmd')
1265         inbuf = cstr(inbuf, 'inbuf')
1266
1267         cdef:
1268             int _osdid = osdid
1269             char **_cmd = to_bytes_array(cmd)
1270             size_t _cmdlen = len(cmd)
1271
1272             char *_inbuf = inbuf
1273             size_t _inbuf_len = len(inbuf)
1274
1275             char *_outbuf
1276             size_t _outbuf_len
1277             char *_outs
1278             size_t _outs_len
1279
1280         try:
1281             with nogil:
1282                 ret = rados_osd_command(self.cluster, _osdid,
1283                                         <const char **>_cmd, _cmdlen,
1284                                         <const char*>_inbuf, _inbuf_len,
1285                                         &_outbuf, &_outbuf_len,
1286                                         &_outs, &_outs_len)
1287
1288             my_outs = decode_cstr(_outs[:_outs_len])
1289             my_outbuf = _outbuf[:_outbuf_len]
1290             if _outs_len:
1291                 rados_buffer_free(_outs)
1292             if _outbuf_len:
1293                 rados_buffer_free(_outbuf)
1294             return (ret, my_outbuf, my_outs)
1295         finally:
1296             free(_cmd)
1297
1298     def mgr_command(self, cmd, inbuf, timeout=0):
1299         """
1300         returns (int ret, string outbuf, string outs)
1301         """
1302         # NOTE(sileht): timeout is ignored because C API doesn't provide
1303         # timeout argument, but we keep it for backward compat with old python binding
1304         self.require_state("connected")
1305
1306         cmd = cstr_list(cmd, 'cmd')
1307         inbuf = cstr(inbuf, 'inbuf')
1308
1309         cdef:
1310             char **_cmd = to_bytes_array(cmd)
1311             size_t _cmdlen = len(cmd)
1312
1313             char *_inbuf = inbuf
1314             size_t _inbuf_len = len(inbuf)
1315
1316             char *_outbuf
1317             size_t _outbuf_len
1318             char *_outs
1319             size_t _outs_len
1320
1321         try:
1322             with nogil:
1323                 ret = rados_mgr_command(self.cluster,
1324                                         <const char **>_cmd, _cmdlen,
1325                                         <const char*>_inbuf, _inbuf_len,
1326                                         &_outbuf, &_outbuf_len,
1327                                         &_outs, &_outs_len)
1328
1329             my_outs = decode_cstr(_outs[:_outs_len])
1330             my_outbuf = _outbuf[:_outbuf_len]
1331             if _outs_len:
1332                 rados_buffer_free(_outs)
1333             if _outbuf_len:
1334                 rados_buffer_free(_outbuf)
1335             return (ret, my_outbuf, my_outs)
1336         finally:
1337             free(_cmd)
1338
1339     def pg_command(self, pgid, cmd, inbuf, timeout=0):
1340         """
1341         pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1342         returns (int ret, string outbuf, string outs)
1343         """
1344         # NOTE(sileht): timeout is ignored because C API doesn't provide
1345         # timeout argument, but we keep it for backward compat with old python binding
1346         self.require_state("connected")
1347
1348         pgid = cstr(pgid, 'pgid')
1349         cmd = cstr_list(cmd, 'cmd')
1350         inbuf = cstr(inbuf, 'inbuf')
1351
1352         cdef:
1353             char *_pgid = pgid
1354             char **_cmd = to_bytes_array(cmd)
1355             size_t _cmdlen = len(cmd)
1356
1357             char *_inbuf = inbuf
1358             size_t _inbuf_len = len(inbuf)
1359
1360             char *_outbuf
1361             size_t _outbuf_len
1362             char *_outs
1363             size_t _outs_len
1364
1365         try:
1366             with nogil:
1367                 ret = rados_pg_command(self.cluster, _pgid,
1368                                        <const char **>_cmd, _cmdlen,
1369                                        <const char *>_inbuf, _inbuf_len,
1370                                        &_outbuf, &_outbuf_len,
1371                                        &_outs, &_outs_len)
1372
1373             my_outs = decode_cstr(_outs[:_outs_len])
1374             my_outbuf = _outbuf[:_outbuf_len]
1375             if _outs_len:
1376                 rados_buffer_free(_outs)
1377             if _outbuf_len:
1378                 rados_buffer_free(_outbuf)
1379             return (ret, my_outbuf, my_outs)
1380         finally:
1381             free(_cmd)
1382
1383     def wait_for_latest_osdmap(self):
1384         self.require_state("connected")
1385         with nogil:
1386             ret = rados_wait_for_latest_osdmap(self.cluster)
1387         return ret
1388
1389     def blacklist_add(self, client_address, expire_seconds=0):
1390         """
1391         Blacklist a client from the OSDs
1392
1393         :param client_address: client address
1394         :type client_address: str
1395         :param expire_seconds: number of seconds to blacklist
1396         :type expire_seconds: int
1397
1398         :raises: :class:`Error`
1399         """
1400         self.require_state("connected")
1401         client_address =  cstr(client_address, 'client_address')
1402         cdef:
1403             uint32_t _expire_seconds = expire_seconds
1404             char *_client_address = client_address
1405
1406         with nogil:
1407             ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1408         if ret < 0:
1409             raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1410
1411     def monitor_log(self, level, callback, arg):
1412         if level not in MONITOR_LEVELS:
1413             raise LogicError("invalid monitor level " + level)
1414         if callback is not None and not callable(callback):
1415             raise LogicError("callback must be a callable function or None")
1416
1417         level = cstr(level, 'level')
1418         cdef char *_level = level
1419
1420         if callback is None:
1421             with nogil:
1422                 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1423             self.monitor_callback = None
1424             self.monitor_callback2 = None
1425             return
1426
1427         cb = (callback, arg)
1428         cdef PyObject* _arg = <PyObject*>cb
1429         with nogil:
1430             r = rados_monitor_log(self.cluster, <const char*>_level,
1431                                   <rados_log_callback_t>&__monitor_callback, _arg)
1432
1433         if r:
1434             raise make_ex(r, 'error calling rados_monitor_log')
1435         # NOTE(sileht): Prevents the callback method from being garbage collected
1436         self.monitor_callback = cb
1437         self.monitor_callback2 = None
1438
1439     def monitor_log2(self, level, callback, arg):
1440         if level not in MONITOR_LEVELS:
1441             raise LogicError("invalid monitor level " + level)
1442         if callback is not None and not callable(callback):
1443             raise LogicError("callback must be a callable function or None")
1444
1445         level = cstr(level, 'level')
1446         cdef char *_level = level
1447
1448         if callback is None:
1449             with nogil:
1450                 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1451             self.monitor_callback = None
1452             self.monitor_callback2 = None
1453             return
1454
1455         cb = (callback, arg)
1456         cdef PyObject* _arg = <PyObject*>cb
1457         with nogil:
1458             r = rados_monitor_log2(self.cluster, <const char*>_level,
1459                                   <rados_log_callback2_t>&__monitor_callback2, _arg)
1460
1461         if r:
1462             raise make_ex(r, 'error calling rados_monitor_log')
1463         # NOTE(sileht): Prevents the callback method from being garbage collected
1464         self.monitor_callback = None
1465         self.monitor_callback2 = cb
1466
1467
1468 cdef class OmapIterator(object):
1469     """Omap iterator"""
1470
1471     cdef public Ioctx ioctx
1472     cdef rados_omap_iter_t ctx
1473
1474     def __cinit__(self, Ioctx ioctx):
1475         self.ioctx = ioctx
1476
1477     def __iter__(self):
1478         return self
1479
1480     def __next__(self):
1481         """
1482         Get the next key-value pair in the object
1483         :returns: next rados.OmapItem
1484         """
1485         cdef:
1486             char *key_ = NULL
1487             char *val_ = NULL
1488             size_t len_
1489
1490         with nogil:
1491             ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1492
1493         if ret != 0:
1494             raise make_ex(ret, "error iterating over the omap")
1495         if key_ == NULL:
1496             raise StopIteration()
1497         key = decode_cstr(key_)
1498         val = None
1499         if val_ != NULL:
1500             val = val_[:len_]
1501         return (key, val)
1502
1503     def __dealloc__(self):
1504         with nogil:
1505             rados_omap_get_end(self.ctx)
1506
1507
1508 cdef class ObjectIterator(object):
1509     """rados.Ioctx Object iterator"""
1510
1511     cdef rados_list_ctx_t ctx
1512
1513     cdef public object ioctx
1514
1515     def __cinit__(self, Ioctx ioctx):
1516         self.ioctx = ioctx
1517
1518         with nogil:
1519             ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1520         if ret < 0:
1521             raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1522                           % self.ioctx.name)
1523
1524     def __iter__(self):
1525         return self
1526
1527     def __next__(self):
1528         """
1529         Get the next object name and locator in the pool
1530
1531         :raises: StopIteration
1532         :returns: next rados.Ioctx Object
1533         """
1534         cdef:
1535             const char *key_ = NULL
1536             const char *locator_ = NULL
1537             const char *nspace_ = NULL
1538
1539         with nogil:
1540             ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1541
1542         if ret < 0:
1543             raise StopIteration()
1544
1545         key = decode_cstr(key_)
1546         locator = decode_cstr(locator_) if locator_ != NULL else None
1547         nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1548         return Object(self.ioctx, key, locator, nspace)
1549
1550     def __dealloc__(self):
1551         with nogil:
1552             rados_nobjects_list_close(self.ctx)
1553
1554
1555 cdef class XattrIterator(object):
1556     """Extended attribute iterator"""
1557
1558     cdef rados_xattrs_iter_t it
1559     cdef char* _oid
1560
1561     cdef public Ioctx ioctx
1562     cdef public object oid
1563
1564     def __cinit__(self, Ioctx ioctx, oid):
1565         self.ioctx = ioctx
1566         self.oid = cstr(oid, 'oid')
1567         self._oid = self.oid
1568
1569         with nogil:
1570             ret = rados_getxattrs(ioctx.io,  self._oid, &self.it)
1571         if ret != 0:
1572             raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1573
1574     def __iter__(self):
1575         return self
1576
1577     def __next__(self):
1578         """
1579         Get the next xattr on the object
1580
1581         :raises: StopIteration
1582         :returns: pair - of name and value of the next Xattr
1583         """
1584         cdef:
1585             const char *name_ = NULL
1586             const char *val_ = NULL
1587             size_t len_ = 0
1588
1589         with nogil:
1590             ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1591         if ret != 0:
1592             raise make_ex(ret, "error iterating over the extended attributes \
1593 in '%s'" % self.oid)
1594         if name_ == NULL:
1595             raise StopIteration()
1596         name = decode_cstr(name_)
1597         val = val_[:len_]
1598         return (name, val)
1599
1600     def __dealloc__(self):
1601         with nogil:
1602             rados_getxattrs_end(self.it)
1603
1604
1605 cdef class SnapIterator(object):
1606     """Snapshot iterator"""
1607
1608     cdef public Ioctx ioctx
1609
1610     cdef rados_snap_t *snaps
1611     cdef int max_snap
1612     cdef int cur_snap
1613
1614     def __cinit__(self, Ioctx ioctx):
1615         self.ioctx = ioctx
1616         # We don't know how big a buffer we need until we've called the
1617         # function. So use the exponential doubling strategy.
1618         cdef int num_snaps = 10
1619         while True:
1620             self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1621                                                     num_snaps *
1622                                                     sizeof(rados_snap_t))
1623
1624             with nogil:
1625                 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1626             if ret >= 0:
1627                 self.max_snap = ret
1628                 break
1629             elif ret != -errno.ERANGE:
1630                 raise make_ex(ret, "error calling rados_snap_list for \
1631 ioctx '%s'" % self.ioctx.name)
1632             num_snaps = num_snaps * 2
1633         self.cur_snap = 0
1634
1635     def __iter__(self):
1636         return self
1637
1638     def __next__(self):
1639         """
1640         Get the next Snapshot
1641
1642         :raises: :class:`Error`, StopIteration
1643         :returns: Snap - next snapshot
1644         """
1645         if self.cur_snap >= self.max_snap:
1646             raise StopIteration
1647
1648         cdef:
1649             rados_snap_t snap_id = self.snaps[self.cur_snap]
1650             int name_len = 10
1651             char *name = NULL
1652
1653         try:
1654             while True:
1655                 name = <char *>realloc_chk(name, name_len)
1656                 with nogil:
1657                     ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1658                 if ret == 0:
1659                     break
1660                 elif ret != -errno.ERANGE:
1661                     raise make_ex(ret, "rados_snap_get_name error")
1662                 else:
1663                     name_len = name_len * 2
1664
1665             snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1666             self.cur_snap = self.cur_snap + 1
1667             return snap
1668         finally:
1669             free(name)
1670
1671
1672 cdef class Snap(object):
1673     """Snapshot object"""
1674     cdef public Ioctx ioctx
1675     cdef public object name
1676
1677     # NOTE(sileht): old API was storing the ctypes object
1678     # instead of the value ....
1679     cdef public rados_snap_t snap_id
1680
1681     def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1682         self.ioctx = ioctx
1683         self.name = name
1684         self.snap_id = snap_id
1685
1686     def __str__(self):
1687         return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1688             % (str(self.ioctx), self.name, self.snap_id)
1689
1690     def get_timestamp(self):
1691         """
1692         Find when a snapshot in the current pool occurred
1693
1694         :raises: :class:`Error`
1695         :returns: datetime - the data and time the snapshot was created
1696         """
1697         cdef time_t snap_time
1698
1699         with nogil:
1700             ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1701         if ret != 0:
1702             raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1703         return datetime.fromtimestamp(snap_time)
1704
1705
1706 cdef class Completion(object):
1707     """completion object"""
1708
1709     cdef public:
1710          Ioctx ioctx
1711          object oncomplete
1712          object onsafe
1713
1714     cdef:
1715          rados_callback_t complete_cb
1716          rados_callback_t safe_cb
1717          rados_completion_t rados_comp
1718          PyObject* buf
1719
1720     def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1721         self.oncomplete = oncomplete
1722         self.onsafe = onsafe
1723         self.ioctx = ioctx
1724
1725     def is_safe(self):
1726         """
1727         Is an asynchronous operation safe?
1728
1729         This does not imply that the safe callback has finished.
1730
1731         :returns: True if the operation is safe
1732         """
1733         with nogil:
1734             ret = rados_aio_is_safe(self.rados_comp)
1735         return ret == 1
1736
1737     def is_complete(self):
1738         """
1739         Has an asynchronous operation completed?
1740
1741         This does not imply that the safe callback has finished.
1742
1743         :returns: True if the operation is completed
1744         """
1745         with nogil:
1746             ret = rados_aio_is_complete(self.rados_comp)
1747         return ret == 1
1748
1749     def wait_for_safe(self):
1750         """
1751         Wait for an asynchronous operation to be marked safe
1752
1753         This does not imply that the safe callback has finished.
1754         """
1755         with nogil:
1756             rados_aio_wait_for_safe(self.rados_comp)
1757
1758     def wait_for_complete(self):
1759         """
1760         Wait for an asynchronous operation to complete
1761
1762         This does not imply that the complete callback has finished.
1763         """
1764         with nogil:
1765             rados_aio_wait_for_complete(self.rados_comp)
1766
1767     def wait_for_safe_and_cb(self):
1768         """
1769         Wait for an asynchronous operation to be marked safe and for
1770         the safe callback to have returned
1771         """
1772         with nogil:
1773             rados_aio_wait_for_safe_and_cb(self.rados_comp)
1774
1775     def wait_for_complete_and_cb(self):
1776         """
1777         Wait for an asynchronous operation to complete and for the
1778         complete callback to have returned
1779
1780         :returns:  whether the operation is completed
1781         """
1782         with nogil:
1783             ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1784         return ret
1785
1786     def get_return_value(self):
1787         """
1788         Get the return value of an asychronous operation
1789
1790         The return value is set when the operation is complete or safe,
1791         whichever comes first.
1792
1793         :returns: int - return value of the operation
1794         """
1795         with nogil:
1796             ret = rados_aio_get_return_value(self.rados_comp)
1797         return ret
1798
1799     def __dealloc__(self):
1800         """
1801         Release a completion
1802
1803         Call this when you no longer need the completion. It may not be
1804         freed immediately if the operation is not acked and committed.
1805         """
1806         ref.Py_XDECREF(self.buf)
1807         self.buf = NULL
1808         if self.rados_comp != NULL:
1809             with nogil:
1810                 rados_aio_release(self.rados_comp)
1811                 self.rados_comp = NULL
1812
1813     def _complete(self):
1814         self.oncomplete(self)
1815         with self.ioctx.lock:
1816             if self.oncomplete:
1817                 self.ioctx.complete_completions.remove(self)
1818
1819     def _safe(self):
1820         self.onsafe(self)
1821         with self.ioctx.lock:
1822             if self.onsafe:
1823                 self.ioctx.safe_completions.remove(self)
1824
1825     def _cleanup(self):
1826         with self.ioctx.lock:
1827             if self.oncomplete:
1828                 self.ioctx.complete_completions.remove(self)
1829             if self.onsafe:
1830                 self.ioctx.safe_completions.remove(self)
1831
1832
1833 class OpCtx(object):
1834     def __enter__(self):
1835         return self.create()
1836
1837     def __exit__(self, type, msg, traceback):
1838         self.release()
1839
1840
1841 cdef class WriteOp(object):
1842     cdef rados_write_op_t write_op
1843
1844     def create(self):
1845         with nogil:
1846             self.write_op = rados_create_write_op()
1847         return self
1848
1849     def release(self):
1850         with nogil:
1851             rados_release_write_op(self.write_op)
1852
1853     @requires(('exclusive', opt(int)))
1854     def new(self, exclusive=None):
1855         """
1856         Create the object.
1857         """
1858
1859         cdef:
1860             int _exclusive = exclusive
1861
1862         with nogil:
1863             rados_write_op_create(self.write_op, _exclusive, NULL)
1864
1865
1866     def remove(self):
1867         """
1868         Remove object.
1869         """
1870         with nogil:
1871             rados_write_op_remove(self.write_op)
1872
1873     @requires(('flags', int))
1874     def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1875         """
1876         Set flags for the last operation added to this write_op.
1877         :para flags: flags to apply to the last operation
1878         :type flags: int
1879         """
1880
1881         cdef:
1882             int _flags = flags
1883
1884         with nogil:
1885             rados_write_op_set_flags(self.write_op, _flags)
1886
1887     @requires(('to_write', bytes))
1888     def append(self, to_write):
1889         """
1890         Append data to an object synchronously
1891         :param to_write: data to write
1892         :type to_write: bytes
1893         """
1894
1895         cdef:
1896             char *_to_write = to_write
1897             size_t length = len(to_write)
1898
1899         with nogil:
1900             rados_write_op_append(self.write_op, _to_write, length)
1901
1902     @requires(('to_write', bytes))
1903     def write_full(self, to_write):
1904         """
1905         Write whole object, atomically replacing it.
1906         :param to_write: data to write
1907         :type to_write: bytes
1908         """
1909
1910         cdef:
1911             char *_to_write = to_write
1912             size_t length = len(to_write)
1913
1914         with nogil:
1915             rados_write_op_write_full(self.write_op, _to_write, length)
1916
1917     @requires(('to_write', bytes), ('offset', int))
1918     def write(self, to_write, offset=0):
1919         """
1920         Write to offset.
1921         :param to_write: data to write
1922         :type to_write: bytes
1923         :param offset: byte offset in the object to begin writing at
1924         :type offset: int
1925         """
1926
1927         cdef:
1928             char *_to_write = to_write
1929             size_t length = len(to_write)
1930             uint64_t _offset = offset
1931
1932         with nogil:
1933             rados_write_op_write(self.write_op, _to_write, length, _offset)
1934
1935     @requires(('offset', int), ('length', int))
1936     def zero(self, offset, length):
1937         """
1938         Zero part of an object.
1939         :param offset: byte offset in the object to begin writing at
1940         :type offset: int
1941         :param offset: number of zero to write
1942         :type offset: int
1943         """
1944
1945         cdef:
1946             size_t _length = length
1947             uint64_t _offset = offset
1948
1949         with nogil:
1950             rados_write_op_zero(self.write_op, _length, _offset)
1951
1952     @requires(('offset', int))
1953     def truncate(self, offset):
1954         """
1955         Truncate an object.
1956         :param offset: byte offset in the object to begin truncating at
1957         :type offset: int
1958         """
1959
1960         cdef:
1961             uint64_t _offset = offset
1962
1963         with nogil:
1964             rados_write_op_truncate(self.write_op,  _offset)
1965
1966
1967 class WriteOpCtx(WriteOp, OpCtx):
1968     """write operation context manager"""
1969
1970
1971 cdef class ReadOp(object):
1972     cdef rados_read_op_t read_op
1973
1974     def create(self):
1975         with nogil:
1976             self.read_op = rados_create_read_op()
1977         return self
1978
1979     def release(self):
1980         with nogil:
1981             rados_release_read_op(self.read_op)
1982
1983     @requires(('flags', int))
1984     def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1985         """
1986         Set flags for the last operation added to this read_op.
1987         :para flags: flags to apply to the last operation
1988         :type flags: int
1989         """
1990
1991         cdef:
1992             int _flags = flags
1993
1994         with nogil:
1995             rados_read_op_set_flags(self.read_op, _flags)
1996
1997
1998 class ReadOpCtx(ReadOp, OpCtx):
1999     """read operation context manager"""
2000
2001
2002 cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
2003     """
2004     Callback to onsafe() for asynchronous operations
2005     """
2006     cdef object cb = <object>args
2007     cb._safe()
2008     return 0
2009
2010
2011 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2012     """
2013     Callback to oncomplete() for asynchronous operations
2014     """
2015     cdef object cb = <object>args
2016     cb._complete()
2017     return 0
2018
2019
2020 cdef class Ioctx(object):
2021     """rados.Ioctx object"""
2022     # NOTE(sileht): attributes declared in .pyd
2023
2024     def __init__(self, name):
2025         self.name = name
2026         self.state = "open"
2027
2028         self.locator_key = ""
2029         self.nspace = ""
2030         self.lock = threading.Lock()
2031         self.safe_completions = []
2032         self.complete_completions = []
2033
2034     def __enter__(self):
2035         return self
2036
2037     def __exit__(self, type_, value, traceback):
2038         self.close()
2039         return False
2040
2041     def __dealloc__(self):
2042         self.close()
2043
2044     def __track_completion(self, completion_obj):
2045         if completion_obj.oncomplete:
2046             with self.lock:
2047                 self.complete_completions.append(completion_obj)
2048         if completion_obj.onsafe:
2049             with self.lock:
2050                 self.safe_completions.append(completion_obj)
2051
2052     def __get_completion(self, oncomplete, onsafe):
2053         """
2054         Constructs a completion to use with asynchronous operations
2055
2056         :param oncomplete: what to do when the write is safe and complete in memory
2057             on all replicas
2058         :type oncomplete: completion
2059         :param onsafe:  what to do when the write is safe and complete on storage
2060             on all replicas
2061         :type onsafe: completion
2062
2063         :raises: :class:`Error`
2064         :returns: completion object
2065         """
2066
2067         completion_obj = Completion(self, oncomplete, onsafe)
2068
2069         cdef:
2070             rados_callback_t complete_cb = NULL
2071             rados_callback_t safe_cb = NULL
2072             rados_completion_t completion
2073             PyObject* p_completion_obj= <PyObject*>completion_obj
2074
2075         if oncomplete:
2076             complete_cb = <rados_callback_t>&__aio_complete_cb
2077         if onsafe:
2078             safe_cb = <rados_callback_t>&__aio_safe_cb
2079
2080         with nogil:
2081             ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
2082                                               &completion)
2083         if ret < 0:
2084             raise make_ex(ret, "error getting a completion")
2085
2086         completion_obj.rados_comp = completion
2087         return completion_obj
2088
2089     @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2090     def aio_stat(self, object_name, oncomplete):
2091         """
2092         Asynchronously get object stats (size/mtime)
2093
2094         oncomplete will be called with the returned size and mtime
2095         as well as the completion:
2096
2097         oncomplete(completion, size, mtime)
2098
2099         :param object_name: the name of the object to get stats from
2100         :type object_name: str
2101         :param oncomplete: what to do when the stat is complete
2102         :type oncomplete: completion
2103
2104         :raises: :class:`Error`
2105         :returns: completion object
2106         """
2107
2108         object_name = cstr(object_name, 'object_name')
2109
2110         cdef:
2111             Completion completion
2112             char *_object_name = object_name
2113             uint64_t psize
2114             time_t pmtime
2115
2116         def oncomplete_(completion_v):
2117             cdef Completion _completion_v = completion_v
2118             return_value = _completion_v.get_return_value()
2119             if return_value >= 0:
2120                 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2121             else:
2122                 return oncomplete(_completion_v, None, None)
2123
2124         completion = self.__get_completion(oncomplete_, None)
2125         self.__track_completion(completion)
2126         with nogil:
2127             ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2128                                  &psize, &pmtime)
2129
2130         if ret < 0:
2131             completion._cleanup()
2132             raise make_ex(ret, "error stating %s" % object_name)
2133         return completion
2134
2135     @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2136               ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2137     def aio_write(self, object_name, to_write, offset=0,
2138                   oncomplete=None, onsafe=None):
2139         """
2140         Write data to an object asynchronously
2141
2142         Queues the write and returns.
2143
2144         :param object_name: name of the object
2145         :type object_name: str
2146         :param to_write: data to write
2147         :type to_write: bytes
2148         :param offset: byte offset in the object to begin writing at
2149         :type offset: int
2150         :param oncomplete: what to do when the write is safe and complete in memory
2151             on all replicas
2152         :type oncomplete: completion
2153         :param onsafe:  what to do when the write is safe and complete on storage
2154             on all replicas
2155         :type onsafe: completion
2156
2157         :raises: :class:`Error`
2158         :returns: completion object
2159         """
2160
2161         object_name = cstr(object_name, 'object_name')
2162
2163         cdef:
2164             Completion completion
2165             char* _object_name = object_name
2166             char* _to_write = to_write
2167             size_t size = len(to_write)
2168             uint64_t _offset = offset
2169
2170         completion = self.__get_completion(oncomplete, onsafe)
2171         self.__track_completion(completion)
2172         with nogil:
2173             ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2174                                 _to_write, size, _offset)
2175         if ret < 0:
2176             completion._cleanup()
2177             raise make_ex(ret, "error writing object %s" % object_name)
2178         return completion
2179
2180     @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2181               ('onsafe', opt(Callable)))
2182     def aio_write_full(self, object_name, to_write,
2183                        oncomplete=None, onsafe=None):
2184         """
2185         Asychronously write an entire object
2186
2187         The object is filled with the provided data. If the object exists,
2188         it is atomically truncated and then written.
2189         Queues the write and returns.
2190
2191         :param object_name: name of the object
2192         :type object_name: str
2193         :param to_write: data to write
2194         :type to_write: str
2195         :param oncomplete: what to do when the write is safe and complete in memory
2196             on all replicas
2197         :type oncomplete: completion
2198         :param onsafe:  what to do when the write is safe and complete on storage
2199             on all replicas
2200         :type onsafe: completion
2201
2202         :raises: :class:`Error`
2203         :returns: completion object
2204         """
2205
2206         object_name = cstr(object_name, 'object_name')
2207
2208         cdef:
2209             Completion completion
2210             char* _object_name = object_name
2211             char* _to_write = to_write
2212             size_t size = len(to_write)
2213
2214         completion = self.__get_completion(oncomplete, onsafe)
2215         self.__track_completion(completion)
2216         with nogil:
2217             ret = rados_aio_write_full(self.io, _object_name,
2218                                     completion.rados_comp,
2219                                     _to_write, size)
2220         if ret < 0:
2221             completion._cleanup()
2222             raise make_ex(ret, "error writing object %s" % object_name)
2223         return completion
2224
2225     @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
2226               ('onsafe', opt(Callable)))
2227     def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
2228         """
2229         Asychronously append data to an object
2230
2231         Queues the write and returns.
2232
2233         :param object_name: name of the object
2234         :type object_name: str
2235         :param to_append: data to append
2236         :type to_append: str
2237         :param offset: byte offset in the object to begin writing at
2238         :type offset: int
2239         :param oncomplete: what to do when the write is safe and complete in memory
2240             on all replicas
2241         :type oncomplete: completion
2242         :param onsafe:  what to do when the write is safe and complete on storage
2243             on all replicas
2244         :type onsafe: completion
2245
2246         :raises: :class:`Error`
2247         :returns: completion object
2248         """
2249         object_name = cstr(object_name, 'object_name')
2250
2251         cdef:
2252             Completion completion
2253             char* _object_name = object_name
2254             char* _to_append = to_append
2255             size_t size = len(to_append)
2256
2257         completion = self.__get_completion(oncomplete, onsafe)
2258         self.__track_completion(completion)
2259         with nogil:
2260             ret = rados_aio_append(self.io, _object_name,
2261                                 completion.rados_comp,
2262                                 _to_append, size)
2263         if ret < 0:
2264             completion._cleanup()
2265             raise make_ex(ret, "error appending object %s" % object_name)
2266         return completion
2267
2268     def aio_flush(self):
2269         """
2270         Block until all pending writes in an io context are safe
2271
2272         :raises: :class:`Error`
2273         """
2274         with nogil:
2275             ret = rados_aio_flush(self.io)
2276         if ret < 0:
2277             raise make_ex(ret, "error flushing")
2278
2279     @requires(('object_name', str_type), ('length', int), ('offset', int),
2280               ('oncomplete', opt(Callable)))
2281     def aio_read(self, object_name, length, offset, oncomplete):
2282         """
2283         Asychronously read data from an object
2284
2285         oncomplete will be called with the returned read value as
2286         well as the completion:
2287
2288         oncomplete(completion, data_read)
2289
2290         :param object_name: name of the object to read from
2291         :type object_name: str
2292         :param length: the number of bytes to read
2293         :type length: int
2294         :param offset: byte offset in the object to begin reading from
2295         :type offset: int
2296         :param oncomplete: what to do when the read is complete
2297         :type oncomplete: completion
2298
2299         :raises: :class:`Error`
2300         :returns: completion object
2301         """
2302
2303         object_name = cstr(object_name, 'object_name')
2304
2305         cdef:
2306             Completion completion
2307             char* _object_name = object_name
2308             uint64_t _offset = offset
2309
2310             char *ref_buf
2311             size_t _length = length
2312
2313         def oncomplete_(completion_v):
2314             cdef Completion _completion_v = completion_v
2315             return_value = _completion_v.get_return_value()
2316             if return_value > 0 and return_value != length:
2317                 _PyBytes_Resize(&_completion_v.buf, return_value)
2318             return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2319
2320         completion = self.__get_completion(oncomplete_, None)
2321         completion.buf = PyBytes_FromStringAndSize(NULL, length)
2322         ret_buf = PyBytes_AsString(completion.buf)
2323         self.__track_completion(completion)
2324         with nogil:
2325             ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2326                                 ret_buf, _length, _offset)
2327         if ret < 0:
2328             completion._cleanup()
2329             raise make_ex(ret, "error reading %s" % object_name)
2330         return completion
2331
2332     @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2333               ('data', bytes), ('length', int),
2334               ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2335     def aio_execute(self, object_name, cls, method, data,
2336                     length=8192, oncomplete=None, onsafe=None):
2337         """
2338         Asynchronously execute an OSD class method on an object.
2339
2340         oncomplete and onsafe will be called with the data returned from
2341         the plugin as well as the completion:
2342
2343         oncomplete(completion, data)
2344         onsafe(completion, data)
2345
2346         :param object_name: name of the object
2347         :type object_name: str
2348         :param cls: name of the object class
2349         :type cls: str
2350         :param method: name of the method
2351         :type method: str
2352         :param data: input data
2353         :type data: bytes
2354         :param length: size of output buffer in bytes (default=8192)
2355         :type length: int
2356         :param oncomplete: what to do when the execution is complete
2357         :type oncomplete: completion
2358         :param onsafe:  what to do when the execution is safe and complete
2359         :type onsafe: completion
2360
2361         :raises: :class:`Error`
2362         :returns: completion object
2363         """
2364
2365         object_name = cstr(object_name, 'object_name')
2366         cls = cstr(cls, 'cls')
2367         method = cstr(method, 'method')
2368         cdef:
2369             Completion completion
2370             char *_object_name = object_name
2371             char *_cls = cls
2372             char *_method = method
2373             char *_data = data
2374             size_t _data_len = len(data)
2375
2376             char *ref_buf
2377             size_t _length = length
2378
2379         def oncomplete_(completion_v):
2380             cdef Completion _completion_v = completion_v
2381             return_value = _completion_v.get_return_value()
2382             if return_value > 0 and return_value != length:
2383                 _PyBytes_Resize(&_completion_v.buf, return_value)
2384             return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2385
2386         def onsafe_(completion_v):
2387             cdef Completion _completion_v = completion_v
2388             return_value = _completion_v.get_return_value()
2389             return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2390
2391         completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2392         completion.buf = PyBytes_FromStringAndSize(NULL, length)
2393         ret_buf = PyBytes_AsString(completion.buf)
2394         self.__track_completion(completion)
2395         with nogil:
2396             ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2397                                  _cls, _method, _data, _data_len, ret_buf, _length)
2398         if ret < 0:
2399             completion._cleanup()
2400             raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2401         return completion
2402
2403     @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2404     def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2405         """
2406         Asychronously remove an object
2407
2408         :param object_name: name of the object to remove
2409         :type object_name: str
2410         :param oncomplete: what to do when the remove is safe and complete in memory
2411             on all replicas
2412         :type oncomplete: completion
2413         :param onsafe:  what to do when the remove is safe and complete on storage
2414             on all replicas
2415         :type onsafe: completion
2416
2417         :raises: :class:`Error`
2418         :returns: completion object
2419         """
2420         object_name = cstr(object_name, 'object_name')
2421
2422         cdef:
2423             Completion completion
2424             char* _object_name = object_name
2425
2426         completion = self.__get_completion(oncomplete, onsafe)
2427         self.__track_completion(completion)
2428         with nogil:
2429             ret = rados_aio_remove(self.io, _object_name,
2430                                 completion.rados_comp)
2431         if ret < 0:
2432             completion._cleanup()
2433             raise make_ex(ret, "error removing %s" % object_name)
2434         return completion
2435
2436     def require_ioctx_open(self):
2437         """
2438         Checks if the rados.Ioctx object state is 'open'
2439
2440         :raises: IoctxStateError
2441         """
2442         if self.state != "open":
2443             raise IoctxStateError("The pool is %s" % self.state)
2444
2445     def change_auid(self, auid):
2446         """
2447         Attempt to change an io context's associated auid "owner."
2448
2449         Requires that you have write permission on both the current and new
2450         auid.
2451
2452         :raises: :class:`Error`
2453         """
2454         self.require_ioctx_open()
2455
2456         cdef:
2457             uint64_t _auid = auid
2458
2459         with nogil:
2460             ret = rados_ioctx_pool_set_auid(self.io, _auid)
2461         if ret < 0:
2462             raise make_ex(ret, "error changing auid of '%s' to %d"
2463                           % (self.name, auid))
2464
2465     @requires(('loc_key', str_type))
2466     def set_locator_key(self, loc_key):
2467         """
2468         Set the key for mapping objects to pgs within an io context.
2469
2470         The key is used instead of the object name to determine which
2471         placement groups an object is put in. This affects all subsequent
2472         operations of the io context - until a different locator key is
2473         set, all objects in this io context will be placed in the same pg.
2474
2475         :param loc_key: the key to use as the object locator, or NULL to discard
2476             any previously set key
2477         :type loc_key: str
2478
2479         :raises: :class:`TypeError`
2480         """
2481         self.require_ioctx_open()
2482         cloc_key = cstr(loc_key, 'loc_key')
2483         cdef char *_loc_key = cloc_key
2484         with nogil:
2485             rados_ioctx_locator_set_key(self.io, _loc_key)
2486         self.locator_key = loc_key
2487
2488     def get_locator_key(self):
2489         """
2490         Get the locator_key of context
2491
2492         :returns: locator_key
2493         """
2494         return self.locator_key
2495
2496     @requires(('snap_id', long))
2497     def set_read(self, snap_id):
2498         """
2499         Set the snapshot for reading objects.
2500
2501         To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2502
2503         :param snap_id: the snapshot Id
2504         :type snap_id: int
2505
2506         :raises: :class:`TypeError`
2507         """
2508         self.require_ioctx_open()
2509         cdef rados_snap_t _snap_id = snap_id
2510         with nogil:
2511             rados_ioctx_snap_set_read(self.io, _snap_id)
2512
2513     @requires(('nspace', str_type))
2514     def set_namespace(self, nspace):
2515         """
2516         Set the namespace for objects within an io context.
2517
2518         The namespace in addition to the object name fully identifies
2519         an object. This affects all subsequent operations of the io context
2520         - until a different namespace is set, all objects in this io context
2521         will be placed in the same namespace.
2522
2523         :param nspace: the namespace to use, or None/"" for the default namespace
2524         :type nspace: str
2525
2526         :raises: :class:`TypeError`
2527         """
2528         self.require_ioctx_open()
2529         if nspace is None:
2530             nspace = ""
2531         cnspace = cstr(nspace, 'nspace')
2532         cdef char *_nspace = cnspace
2533         with nogil:
2534             rados_ioctx_set_namespace(self.io, _nspace)
2535         self.nspace = nspace
2536
2537     def get_namespace(self):
2538         """
2539         Get the namespace of context
2540
2541         :returns: namespace
2542         """
2543         return self.nspace
2544
2545     def close(self):
2546         """
2547         Close a rados.Ioctx object.
2548
2549         This just tells librados that you no longer need to use the io context.
2550         It may not be freed immediately if there are pending asynchronous
2551         requests on it, but you should not use an io context again after
2552         calling this function on it.
2553         """
2554         if self.state == "open":
2555             self.require_ioctx_open()
2556             with nogil:
2557                 rados_ioctx_destroy(self.io)
2558             self.state = "closed"
2559
2560
2561     @requires(('key', str_type), ('data', bytes))
2562     def write(self, key, data, offset=0):
2563         """
2564         Write data to an object synchronously
2565
2566         :param key: name of the object
2567         :type key: str
2568         :param data: data to write
2569         :type data: bytes
2570         :param offset: byte offset in the object to begin writing at
2571         :type offset: int
2572
2573         :raises: :class:`TypeError`
2574         :raises: :class:`LogicError`
2575         :returns: int - 0 on success
2576         """
2577         self.require_ioctx_open()
2578
2579         key = cstr(key, 'key')
2580         cdef:
2581             char *_key = key
2582             char *_data = data
2583             size_t length = len(data)
2584             uint64_t _offset = offset
2585
2586         with nogil:
2587             ret = rados_write(self.io, _key, _data, length, _offset)
2588         if ret == 0:
2589             return ret
2590         elif ret < 0:
2591             raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2592                           % (self.name, key))
2593         else:
2594             raise LogicError("Ioctx.write(%s): rados_write \
2595 returned %d, but should return zero on success." % (self.name, ret))
2596
2597     @requires(('key', str_type), ('data', bytes))
2598     def write_full(self, key, data):
2599         """
2600         Write an entire object synchronously.
2601
2602         The object is filled with the provided data. If the object exists,
2603         it is atomically truncated and then written.
2604
2605         :param key: name of the object
2606         :type key: str
2607         :param data: data to write
2608         :type data: bytes
2609
2610         :raises: :class:`TypeError`
2611         :raises: :class:`Error`
2612         :returns: int - 0 on success
2613         """
2614         self.require_ioctx_open()
2615         key = cstr(key, 'key')
2616         cdef:
2617             char *_key = key
2618             char *_data = data
2619             size_t length = len(data)
2620
2621         with nogil:
2622             ret = rados_write_full(self.io, _key, _data, length)
2623         if ret == 0:
2624             return ret
2625         elif ret < 0:
2626             raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2627                           % (self.name, key))
2628         else:
2629             raise LogicError("Ioctx.write_full(%s): rados_write_full \
2630 returned %d, but should return zero on success." % (self.name, ret))
2631
2632     @requires(('key', str_type), ('data', bytes))
2633     def append(self, key, data):
2634         """
2635         Append data to an object synchronously
2636
2637         :param key: name of the object
2638         :type key: str
2639         :param data: data to write
2640         :type data: bytes
2641
2642         :raises: :class:`TypeError`
2643         :raises: :class:`LogicError`
2644         :returns: int - 0 on success
2645         """
2646         self.require_ioctx_open()
2647         key = cstr(key, 'key')
2648         cdef:
2649             char *_key = key
2650             char *_data = data
2651             size_t length = len(data)
2652
2653         with nogil:
2654             ret = rados_append(self.io, _key, _data, length)
2655         if ret == 0:
2656             return ret
2657         elif ret < 0:
2658             raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2659                           % (self.name, key))
2660         else:
2661             raise LogicError("Ioctx.append(%s): rados_append \
2662 returned %d, but should return zero on success." % (self.name, ret))
2663
2664     @requires(('key', str_type))
2665     def read(self, key, length=8192, offset=0):
2666         """
2667         Read data from an object synchronously
2668
2669         :param key: name of the object
2670         :type key: str
2671         :param length: the number of bytes to read (default=8192)
2672         :type length: int
2673         :param offset: byte offset in the object to begin reading at
2674         :type offset: int
2675
2676         :raises: :class:`TypeError`
2677         :raises: :class:`Error`
2678         :returns: str - data read from object
2679         """
2680         self.require_ioctx_open()
2681         key = cstr(key, 'key')
2682         cdef:
2683             char *_key = key
2684             char *ret_buf
2685             uint64_t _offset = offset
2686             size_t _length = length
2687             PyObject* ret_s = NULL
2688
2689         ret_s = PyBytes_FromStringAndSize(NULL, length)
2690         try:
2691             ret_buf = PyBytes_AsString(ret_s)
2692             with nogil:
2693                 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2694             if ret < 0:
2695                 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2696
2697             if ret != length:
2698                 _PyBytes_Resize(&ret_s, ret)
2699
2700             return <object>ret_s
2701         finally:
2702             # We DECREF unconditionally: the cast to object above will have
2703             # INCREFed if necessary. This also takes care of exceptions,
2704             # including if _PyString_Resize fails (that will free the string
2705             # itself and set ret_s to NULL, hence XDECREF).
2706             ref.Py_XDECREF(ret_s)
2707
2708     @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
2709     def execute(self, key, cls, method, data, length=8192):
2710         """
2711         Execute an OSD class method on an object.
2712
2713         :param key: name of the object
2714         :type key: str
2715         :param cls: name of the object class
2716         :type cls: str
2717         :param method: name of the method
2718         :type method: str
2719         :param data: input data
2720         :type data: bytes
2721         :param length: size of output buffer in bytes (default=8192)
2722         :type length: int
2723
2724         :raises: :class:`TypeError`
2725         :raises: :class:`Error`
2726         :returns: (ret, method output)
2727         """
2728         self.require_ioctx_open()
2729
2730         key = cstr(key, 'key')
2731         cls = cstr(cls, 'cls')
2732         method = cstr(method, 'method')
2733         cdef:
2734             char *_key = key
2735             char *_cls = cls
2736             char *_method = method
2737             char *_data = data
2738             size_t _data_len = len(data)
2739
2740             char *ref_buf
2741             size_t _length = length
2742             PyObject* ret_s = NULL
2743
2744         ret_s = PyBytes_FromStringAndSize(NULL, length)
2745         try:
2746             ret_buf = PyBytes_AsString(ret_s)
2747             with nogil:
2748                 ret = rados_exec(self.io, _key, _cls, _method, _data,
2749                                  _data_len, ret_buf, _length)
2750             if ret < 0:
2751                 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2752
2753             if ret != length:
2754                 _PyBytes_Resize(&ret_s, ret)
2755
2756             return ret, <object>ret_s
2757         finally:
2758             # We DECREF unconditionally: the cast to object above will have
2759             # INCREFed if necessary. This also takes care of exceptions,
2760             # including if _PyString_Resize fails (that will free the string
2761             # itself and set ret_s to NULL, hence XDECREF).
2762             ref.Py_XDECREF(ret_s)
2763
2764     def get_stats(self):
2765         """
2766         Get pool usage statistics
2767
2768         :returns: dict - contains the following keys:
2769
2770             - ``num_bytes`` (int) - size of pool in bytes
2771
2772             - ``num_kb`` (int) - size of pool in kbytes
2773
2774             - ``num_objects`` (int) - number of objects in the pool
2775
2776             - ``num_object_clones`` (int) - number of object clones
2777
2778             - ``num_object_copies`` (int) - number of object copies
2779
2780             - ``num_objects_missing_on_primary`` (int) - number of objets
2781                 missing on primary
2782
2783             - ``num_objects_unfound`` (int) - number of unfound objects
2784
2785             - ``num_objects_degraded`` (int) - number of degraded objects
2786
2787             - ``num_rd`` (int) - bytes read
2788
2789             - ``num_rd_kb`` (int) - kbytes read
2790
2791             - ``num_wr`` (int) - bytes written
2792
2793             - ``num_wr_kb`` (int) - kbytes written
2794         """
2795         self.require_ioctx_open()
2796         cdef rados_pool_stat_t stats
2797         with nogil:
2798             ret = rados_ioctx_pool_stat(self.io, &stats)
2799         if ret < 0:
2800             raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
2801         return {'num_bytes': stats.num_bytes,
2802                 'num_kb': stats.num_kb,
2803                 'num_objects': stats.num_objects,
2804                 'num_object_clones': stats.num_object_clones,
2805                 'num_object_copies': stats.num_object_copies,
2806                 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
2807                 "num_objects_unfound": stats.num_objects_unfound,
2808                 "num_objects_degraded": stats.num_objects_degraded,
2809                 "num_rd": stats.num_rd,
2810                 "num_rd_kb": stats.num_rd_kb,
2811                 "num_wr": stats.num_wr,
2812                 "num_wr_kb": stats.num_wr_kb}
2813
2814     @requires(('key', str_type))
2815     def remove_object(self, key):
2816         """
2817         Delete an object
2818
2819         This does not delete any snapshots of the object.
2820
2821         :param key: the name of the object to delete
2822         :type key: str
2823
2824         :raises: :class:`TypeError`
2825         :raises: :class:`Error`
2826         :returns: bool - True on success
2827         """
2828         self.require_ioctx_open()
2829         key = cstr(key, 'key')
2830         cdef:
2831             char *_key = key
2832
2833         with nogil:
2834             ret = rados_remove(self.io, _key)
2835         if ret < 0:
2836             raise make_ex(ret, "Failed to remove '%s'" % key)
2837         return True
2838
2839     @requires(('key', str_type))
2840     def trunc(self, key, size):
2841         """
2842         Resize an object
2843
2844         If this enlarges the object, the new area is logically filled with
2845         zeroes. If this shrinks the object, the excess data is removed.
2846
2847         :param key: the name of the object to resize
2848         :type key: str
2849         :param size: the new size of the object in bytes
2850         :type size: int
2851
2852         :raises: :class:`TypeError`
2853         :raises: :class:`Error`
2854         :returns: int - 0 on success, otherwise raises error
2855         """
2856
2857         self.require_ioctx_open()
2858         key = cstr(key, 'key')
2859         cdef:
2860             char *_key = key
2861             uint64_t _size = size
2862
2863         with nogil:
2864             ret = rados_trunc(self.io, _key, _size)
2865         if ret < 0:
2866             raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2867         return ret
2868
2869     @requires(('key', str_type))
2870     def stat(self, key):
2871         """
2872         Get object stats (size/mtime)
2873
2874         :param key: the name of the object to get stats from
2875         :type key: str
2876
2877         :raises: :class:`TypeError`
2878         :raises: :class:`Error`
2879         :returns: (size,timestamp)
2880         """
2881         self.require_ioctx_open()
2882
2883         key = cstr(key, 'key')
2884         cdef:
2885             char *_key = key
2886             uint64_t psize
2887             time_t pmtime
2888
2889         with nogil:
2890             ret = rados_stat(self.io, _key, &psize, &pmtime)
2891         if ret < 0:
2892             raise make_ex(ret, "Failed to stat %r" % key)
2893         return psize, time.localtime(pmtime)
2894
2895     @requires(('key', str_type), ('xattr_name', str_type))
2896     def get_xattr(self, key, xattr_name):
2897         """
2898         Get the value of an extended attribute on an object.
2899
2900         :param key: the name of the object to get xattr from
2901         :type key: str
2902         :param xattr_name: which extended attribute to read
2903         :type xattr_name: str
2904
2905         :raises: :class:`TypeError`
2906         :raises: :class:`Error`
2907         :returns: str - value of the xattr
2908         """
2909         self.require_ioctx_open()
2910
2911         key = cstr(key, 'key')
2912         xattr_name = cstr(xattr_name, 'xattr_name')
2913         cdef:
2914             char *_key = key
2915             char *_xattr_name = xattr_name
2916             size_t ret_length = 4096
2917             char *ret_buf = NULL
2918
2919         try:
2920             while ret_length < 4096 * 1024 * 1024:
2921                 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
2922                 with nogil:
2923                     ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
2924                 if ret == -errno.ERANGE:
2925                     ret_length *= 2
2926                 elif ret < 0:
2927                     raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
2928                 else:
2929                     break
2930             return ret_buf[:ret]
2931         finally:
2932             free(ret_buf)
2933
2934     @requires(('oid', str_type))
2935     def get_xattrs(self, oid):
2936         """
2937         Start iterating over xattrs on an object.
2938
2939         :param oid: the name of the object to get xattrs from
2940         :type oid: str
2941
2942         :raises: :class:`TypeError`
2943         :raises: :class:`Error`
2944         :returns: XattrIterator
2945         """
2946         self.require_ioctx_open()
2947         return XattrIterator(self, oid)
2948
2949     @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
2950     def set_xattr(self, key, xattr_name, xattr_value):
2951         """
2952         Set an extended attribute on an object.
2953
2954         :param key: the name of the object to set xattr to
2955         :type key: str
2956         :param xattr_name: which extended attribute to set
2957         :type xattr_name: str
2958         :param xattr_value: the value of the  extended attribute
2959         :type xattr_value: bytes
2960
2961         :raises: :class:`TypeError`
2962         :raises: :class:`Error`
2963         :returns: bool - True on success, otherwise raise an error
2964         """
2965         self.require_ioctx_open()
2966
2967         key = cstr(key, 'key')
2968         xattr_name = cstr(xattr_name, 'xattr_name')
2969         cdef:
2970             char *_key = key
2971             char *_xattr_name = xattr_name
2972             char *_xattr_value = xattr_value
2973             size_t _xattr_value_len = len(xattr_value)
2974
2975         with nogil:
2976             ret = rados_setxattr(self.io, _key, _xattr_name,
2977                                  _xattr_value, _xattr_value_len)
2978         if ret < 0:
2979             raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2980         return True
2981
2982     @requires(('key', str_type), ('xattr_name', str_type))
2983     def rm_xattr(self, key, xattr_name):
2984         """
2985         Removes an extended attribute on from an object.
2986
2987         :param key: the name of the object to remove xattr from
2988         :type key: str
2989         :param xattr_name: which extended attribute to remove
2990         :type xattr_name: str
2991
2992         :raises: :class:`TypeError`
2993         :raises: :class:`Error`
2994         :returns: bool - True on success, otherwise raise an error
2995         """
2996         self.require_ioctx_open()
2997
2998         key = cstr(key, 'key')
2999         xattr_name = cstr(xattr_name, 'xattr_name')
3000         cdef:
3001             char *_key = key
3002             char *_xattr_name = xattr_name
3003
3004         with nogil:
3005             ret = rados_rmxattr(self.io, _key, _xattr_name)
3006         if ret < 0:
3007             raise make_ex(ret, "Failed to delete key %r xattr %r" %
3008                           (key, xattr_name))
3009         return True
3010
3011     def list_objects(self):
3012         """
3013         Get ObjectIterator on rados.Ioctx object.
3014
3015         :returns: ObjectIterator
3016         """
3017         self.require_ioctx_open()
3018         return ObjectIterator(self)
3019
3020     def list_snaps(self):
3021         """
3022         Get SnapIterator on rados.Ioctx object.
3023
3024         :returns: SnapIterator
3025         """
3026         self.require_ioctx_open()
3027         return SnapIterator(self)
3028
3029     @requires(('snap_name', str_type))
3030     def create_snap(self, snap_name):
3031         """
3032         Create a pool-wide snapshot
3033
3034         :param snap_name: the name of the snapshot
3035         :type snap_name: str
3036
3037         :raises: :class:`TypeError`
3038         :raises: :class:`Error`
3039         """
3040         self.require_ioctx_open()
3041         snap_name = cstr(snap_name, 'snap_name')
3042         cdef char *_snap_name = snap_name
3043
3044         with nogil:
3045             ret = rados_ioctx_snap_create(self.io, _snap_name)
3046         if ret != 0:
3047             raise make_ex(ret, "Failed to create snap %s" % snap_name)
3048
3049     @requires(('snap_name', str_type))
3050     def remove_snap(self, snap_name):
3051         """
3052         Removes a pool-wide snapshot
3053
3054         :param snap_name: the name of the snapshot
3055         :type snap_name: str
3056
3057         :raises: :class:`TypeError`
3058         :raises: :class:`Error`
3059         """
3060         self.require_ioctx_open()
3061         snap_name = cstr(snap_name, 'snap_name')
3062         cdef char *_snap_name = snap_name
3063
3064         with nogil:
3065             ret = rados_ioctx_snap_remove(self.io, _snap_name)
3066         if ret != 0:
3067             raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3068
3069     @requires(('snap_name', str_type))
3070     def lookup_snap(self, snap_name):
3071         """
3072         Get the id of a pool snapshot
3073
3074         :param snap_name: the name of the snapshot to lookop
3075         :type snap_name: str
3076
3077         :raises: :class:`TypeError`
3078         :raises: :class:`Error`
3079         :returns: Snap - on success
3080         """
3081         self.require_ioctx_open()
3082         csnap_name = cstr(snap_name, 'snap_name')
3083         cdef:
3084             char *_snap_name = csnap_name
3085             rados_snap_t snap_id
3086
3087         with nogil:
3088             ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3089         if ret != 0:
3090             raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3091         return Snap(self, snap_name, int(snap_id))
3092
3093     @requires(('oid', str_type), ('snap_name', str_type))
3094     def snap_rollback(self, oid, snap_name):
3095         """
3096         Rollback an object to a snapshot
3097
3098         :param oid: the name of the object
3099         :type oid: str
3100         :param snap_name: the name of the snapshot
3101         :type snap_name: str
3102
3103         :raises: :class:`TypeError`
3104         :raises: :class:`Error`
3105         """
3106         self.require_ioctx_open()
3107         oid = cstr(oid, 'oid')
3108         snap_name = cstr(snap_name, 'snap_name')
3109         cdef:
3110             char *_snap_name = snap_name
3111             char *_oid = oid
3112
3113         with nogil:
3114             ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3115         if ret != 0:
3116             raise make_ex(ret, "Failed to rollback %s" % oid)
3117
3118     def get_last_version(self):
3119         """
3120         Return the version of the last object read or written to.
3121
3122         This exposes the internal version number of the last object read or
3123         written via this io context
3124
3125         :returns: version of the last object used
3126         """
3127         self.require_ioctx_open()
3128         with nogil:
3129             ret = rados_get_last_version(self.io)
3130         return int(ret)
3131
3132     def create_write_op(self):
3133         """
3134         create write operation object.
3135         need call release_write_op after use
3136         """
3137         return WriteOp().create()
3138
3139     def create_read_op(self):
3140         """
3141         create read operation object.
3142         need call release_read_op after use
3143         """
3144         return ReadOp().create()
3145
3146     def release_write_op(self, write_op):
3147         """
3148         release memory alloc by create_write_op
3149         """
3150         write_op.release()
3151
3152     def release_read_op(self, read_op):
3153         """
3154         release memory alloc by create_read_op
3155         :para read_op: read_op object
3156         :type: int
3157         """
3158         read_op.release()
3159
3160     @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3161     def set_omap(self, write_op, keys, values):
3162         """
3163         set keys values to write_op
3164         :para write_op: write_operation object
3165         :type write_op: WriteOp
3166         :para keys: a tuple of keys
3167         :type keys: tuple
3168         :para values: a tuple of values
3169         :type values: tuple
3170         """
3171
3172         if len(keys) != len(values):
3173             raise Error("Rados(): keys and values must have the same number of items")
3174
3175         keys = cstr_list(keys, 'keys')
3176         cdef:
3177             WriteOp _write_op = write_op
3178             size_t key_num = len(keys)
3179             char **_keys = to_bytes_array(keys)
3180             char **_values = to_bytes_array(values)
3181             size_t *_lens = to_csize_t_array([len(v) for v in values])
3182
3183         try:
3184             with nogil:
3185                 rados_write_op_omap_set(_write_op.write_op,
3186                                         <const char**>_keys,
3187                                         <const char**>_values,
3188                                         <const size_t*>_lens, key_num)
3189         finally:
3190             free(_keys)
3191             free(_values)
3192             free(_lens)
3193
3194     @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3195     def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3196         """
3197         excute the real write operation
3198         :para write_op: write operation object
3199         :type write_op: WriteOp
3200         :para oid: object name
3201         :type oid: str
3202         :para mtime: the time to set the mtime to, 0 for the current time
3203         :type mtime: int
3204         :para flags: flags to apply to the entire operation
3205         :type flags: int
3206         """
3207
3208         oid = cstr(oid, 'oid')
3209         cdef:
3210             WriteOp _write_op = write_op
3211             char *_oid = oid
3212             time_t _mtime = mtime
3213             int _flags = flags
3214
3215         with nogil:
3216             ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3217         if ret != 0:
3218             raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3219
3220     @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3221     def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3222         """
3223         excute the real write operation asynchronously
3224         :para write_op: write operation object
3225         :type write_op: WriteOp
3226         :para oid: object name
3227         :type oid: str
3228         :param oncomplete: what to do when the remove is safe and complete in memory
3229             on all replicas
3230         :type oncomplete: completion
3231         :param onsafe:  what to do when the remove is safe and complete on storage
3232             on all replicas
3233         :type onsafe: completion
3234         :para mtime: the time to set the mtime to, 0 for the current time
3235         :type mtime: int
3236         :para flags: flags to apply to the entire operation
3237         :type flags: int
3238
3239         :raises: :class:`Error`
3240         :returns: completion object
3241         """
3242
3243         oid = cstr(oid, 'oid')
3244         cdef:
3245             WriteOp _write_op = write_op
3246             char *_oid = oid
3247             Completion completion
3248             time_t _mtime = mtime
3249             int _flags = flags
3250
3251         completion = self.__get_completion(oncomplete, onsafe)
3252         self.__track_completion(completion)
3253
3254         with nogil:
3255             ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3256                                              &_mtime, _flags)
3257         if ret != 0:
3258             completion._cleanup()
3259             raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3260         return completion
3261
3262     @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3263     def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3264         """
3265         excute the real read operation
3266         :para read_op: read operation object
3267         :type read_op: ReadOp
3268         :para oid: object name
3269         :type oid: str
3270         :para flag: flags to apply to the entire operation
3271         :type flag: int
3272         """
3273         oid = cstr(oid, 'oid')
3274         cdef:
3275             ReadOp _read_op = read_op
3276             char *_oid = oid
3277             int _flag = flag
3278
3279         with nogil:
3280             ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3281         if ret != 0:
3282             raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3283
3284     @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3285     def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3286         """
3287         excute the real read operation
3288         :para read_op: read operation object
3289         :type read_op: ReadOp
3290         :para oid: object name
3291         :type oid: str
3292         :param oncomplete: what to do when the remove is safe and complete in memory
3293             on all replicas
3294         :type oncomplete: completion
3295         :param onsafe:  what to do when the remove is safe and complete on storage
3296             on all replicas
3297         :type onsafe: completion
3298         :para flag: flags to apply to the entire operation
3299         :type flag: int
3300         """
3301         oid = cstr(oid, 'oid')
3302         cdef:
3303             ReadOp _read_op = read_op
3304             char *_oid = oid
3305             Completion completion
3306             int _flag = flag
3307
3308         completion = self.__get_completion(oncomplete, onsafe)
3309         self.__track_completion(completion)
3310
3311         with nogil:
3312             ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3313         if ret != 0:
3314             completion._cleanup()
3315             raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3316         return completion
3317
3318     @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3319     def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3320         """
3321         get the omap values
3322         :para read_op: read operation object
3323         :type read_op: ReadOp
3324         :para start_after: list keys starting after start_after
3325         :type start_after: str
3326         :para filter_prefix: list only keys beginning with filter_prefix
3327         :type filter_prefix: str
3328         :para max_return: list no more than max_return key/value pairs
3329         :type max_return: int
3330         :returns: an iterator over the requested omap values, return value from this action
3331         """
3332
3333         start_after = cstr(start_after, 'start_after') if start_after else None
3334         filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3335         cdef:
3336             char *_start_after = opt_str(start_after)
3337             char *_filter_prefix = opt_str(filter_prefix)
3338             ReadOp _read_op = read_op
3339             rados_omap_iter_t iter_addr = NULL
3340             int _max_return = max_return
3341             int prval = 0
3342
3343         with nogil:
3344             rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3345                                          _max_return, &iter_addr, NULL, &prval)
3346         it = OmapIterator(self)
3347         it.ctx = iter_addr
3348         return it, int(prval)
3349
3350     @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3351     def get_omap_keys(self, read_op, start_after, max_return):
3352         """
3353         get the omap keys
3354         :para read_op: read operation object
3355         :type read_op: ReadOp
3356         :para start_after: list keys starting after start_after
3357         :type start_after: str
3358         :para max_return: list no more than max_return key/value pairs
3359         :type max_return: int
3360         :returns: an iterator over the requested omap values, return value from this action
3361         """
3362         start_after = cstr(start_after, 'start_after') if start_after else None
3363         cdef:
3364             char *_start_after = opt_str(start_after)
3365             ReadOp _read_op = read_op
3366             rados_omap_iter_t iter_addr = NULL
3367             int _max_return = max_return
3368             int prval = 0
3369
3370         with nogil:
3371             rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3372                                          _max_return, &iter_addr, NULL, &prval)
3373         it = OmapIterator(self)
3374         it.ctx = iter_addr
3375         return it, int(prval)
3376
3377     @requires(('read_op', ReadOp), ('keys', tuple))
3378     def get_omap_vals_by_keys(self, read_op, keys):
3379         """
3380         get the omap values by keys
3381         :para read_op: read operation object
3382         :type read_op: ReadOp
3383         :para keys: input key tuple
3384         :type keys: tuple
3385         :returns: an iterator over the requested omap values, return value from this action
3386         """
3387         keys = cstr_list(keys, 'keys')
3388         cdef:
3389             ReadOp _read_op = read_op
3390             rados_omap_iter_t iter_addr
3391             char **_keys = to_bytes_array(keys)
3392             size_t key_num = len(keys)
3393             int prval = 0
3394
3395         try:
3396             with nogil:
3397                 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3398                                                     <const char**>_keys,
3399                                                     key_num, &iter_addr,  &prval)
3400             it = OmapIterator(self)
3401             it.ctx = iter_addr
3402             return it, int(prval)
3403         finally:
3404             free(_keys)
3405
3406     @requires(('write_op', WriteOp), ('keys', tuple))
3407     def remove_omap_keys(self, write_op, keys):
3408         """
3409         remove omap keys specifiled
3410         :para write_op: write operation object
3411         :type write_op: WriteOp
3412         :para keys: input key tuple
3413         :type keys: tuple
3414         """
3415
3416         keys = cstr_list(keys, 'keys')
3417         cdef:
3418             WriteOp _write_op = write_op
3419             size_t key_num = len(keys)
3420             char **_keys = to_bytes_array(keys)
3421
3422         try:
3423             with nogil:
3424                 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3425         finally:
3426             free(_keys)
3427
3428     @requires(('write_op', WriteOp))
3429     def clear_omap(self, write_op):
3430         """
3431         Remove all key/value pairs from an object
3432         :para write_op: write operation object
3433         :type write_op: WriteOp
3434         """
3435
3436         cdef:
3437             WriteOp _write_op = write_op
3438
3439         with nogil:
3440             rados_write_op_omap_clear(_write_op.write_op)
3441
3442     @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
3443               ('duration', opt(int)), ('flags', int))
3444     def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
3445
3446         """
3447         Take an exclusive lock on an object
3448
3449         :param key: name of the object
3450         :type key: str
3451         :param name: name of the lock
3452         :type name: str
3453         :param cookie: cookie of the lock
3454         :type cookie: str
3455         :param desc: description of the lock
3456         :type desc: str
3457         :param duration: duration of the lock in seconds
3458         :type duration: int
3459         :param flags: flags
3460         :type flags: int
3461
3462         :raises: :class:`TypeError`
3463         :raises: :class:`Error`
3464         """
3465         self.require_ioctx_open()
3466
3467         key = cstr(key, 'key')
3468         name = cstr(name, 'name')
3469         cookie = cstr(cookie, 'cookie')
3470         desc = cstr(desc, 'desc')
3471
3472         cdef:
3473             char* _key = key
3474             char* _name = name
3475             char* _cookie = cookie
3476             char* _desc = desc
3477             uint8_t _flags = flags
3478             timeval _duration
3479
3480         if duration is None:
3481             with nogil:
3482                 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3483                                            NULL, _flags)
3484         else:
3485             _duration.tv_sec = duration
3486             with nogil:
3487                 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3488                                            &_duration, _flags)
3489
3490         if ret < 0:
3491             raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3492
3493     @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
3494               ('desc', str_type), ('duration', opt(int)), ('flags', int))
3495     def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
3496
3497         """
3498         Take a shared lock on an object
3499
3500         :param key: name of the object
3501         :type key: str
3502         :param name: name of the lock
3503         :type name: str
3504         :param cookie: cookie of the lock
3505         :type cookie: str
3506         :param tag: tag of the lock
3507         :type tag: str
3508         :param desc: description of the lock
3509         :type desc: str
3510         :param duration: duration of the lock in seconds
3511         :type duration: int
3512         :param flags: flags
3513         :type flags: int
3514
3515         :raises: :class:`TypeError`
3516         :raises: :class:`Error`
3517         """
3518         self.require_ioctx_open()
3519
3520         key = cstr(key, 'key')
3521         tag = cstr(tag, 'tag')
3522         name = cstr(name, 'name')
3523         cookie = cstr(cookie, 'cookie')
3524         desc = cstr(desc, 'desc')
3525
3526         cdef:
3527             char* _key = key
3528             char* _tag = tag
3529             char* _name = name
3530             char* _cookie = cookie
3531             char* _desc = desc
3532             uint8_t _flags = flags
3533             timeval _duration
3534
3535         if duration is None:
3536             with nogil:
3537                 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3538                                         NULL, _flags)
3539         else:
3540             _duration.tv_sec = duration
3541             with nogil:
3542                 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3543                                         &_duration, _flags)
3544         if ret < 0:
3545             raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3546
3547     @requires(('key', str_type), ('name', str_type), ('cookie', str_type))
3548     def unlock(self, key, name, cookie):
3549
3550         """
3551         Release a shared or exclusive lock on an object
3552
3553         :param key: name of the object
3554         :type key: str
3555         :param name: name of the lock
3556         :type name: str
3557         :param cookie: cookie of the lock
3558         :type cookie: str
3559
3560         :raises: :class:`TypeError`
3561         :raises: :class:`Error`
3562         """
3563         self.require_ioctx_open()
3564
3565         key = cstr(key, 'key')
3566         name = cstr(name, 'name')
3567         cookie = cstr(cookie, 'cookie')
3568
3569         cdef:
3570             char* _key = key
3571             char* _name = name
3572             char* _cookie = cookie
3573
3574         with nogil:
3575             ret = rados_unlock(self.io, _key, _name, _cookie)
3576         if ret < 0:
3577             raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3578
3579     def application_enable(self, app_name, force=False):
3580         """
3581         Enable an application on an OSD pool
3582
3583         :param app_name: application name
3584         :type app_name: str
3585         :param force: False if only a single app should exist per pool
3586         :type expire_seconds: boool
3587
3588         :raises: :class:`Error`
3589         """
3590         app_name =  cstr(app_name, 'app_name')
3591         cdef:
3592             char *_app_name = app_name
3593             int _force = (1 if force else 0)
3594
3595         with nogil:
3596             ret = rados_application_enable(self.io, _app_name, _force)
3597         if ret < 0:
3598             raise make_ex(ret, "error enabling application")
3599
3600     def application_list(self):
3601         """
3602         Returns a list of enabled applications
3603
3604         :returns: list of app name string
3605         """
3606         cdef:
3607             size_t length = 128
3608             char *apps = NULL
3609
3610         try:
3611             while True:
3612                 apps = <char *>realloc_chk(apps, length)
3613                 with nogil:
3614                     ret = rados_application_list(self.io, apps, &length)
3615                 if ret == 0:
3616                     return [decode_cstr(app) for app in
3617                                 apps[:length].split(b'\0') if app]
3618                 elif ret == -errno.ENOENT:
3619                     return None
3620                 elif ret == -errno.ERANGE:
3621                     pass
3622                 else:
3623                     raise make_ex(ret, "error listing applications")
3624         finally:
3625             free(apps)
3626
3627     def application_metadata_set(self, app_name, key, value):
3628         """
3629         Sets application metadata on an OSD pool
3630
3631         :param app_name: application name
3632         :type app_name: str
3633         :param key: metadata key
3634         :type key: str
3635         :param value: metadata value
3636         :type value: str
3637
3638         :raises: :class:`Error`
3639         """
3640         app_name =  cstr(app_name, 'app_name')
3641         key =  cstr(key, 'key')
3642         value =  cstr(value, 'value')
3643         cdef:
3644             char *_app_name = app_name
3645             char *_key = key
3646             char *_value = value
3647
3648         with nogil:
3649             ret = rados_application_metadata_set(self.io, _app_name, _key,
3650                                                  _value)
3651         if ret < 0:
3652             raise make_ex(ret, "error setting application metadata")
3653
3654     def application_metadata_remove(self, app_name, key):
3655         """
3656         Remove application metadata from an OSD pool
3657
3658         :param app_name: application name
3659         :type app_name: str
3660         :param key: metadata key
3661         :type key: str
3662
3663         :raises: :class:`Error`
3664         """
3665         app_name =  cstr(app_name, 'app_name')
3666         key =  cstr(key, 'key')
3667         cdef:
3668             char *_app_name = app_name
3669             char *_key = key
3670
3671         with nogil:
3672             ret = rados_application_metadata_remove(self.io, _app_name, _key)
3673         if ret < 0:
3674             raise make_ex(ret, "error removing application metadata")
3675
3676     def application_metadata_list(self, app_name):
3677         """
3678         Returns a list of enabled applications
3679
3680         :param app_name: application name
3681         :type app_name: str
3682         :returns: list of key/value tuples
3683         """
3684         app_name =  cstr(app_name, 'app_name')
3685         cdef:
3686             char *_app_name = app_name
3687             size_t key_length = 128
3688             size_t val_length = 128
3689             char *c_keys = NULL
3690             char *c_vals = NULL
3691
3692         try:
3693             while True:
3694                 c_keys = <char *>realloc_chk(c_keys, key_length)
3695                 c_vals = <char *>realloc_chk(c_vals, val_length)
3696                 with nogil:
3697                     ret = rados_application_metadata_list(self.io, _app_name,
3698                                                           c_keys, &key_length,
3699                                                           c_vals, &val_length)
3700                 if ret == 0:
3701                     keys = [decode_cstr(key) for key in
3702                                 c_keys[:key_length].split(b'\0') if key]
3703                     vals = [decode_cstr(val) for val in
3704                                 c_vals[:val_length].split(b'\0') if val]
3705                     return zip(keys, vals)
3706                 elif ret == -errno.ERANGE:
3707                     pass
3708                 else:
3709                     raise make_ex(ret, "error listing application metadata")
3710         finally:
3711             free(c_keys)
3712             free(c_vals)
3713
3714
3715 def set_object_locator(func):
3716     def retfunc(self, *args, **kwargs):
3717         if self.locator_key is not None:
3718             old_locator = self.ioctx.get_locator_key()
3719             self.ioctx.set_locator_key(self.locator_key)
3720             retval = func(self, *args, **kwargs)
3721             self.ioctx.set_locator_key(old_locator)
3722             return retval
3723         else:
3724             return func(self, *args, **kwargs)
3725     return retfunc
3726
3727
3728 def set_object_namespace(func):
3729     def retfunc(self, *args, **kwargs):
3730         if self.nspace is None:
3731             raise LogicError("Namespace not set properly in context")
3732         old_nspace = self.ioctx.get_namespace()
3733         self.ioctx.set_namespace(self.nspace)
3734         retval = func(self, *args, **kwargs)
3735         self.ioctx.set_namespace(old_nspace)
3736         return retval
3737     return retfunc
3738
3739
3740 class Object(object):
3741     """Rados object wrapper, makes the object look like a file"""
3742     def __init__(self, ioctx, key, locator_key=None, nspace=None):
3743         self.key = key
3744         self.ioctx = ioctx
3745         self.offset = 0
3746         self.state = "exists"
3747         self.locator_key = locator_key
3748         self.nspace = "" if nspace is None else nspace
3749
3750     def __str__(self):
3751         return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
3752             (str(self.ioctx), self.key, "--default--"
3753              if self.nspace is "" else self.nspace, self.locator_key)
3754
3755     def require_object_exists(self):
3756         if self.state != "exists":
3757             raise ObjectStateError("The object is %s" % self.state)
3758
3759     @set_object_locator
3760     @set_object_namespace
3761     def read(self, length=1024 * 1024):
3762         self.require_object_exists()
3763         ret = self.ioctx.read(self.key, length, self.offset)
3764         self.offset += len(ret)
3765         return ret
3766
3767     @set_object_locator
3768     @set_object_namespace
3769     def write(self, string_to_write):
3770         self.require_object_exists()
3771         ret = self.ioctx.write(self.key, string_to_write, self.offset)
3772         if ret == 0:
3773             self.offset += len(string_to_write)
3774         return ret
3775
3776     @set_object_locator
3777     @set_object_namespace
3778     def remove(self):
3779         self.require_object_exists()
3780         self.ioctx.remove_object(self.key)
3781         self.state = "removed"
3782
3783     @set_object_locator
3784     @set_object_namespace
3785     def stat(self):
3786         self.require_object_exists()
3787         return self.ioctx.stat(self.key)
3788
3789     def seek(self, position):
3790         self.require_object_exists()
3791         self.offset = position
3792
3793     @set_object_locator
3794     @set_object_namespace
3795     def get_xattr(self, xattr_name):
3796         self.require_object_exists()
3797         return self.ioctx.get_xattr(self.key, xattr_name)
3798
3799     @set_object_locator
3800     @set_object_namespace
3801     def get_xattrs(self):
3802         self.require_object_exists()
3803         return self.ioctx.get_xattrs(self.key)
3804
3805     @set_object_locator
3806     @set_object_namespace
3807     def set_xattr(self, xattr_name, xattr_value):
3808         self.require_object_exists()
3809         return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
3810
3811     @set_object_locator
3812     @set_object_namespace
3813     def rm_xattr(self, xattr_name):
3814         self.require_object_exists()
3815         return self.ioctx.rm_xattr(self.key, xattr_name)
3816
3817 MONITOR_LEVELS = [
3818     "debug",
3819     "info",
3820     "warn", "warning",
3821     "err", "error",
3822     "sec",
3823     ]
3824
3825
3826 class MonitorLog(object):
3827     # NOTE(sileht): Keep this class for backward compat
3828     # method moved to Rados.monitor_log()
3829     """
3830     For watching cluster log messages.  Instantiate an object and keep
3831     it around while callback is periodically called.  Construct with
3832     'level' to monitor 'level' messages (one of MONITOR_LEVELS).
3833     arg will be passed to the callback.
3834
3835     callback will be called with:
3836         arg (given to __init__)
3837         line (the full line, including timestamp, who, level, msg)
3838         who (which entity issued the log message)
3839         timestamp_sec (sec of a struct timespec)
3840         timestamp_nsec (sec of a struct timespec)
3841         seq (sequence number)
3842         level (string representing the level of the log message)
3843         msg (the message itself)
3844     callback's return value is ignored
3845     """
3846     def __init__(self, cluster, level, callback, arg):
3847         self.level = level
3848         self.callback = callback
3849         self.arg = arg
3850         self.cluster = cluster
3851         self.cluster.monitor_log(level, callback, arg)
3852