-# cython: embedsignature=True
-"""
-This module is a thin wrapper around librados.
-
-Error codes from librados are turned into exceptions that subclass
-:class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
-(the base class of all rados exceptions), :class:`PermissionError`
-and :class:`IOError`, in addition to those documented for the
-method.
-"""
-# Copyright 2011 Josh Durgin
-# Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
-# Copyright 2015 Hector Martin <marcan@marcan.st>
-# Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
-
-from cpython cimport PyObject, ref
-from cpython.pycapsule cimport *
-from libc cimport errno
-from libc.stdint cimport *
-from libc.stdlib cimport malloc, realloc, free
-
-import sys
-import threading
-import time
-
-from collections import Callable
-from datetime import datetime
-from functools import partial, wraps
-from itertools import chain
-
-# Are we running Python 2.x
-if sys.version_info[0] < 3:
- str_type = basestring
-else:
- str_type = str
-
-
-cdef extern from "Python.h":
- # These are in cpython/string.pxd, but use "object" types instead of
- # PyObject*, which invokes assumptions in cpython that we need to
- # legitimately break to implement zero-copy string buffers in Ioctx.read().
- # This is valid use of the Python API and documented as a special case.
- PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
- char* PyBytes_AsString(PyObject *string) except NULL
- int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
- void PyEval_InitThreads()
-
-
-cdef extern from "time.h":
- ctypedef long int time_t
- ctypedef long int suseconds_t
-
-
-cdef extern from "sys/time.h":
- cdef struct timeval:
- time_t tv_sec
- suseconds_t tv_usec
-
-
-cdef extern from "rados/rados_types.h" nogil:
- cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
-
-
-cdef extern from "rados/librados.h" nogil:
- enum:
- _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
- _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
- _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
- _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
- _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
- _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
- _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
-
-
- enum:
- _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
- _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
- _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
- _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
- _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
- _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
- _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
- _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
- _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
-
- cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
-
- ctypedef void* rados_t
- ctypedef void* rados_config_t
- ctypedef void* rados_ioctx_t
- ctypedef void* rados_xattrs_iter_t
- ctypedef void* rados_omap_iter_t
- ctypedef void* rados_list_ctx_t
- ctypedef uint64_t rados_snap_t
- ctypedef void *rados_write_op_t
- ctypedef void *rados_read_op_t
- ctypedef void *rados_completion_t
- ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
- ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
- uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
- ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
- uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
-
-
- cdef struct rados_cluster_stat_t:
- uint64_t kb
- uint64_t kb_used
- uint64_t kb_avail
- uint64_t num_objects
-
- cdef struct rados_pool_stat_t:
- uint64_t num_bytes
- uint64_t num_kb
- uint64_t num_objects
- uint64_t num_object_clones
- uint64_t num_object_copies
- uint64_t num_objects_missing_on_primary
- uint64_t num_objects_unfound
- uint64_t num_objects_degraded
- uint64_t num_rd
- uint64_t num_rd_kb
- uint64_t num_wr
- uint64_t num_wr_kb
-
- void rados_buffer_free(char *buf)
-
- void rados_version(int *major, int *minor, int *extra)
- int rados_create2(rados_t *pcluster, const char *const clustername,
- const char * const name, uint64_t flags)
- int rados_create_with_context(rados_t *cluster, rados_config_t cct)
- int rados_connect(rados_t cluster)
- void rados_shutdown(rados_t cluster)
- int rados_conf_read_file(rados_t cluster, const char *path)
- int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
- int rados_conf_parse_env(rados_t cluster, const char *var)
- int rados_conf_set(rados_t cluster, char *option, const char *value)
- int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
-
- int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
- int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
- int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
- int rados_pool_create(rados_t cluster, const char *pool_name)
- int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
- int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
- int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
- int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
- int rados_pool_list(rados_t cluster, char *buf, size_t len)
- int rados_pool_delete(rados_t cluster, const char *pool_name)
- int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
-
- int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
- int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
- int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
- int rados_application_enable(rados_ioctx_t io, const char *app_name,
- int force)
- int rados_application_list(rados_ioctx_t io, char *values,
- size_t *values_len)
- int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
- const char *key, char *value,
- size_t *value_len)
- int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
- const char *key, const char *value)
- int rados_application_metadata_remove(rados_ioctx_t io,
- const char *app_name, const char *key)
- int rados_application_metadata_list(rados_ioctx_t io,
- const char *app_name, char *keys,
- size_t *key_len, char *values,
- size_t *value_len)
- int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
- int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
- const char *inbuf, size_t inbuflen,
- char **outbuf, size_t *outbuflen,
- char **outs, size_t *outslen)
- int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
- const char *inbuf, size_t inbuflen,
- char **outbuf, size_t *outbuflen,
- char **outs, size_t *outslen)
- int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
- const char *inbuf, size_t inbuflen,
- char **outbuf, size_t *outbuflen,
- char **outs, size_t *outslen)
- int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
- const char *inbuf, size_t inbuflen,
- char **outbuf, size_t *outbuflen,
- char **outs, size_t *outslen)
- int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
- const char *inbuf, size_t inbuflen,
- char **outbuf, size_t *outbuflen,
- char **outs, size_t *outslen)
- int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
- int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
-
- int rados_wait_for_latest_osdmap(rados_t cluster)
-
- int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
- void rados_ioctx_destroy(rados_ioctx_t io)
- int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
- void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
- void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
-
- uint64_t rados_get_last_version(rados_ioctx_t io)
- int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
- int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
- int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
- int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
- int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
- int rados_remove(rados_ioctx_t io, const char *oid)
- int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
- int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
- int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
- int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
- int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
- int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
- void rados_getxattrs_end(rados_xattrs_iter_t iter)
-
- int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
- int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
- void rados_nobjects_list_close(rados_list_ctx_t ctx)
-
- int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
- int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
- int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
- int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
- int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
- void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
- int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
- int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
-
- int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
- const char * cookie, const char * desc,
- timeval * duration, uint8_t flags)
- int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
- const char * cookie, const char * tag, const char * desc,
- timeval * duration, uint8_t flags)
- int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
-
- rados_write_op_t rados_create_write_op()
- void rados_release_write_op(rados_write_op_t write_op)
-
- rados_read_op_t rados_create_read_op()
- void rados_release_read_op(rados_read_op_t read_op)
-
- int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
- void rados_aio_release(rados_completion_t c)
- int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
- int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
- int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
- int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
- int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
- int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
- int rados_aio_flush(rados_ioctx_t io)
-
- int rados_aio_get_return_value(rados_completion_t c)
- int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
- int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
- int rados_aio_wait_for_complete(rados_completion_t c)
- int rados_aio_wait_for_safe(rados_completion_t c)
- int rados_aio_is_complete(rados_completion_t c)
- int rados_aio_is_safe(rados_completion_t c)
-
- int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
- const char * in_buf, size_t in_len, char * buf, size_t out_len)
- int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
- const char * in_buf, size_t in_len, char * buf, size_t out_len)
-
- int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
- int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
- void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
- void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
- void rados_write_op_omap_clear(rados_write_op_t write_op)
- void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
-
- void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
- void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
- void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
- void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
- void rados_write_op_remove(rados_write_op_t write_op)
- void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
- void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
-
- void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
- void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
- void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
- int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
- int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
- void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
- int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
- void rados_omap_get_end(rados_omap_iter_t iter)
-
-
-LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
-LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
-LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
-LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
-LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
-LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
-LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
-
-LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
-
-LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
-LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
-LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
-LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
-LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
-LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
-LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
-
-LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
-
-LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
-LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
-
-ANONYMOUS_AUID = 0xffffffffffffffff
-ADMIN_AUID = 0
-
-
-class Error(Exception):
- """ `Error` class, derived from `Exception` """
- pass
-
-
-class InvalidArgumentError(Error):
- pass
-
-
-class OSError(Error):
- """ `OSError` class, derived from `Error` """
- def __init__(self, message, errno=None):
- super(OSError, self).__init__(message)
- self.errno = errno
-
- def __str__(self):
- msg = super(OSError, self).__str__()
- if self.errno is None:
- return msg
- return '[errno {0}] {1}'.format(self.errno, msg)
-
- def __reduce__(self):
- return (self.__class__, (self.message, self.errno))
-
-class InterruptedOrTimeoutError(OSError):
- """ `InterruptedOrTimeoutError` class, derived from `OSError` """
- pass
-
-
-class PermissionError(OSError):
- """ `PermissionError` class, derived from `OSError` """
- pass
-
-
-class PermissionDeniedError(OSError):
- """ deal with EACCES related. """
- pass
-
-
-class ObjectNotFound(OSError):
- """ `ObjectNotFound` class, derived from `OSError` """
- pass
-
-
-class NoData(OSError):
- """ `NoData` class, derived from `OSError` """
- pass
-
-
-class ObjectExists(OSError):
- """ `ObjectExists` class, derived from `OSError` """
- pass
-
-
-class ObjectBusy(OSError):
- """ `ObjectBusy` class, derived from `IOError` """
- pass
-
-
-class IOError(OSError):
- """ `ObjectBusy` class, derived from `OSError` """
- pass
-
-
-class NoSpace(OSError):
- """ `NoSpace` class, derived from `OSError` """
- pass
-
-
-class RadosStateError(Error):
- """ `RadosStateError` class, derived from `Error` """
- pass
-
-
-class IoctxStateError(Error):
- """ `IoctxStateError` class, derived from `Error` """
- pass
-
-
-class ObjectStateError(Error):
- """ `ObjectStateError` class, derived from `Error` """
- pass
-
-
-class LogicError(Error):
- """ `` class, derived from `Error` """
- pass
-
-
-class TimedOut(OSError):
- """ `TimedOut` class, derived from `OSError` """
- pass
-
-
-IF UNAME_SYSNAME == "FreeBSD":
- cdef errno_to_exception = {
- errno.EPERM : PermissionError,
- errno.ENOENT : ObjectNotFound,
- errno.EIO : IOError,
- errno.ENOSPC : NoSpace,
- errno.EEXIST : ObjectExists,
- errno.EBUSY : ObjectBusy,
- errno.ENOATTR : NoData,
- errno.EINTR : InterruptedOrTimeoutError,
- errno.ETIMEDOUT : TimedOut,
- errno.EACCES : PermissionDeniedError,
- errno.EINVAL : InvalidArgumentError,
- }
-ELSE:
- cdef errno_to_exception = {
- errno.EPERM : PermissionError,
- errno.ENOENT : ObjectNotFound,
- errno.EIO : IOError,
- errno.ENOSPC : NoSpace,
- errno.EEXIST : ObjectExists,
- errno.EBUSY : ObjectBusy,
- errno.ENODATA : NoData,
- errno.EINTR : InterruptedOrTimeoutError,
- errno.ETIMEDOUT : TimedOut,
- errno.EACCES : PermissionDeniedError,
- errno.EINVAL : InvalidArgumentError,
- }
-
-
-cdef make_ex(ret, msg):
- """
- Translate a librados return code into an exception.
-
- :param ret: the return code
- :type ret: int
- :param msg: the error message to use
- :type msg: str
- :returns: a subclass of :class:`Error`
- """
- ret = abs(ret)
- if ret in errno_to_exception:
- return errno_to_exception[ret](msg, errno=ret)
- else:
- return OSError(msg, errno=ret)
-
-
-# helper to specify an optional argument, where in addition to `cls`, `None`
-# is also acceptable
-def opt(cls):
- return (cls, None)
-
-
-# validate argument types of an instance method
-# kwargs is an un-ordered dict, so use args instead
-def requires(*types):
- def is_type_of(v, t):
- if t is None:
- return v is None
- else:
- return isinstance(v, t)
-
- def check_type(val, arg_name, arg_type):
- if isinstance(arg_type, tuple):
- if any(is_type_of(val, t) for t in arg_type):
- return
- type_names = ' or '.join('None' if t is None else t.__name__
- for t in arg_type)
- raise TypeError('%s must be %s' % (arg_name, type_names))
- else:
- if is_type_of(val, arg_type):
- return
- assert(arg_type is not None)
- raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
-
- def wrapper(f):
- # FIXME(sileht): this stop with
- # AttributeError: 'method_descriptor' object has no attribute '__module__'
- # @wraps(f)
- def validate_func(*args, **kwargs):
- # ignore the `self` arg
- pos_args = zip(args[1:], types)
- named_args = ((kwargs[name], (name, spec)) for name, spec in types
- if name in kwargs)
- for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
- check_type(arg_val, arg_name, arg_type)
- return f(*args, **kwargs)
- return validate_func
- return wrapper
-
-
-def cstr(val, name, encoding="utf-8", opt=False):
- """
- Create a byte string from a Python string
-
- :param basestring val: Python string
- :param str name: Name of the string parameter, for exceptions
- :param str encoding: Encoding to use
- :param bool opt: If True, None is allowed
- :rtype: bytes
- :raises: :class:`InvalidArgument`
- """
- if opt and val is None:
- return None
- if isinstance(val, bytes):
- return val
- elif isinstance(val, unicode):
- return val.encode(encoding)
- else:
- raise TypeError('%s must be a string' % name)
-
-
-def cstr_list(list_str, name, encoding="utf-8"):
- return [cstr(s, name) for s in list_str]
-
-
-def decode_cstr(val, encoding="utf-8"):
- """
- Decode a byte string into a Python string.
-
- :param bytes val: byte string
- :rtype: unicode or None
- """
- if val is None:
- return None
-
- return val.decode(encoding)
-
-
-cdef char* opt_str(s) except? NULL:
- if s is None:
- return NULL
- return s
-
-
-cdef void* realloc_chk(void* ptr, size_t size) except NULL:
- cdef void *ret = realloc(ptr, size)
- if ret == NULL:
- raise MemoryError("realloc failed")
- return ret
-
-
-cdef size_t * to_csize_t_array(list_int):
- cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
- if ret == NULL:
- raise MemoryError("malloc failed")
- for i in xrange(len(list_int)):
- ret[i] = <size_t>list_int[i]
- return ret
-
-
-cdef char ** to_bytes_array(list_bytes):
- cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
- if ret == NULL:
- raise MemoryError("malloc failed")
- for i in xrange(len(list_bytes)):
- ret[i] = <char *>list_bytes[i]
- return ret
-
-
-
-cdef int __monitor_callback(void *arg, const char *line, const char *who,
- uint64_t sec, uint64_t nsec, uint64_t seq,
- const char *level, const char *msg) with gil:
- cdef object cb_info = <object>arg
- cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
- return 0
-
-cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
- const char *who,
- const char *name,
- uint64_t sec, uint64_t nsec, uint64_t seq,
- const char *level, const char *msg) with gil:
- cdef object cb_info = <object>arg
- cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
- return 0
-
-
-class Version(object):
- """ Version information """
- def __init__(self, major, minor, extra):
- self.major = major
- self.minor = minor
- self.extra = extra
-
- def __str__(self):
- return "%d.%d.%d" % (self.major, self.minor, self.extra)
-
-
-cdef class Rados(object):
- """This class wraps librados functions"""
- # NOTE(sileht): attributes declared in .pyd
-
- def __init__(self, *args, **kwargs):
- PyEval_InitThreads()
- self.__setup(*args, **kwargs)
-
- @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
- ('conffile', opt(str_type)))
- def __setup(self, rados_id=None, name=None, clustername=None,
- conf_defaults=None, conffile=None, conf=None, flags=0,
- context=None):
- self.monitor_callback = None
- self.monitor_callback2 = None
- self.parsed_args = []
- self.conf_defaults = conf_defaults
- self.conffile = conffile
- self.rados_id = rados_id
-
- if rados_id and name:
- raise Error("Rados(): can't supply both rados_id and name")
- elif rados_id:
- name = 'client.' + rados_id
- elif name is None:
- name = 'client.admin'
- if clustername is None:
- clustername = ''
-
- name = cstr(name, 'name')
- clustername = cstr(clustername, 'clustername')
- cdef:
- char *_name = name
- char *_clustername = clustername
- int _flags = flags
- int ret
-
- if context:
- # Unpack void* (aka rados_config_t) from capsule
- rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
- with nogil:
- ret = rados_create_with_context(&self.cluster, rados_config)
- else:
- with nogil:
- ret = rados_create2(&self.cluster, _clustername, _name, _flags)
- if ret != 0:
- raise Error("rados_initialize failed with error code: %d" % ret)
-
- self.state = "configuring"
- # order is important: conf_defaults, then conffile, then conf
- if conf_defaults:
- for key, value in conf_defaults.items():
- self.conf_set(key, value)
- if conffile is not None:
- # read the default conf file when '' is given
- if conffile == '':
- conffile = None
- self.conf_read_file(conffile)
- if conf:
- for key, value in conf.items():
- self.conf_set(key, value)
-
- def require_state(self, *args):
- """
- Checks if the Rados object is in a special state
-
- :raises: RadosStateError
- """
- if self.state in args:
- return
- raise RadosStateError("You cannot perform that operation on a \
-Rados object in state %s." % self.state)
-
- def shutdown(self):
- """
- Disconnects from the cluster. Call this explicitly when a
- Rados.connect()ed object is no longer used.
- """
- if self.state != "shutdown":
- with nogil:
- rados_shutdown(self.cluster)
- self.state = "shutdown"
-
- def __enter__(self):
- self.connect()
- return self
-
- def __exit__(self, type_, value, traceback):
- self.shutdown()
- return False
-
- def version(self):
- """
- Get the version number of the ``librados`` C library.
-
- :returns: a tuple of ``(major, minor, extra)`` components of the
- librados version
- """
- cdef int major = 0
- cdef int minor = 0
- cdef int extra = 0
- with nogil:
- rados_version(&major, &minor, &extra)
- return Version(major, minor, extra)
-
- @requires(('path', opt(str_type)))
- def conf_read_file(self, path=None):
- """
- Configure the cluster handle using a Ceph config file.
-
- :param path: path to the config file
- :type path: str
- """
- self.require_state("configuring", "connected")
- path = cstr(path, 'path', opt=True)
- cdef:
- char *_path = opt_str(path)
- with nogil:
- ret = rados_conf_read_file(self.cluster, _path)
- if ret != 0:
- raise make_ex(ret, "error calling conf_read_file")
-
- def conf_parse_argv(self, args):
- """
- Parse known arguments from args, and remove; returned
- args contain only those unknown to ceph
- """
- self.require_state("configuring", "connected")
- if not args:
- return
-
- cargs = cstr_list(args, 'args')
- cdef:
- int _argc = len(args)
- char **_argv = to_bytes_array(cargs)
- char **_remargv = NULL
-
- try:
- _remargv = <char **>malloc(_argc * sizeof(char *))
- with nogil:
- ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
- <const char**>_argv,
- <const char**>_remargv)
- if ret:
- raise make_ex(ret, "error calling conf_parse_argv_remainder")
-
- # _remargv was allocated with fixed argc; collapse return
- # list to eliminate any missing args
- retargs = [decode_cstr(a) for a in _remargv[:_argc]
- if a != NULL]
- self.parsed_args = args
- return retargs
- finally:
- free(_argv)
- free(_remargv)
-
- def conf_parse_env(self, var='CEPH_ARGS'):
- """
- Parse known arguments from an environment variable, normally
- CEPH_ARGS.
- """
- self.require_state("configuring", "connected")
- if not var:
- return
-
- var = cstr(var, 'var')
- cdef:
- char *_var = var
- with nogil:
- ret = rados_conf_parse_env(self.cluster, _var)
- if ret != 0:
- raise make_ex(ret, "error calling conf_parse_env")
-
- @requires(('option', str_type))
- def conf_get(self, option):
- """
- Get the value of a configuration option
-
- :param option: which option to read
- :type option: str
-
- :returns: str - value of the option or None
- :raises: :class:`TypeError`
- """
- self.require_state("configuring", "connected")
- option = cstr(option, 'option')
- cdef:
- char *_option = option
- size_t length = 20
- char *ret_buf = NULL
-
- try:
- while True:
- ret_buf = <char *>realloc_chk(ret_buf, length)
- with nogil:
- ret = rados_conf_get(self.cluster, _option, ret_buf, length)
- if ret == 0:
- return decode_cstr(ret_buf)
- elif ret == -errno.ENAMETOOLONG:
- length = length * 2
- elif ret == -errno.ENOENT:
- return None
- else:
- raise make_ex(ret, "error calling conf_get")
- finally:
- free(ret_buf)
-
- @requires(('option', str_type), ('val', str_type))
- def conf_set(self, option, val):
- """
- Set the value of a configuration option
-
- :param option: which option to set
- :type option: str
- :param option: value of the option
- :type option: str
-
- :raises: :class:`TypeError`, :class:`ObjectNotFound`
- """
- self.require_state("configuring", "connected")
- option = cstr(option, 'option')
- val = cstr(val, 'val')
- cdef:
- char *_option = option
- char *_val = val
-
- with nogil:
- ret = rados_conf_set(self.cluster, _option, _val)
- if ret != 0:
- raise make_ex(ret, "error calling conf_set")
-
- def ping_monitor(self, mon_id):
- """
- Ping a monitor to assess liveness
-
- May be used as a simply way to assess liveness, or to obtain
- information about the monitor in a simple way even in the
- absence of quorum.
-
- :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
- :type mon_id: str
- :returns: the string reply from the monitor
- """
-
- self.require_state("configuring", "connected")
-
- mon_id = cstr(mon_id, 'mon_id')
- cdef:
- char *_mon_id = mon_id
- size_t outstrlen = 0
- char *outstr
-
- with nogil:
- ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
-
- if ret != 0:
- raise make_ex(ret, "error calling ping_monitor")
-
- if outstrlen:
- my_outstr = outstr[:outstrlen]
- rados_buffer_free(outstr)
- return decode_cstr(my_outstr)
-
- def connect(self, timeout=0):
- """
- Connect to the cluster. Use shutdown() to release resources.
- """
- self.require_state("configuring")
- # NOTE(sileht): timeout was supported by old python API,
- # but this is not something available in C API, so ignore
- # for now and remove it later
- with nogil:
- ret = rados_connect(self.cluster)
- if ret != 0:
- raise make_ex(ret, "error connecting to the cluster")
- self.state = "connected"
-
- def get_cluster_stats(self):
- """
- Read usage info about the cluster
-
- This tells you total space, space used, space available, and number
- of objects. These are not updated immediately when data is written,
- they are eventually consistent.
-
- :returns: dict - contains the following keys:
-
- - ``kb`` (int) - total space
-
- - ``kb_used`` (int) - space used
-
- - ``kb_avail`` (int) - free space available
-
- - ``num_objects`` (int) - number of objects
-
- """
- cdef:
- rados_cluster_stat_t stats
-
- with nogil:
- ret = rados_cluster_stat(self.cluster, &stats)
-
- if ret < 0:
- raise make_ex(
- ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
- return {'kb': stats.kb,
- 'kb_used': stats.kb_used,
- 'kb_avail': stats.kb_avail,
- 'num_objects': stats.num_objects}
-
- @requires(('pool_name', str_type))
- def pool_exists(self, pool_name):
- """
- Checks if a given pool exists.
-
- :param pool_name: name of the pool to check
- :type pool_name: str
-
- :raises: :class:`TypeError`, :class:`Error`
- :returns: bool - whether the pool exists, false otherwise.
- """
- self.require_state("connected")
-
- pool_name = cstr(pool_name, 'pool_name')
- cdef:
- char *_pool_name = pool_name
-
- with nogil:
- ret = rados_pool_lookup(self.cluster, _pool_name)
- if ret >= 0:
- return True
- elif ret == -errno.ENOENT:
- return False
- else:
- raise make_ex(ret, "error looking up pool '%s'" % pool_name)
-
- @requires(('pool_name', str_type))
- def pool_lookup(self, pool_name):
- """
- Returns a pool's ID based on its name.
-
- :param pool_name: name of the pool to look up
- :type pool_name: str
-
- :raises: :class:`TypeError`, :class:`Error`
- :returns: int - pool ID, or None if it doesn't exist
- """
- self.require_state("connected")
- pool_name = cstr(pool_name, 'pool_name')
- cdef:
- char *_pool_name = pool_name
-
- with nogil:
- ret = rados_pool_lookup(self.cluster, _pool_name)
- if ret >= 0:
- return int(ret)
- elif ret == -errno.ENOENT:
- return None
- else:
- raise make_ex(ret, "error looking up pool '%s'" % pool_name)
-
- @requires(('pool_id', int))
- def pool_reverse_lookup(self, pool_id):
- """
- Returns a pool's name based on its ID.
-
- :param pool_id: ID of the pool to look up
- :type pool_id: int
-
- :raises: :class:`TypeError`, :class:`Error`
- :returns: string - pool name, or None if it doesn't exist
- """
- self.require_state("connected")
- cdef:
- int64_t _pool_id = pool_id
- size_t size = 512
- char *name = NULL
-
- try:
- while True:
- name = <char *>realloc_chk(name, size)
- with nogil:
- ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
- if ret >= 0:
- break
- elif ret != -errno.ERANGE and size <= 4096:
- size *= 2
- elif ret == -errno.ENOENT:
- return None
- elif ret < 0:
- raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
-
- return decode_cstr(name)
-
- finally:
- free(name)
-
- @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
- def create_pool(self, pool_name, auid=None, crush_rule=None):
- """
- Create a pool:
- - with default settings: if auid=None and crush_rule=None
- - owned by a specific auid: auid given and crush_rule=None
- - with a specific CRUSH rule: if auid=None and crush_rule given
- - with a specific CRUSH rule and auid: if auid and crush_rule given
-
- :param pool_name: name of the pool to create
- :type pool_name: str
- :param auid: the id of the owner of the new pool
- :type auid: int
- :param crush_rule: rule to use for placement in the new pool
- :type crush_rule: int
-
- :raises: :class:`TypeError`, :class:`Error`
- """
- self.require_state("connected")
-
- pool_name = cstr(pool_name, 'pool_name')
- cdef:
- char *_pool_name = pool_name
- uint8_t _crush_rule
- uint64_t _auid
-
- if auid is None and crush_rule is None:
- with nogil:
- ret = rados_pool_create(self.cluster, _pool_name)
- elif auid is None:
- _crush_rule = crush_rule
- with nogil:
- ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
- elif crush_rule is None:
- _auid = auid
- with nogil:
- ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
- else:
- _auid = auid
- _crush_rule = crush_rule
- with nogil:
- ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
- if ret < 0:
- raise make_ex(ret, "error creating pool '%s'" % pool_name)
-
- @requires(('pool_id', int))
- def get_pool_base_tier(self, pool_id):
- """
- Get base pool
-
- :returns: base pool, or pool_id if tiering is not configured for the pool
- """
- self.require_state("connected")
- cdef:
- int64_t base_tier = 0
- int64_t _pool_id = pool_id
-
- with nogil:
- ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
- if ret < 0:
- raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
- return int(base_tier)
-
- @requires(('pool_name', str_type))
- def delete_pool(self, pool_name):
- """
- Delete a pool and all data inside it.
-
- The pool is removed from the cluster immediately,
- but the actual data is deleted in the background.
-
- :param pool_name: name of the pool to delete
- :type pool_name: str
-
- :raises: :class:`TypeError`, :class:`Error`
- """
- self.require_state("connected")
-
- pool_name = cstr(pool_name, 'pool_name')
- cdef:
- char *_pool_name = pool_name
-
- with nogil:
- ret = rados_pool_delete(self.cluster, _pool_name)
- if ret < 0:
- raise make_ex(ret, "error deleting pool '%s'" % pool_name)
-
- @requires(('pool_id', int))
- def get_inconsistent_pgs(self, pool_id):
- """
- List inconsistent placement groups in the given pool
-
- :param pool_id: ID of the pool in which PGs are listed
- :type pool_id: int
- :returns: list - inconsistent placement groups
- """
- self.require_state("connected")
- cdef:
- int64_t pool = pool_id
- size_t size = 512
- char *pgs = NULL
-
- try:
- while True:
- pgs = <char *>realloc_chk(pgs, size);
- with nogil:
- ret = rados_inconsistent_pg_list(self.cluster, pool,
- pgs, size)
- if ret > <int>size:
- size *= 2
- elif ret >= 0:
- break
- else:
- raise make_ex(ret, "error calling inconsistent_pg_list")
- return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
- finally:
- free(pgs)
-
- def list_pools(self):
- """
- Gets a list of pool names.
-
- :returns: list - of pool names.
- """
- self.require_state("connected")
- cdef:
- size_t size = 512
- char *c_names = NULL
-
- try:
- while True:
- c_names = <char *>realloc_chk(c_names, size)
- with nogil:
- ret = rados_pool_list(self.cluster, c_names, size)
- if ret > <int>size:
- size *= 2
- elif ret >= 0:
- break
- return [name for name in decode_cstr(c_names[:ret]).split('\0')
- if name]
- finally:
- free(c_names)
-
- def get_fsid(self):
- """
- Get the fsid of the cluster as a hexadecimal string.
-
- :raises: :class:`Error`
- :returns: str - cluster fsid
- """
- self.require_state("connected")
- cdef:
- char *ret_buf
- size_t buf_len = 37
- PyObject* ret_s = NULL
-
- ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
- try:
- ret_buf = PyBytes_AsString(ret_s)
- with nogil:
- ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
- if ret < 0:
- raise make_ex(ret, "error getting cluster fsid")
- if ret != <int>buf_len:
- _PyBytes_Resize(&ret_s, ret)
- return <object>ret_s
- finally:
- # We DECREF unconditionally: the cast to object above will have
- # INCREFed if necessary. This also takes care of exceptions,
- # including if _PyString_Resize fails (that will free the string
- # itself and set ret_s to NULL, hence XDECREF).
- ref.Py_XDECREF(ret_s)
-
- @requires(('ioctx_name', str_type))
- def open_ioctx(self, ioctx_name):
- """
- Create an io context
-
- The io context allows you to perform operations within a particular
- pool.
-
- :param ioctx_name: name of the pool
- :type ioctx_name: str
-
- :raises: :class:`TypeError`, :class:`Error`
- :returns: Ioctx - Rados Ioctx object
- """
- self.require_state("connected")
- ioctx_name = cstr(ioctx_name, 'ioctx_name')
- cdef:
- rados_ioctx_t ioctx
- char *_ioctx_name = ioctx_name
- with nogil:
- ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
- if ret < 0:
- raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
- io = Ioctx(ioctx_name)
- io.io = ioctx
- return io
-
- def mon_command(self, cmd, inbuf, timeout=0, target=None):
- """
- mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
- returns (int ret, string outbuf, string outs)
- """
- # NOTE(sileht): timeout is ignored because C API doesn't provide
- # timeout argument, but we keep it for backward compat with old python binding
-
- self.require_state("connected")
- cmd = cstr_list(cmd, 'c')
-
- if isinstance(target, int):
- # NOTE(sileht): looks weird but test_monmap_dump pass int
- target = str(target)
-
- target = cstr(target, 'target', opt=True)
- inbuf = cstr(inbuf, 'inbuf')
-
- cdef:
- char *_target = opt_str(target)
- char **_cmd = to_bytes_array(cmd)
- size_t _cmdlen = len(cmd)
-
- char *_inbuf = inbuf
- size_t _inbuf_len = len(inbuf)
-
- char *_outbuf
- size_t _outbuf_len
- char *_outs
- size_t _outs_len
-
- try:
- if target:
- with nogil:
- ret = rados_mon_command_target(self.cluster, _target,
- <const char **>_cmd, _cmdlen,
- <const char*>_inbuf, _inbuf_len,
- &_outbuf, &_outbuf_len,
- &_outs, &_outs_len)
- else:
- with nogil:
- ret = rados_mon_command(self.cluster,
- <const char **>_cmd, _cmdlen,
- <const char*>_inbuf, _inbuf_len,
- &_outbuf, &_outbuf_len,
- &_outs, &_outs_len)
-
- my_outs = decode_cstr(_outs[:_outs_len])
- my_outbuf = _outbuf[:_outbuf_len]
- if _outs_len:
- rados_buffer_free(_outs)
- if _outbuf_len:
- rados_buffer_free(_outbuf)
- return (ret, my_outbuf, my_outs)
- finally:
- free(_cmd)
-
- def osd_command(self, osdid, cmd, inbuf, timeout=0):
- """
- osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
- returns (int ret, string outbuf, string outs)
- """
- # NOTE(sileht): timeout is ignored because C API doesn't provide
- # timeout argument, but we keep it for backward compat with old python binding
- self.require_state("connected")
-
- cmd = cstr_list(cmd, 'cmd')
- inbuf = cstr(inbuf, 'inbuf')
-
- cdef:
- int _osdid = osdid
- char **_cmd = to_bytes_array(cmd)
- size_t _cmdlen = len(cmd)
-
- char *_inbuf = inbuf
- size_t _inbuf_len = len(inbuf)
-
- char *_outbuf
- size_t _outbuf_len
- char *_outs
- size_t _outs_len
-
- try:
- with nogil:
- ret = rados_osd_command(self.cluster, _osdid,
- <const char **>_cmd, _cmdlen,
- <const char*>_inbuf, _inbuf_len,
- &_outbuf, &_outbuf_len,
- &_outs, &_outs_len)
-
- my_outs = decode_cstr(_outs[:_outs_len])
- my_outbuf = _outbuf[:_outbuf_len]
- if _outs_len:
- rados_buffer_free(_outs)
- if _outbuf_len:
- rados_buffer_free(_outbuf)
- return (ret, my_outbuf, my_outs)
- finally:
- free(_cmd)
-
- def mgr_command(self, cmd, inbuf, timeout=0):
- """
- returns (int ret, string outbuf, string outs)
- """
- # NOTE(sileht): timeout is ignored because C API doesn't provide
- # timeout argument, but we keep it for backward compat with old python binding
- self.require_state("connected")
-
- cmd = cstr_list(cmd, 'cmd')
- inbuf = cstr(inbuf, 'inbuf')
-
- cdef:
- char **_cmd = to_bytes_array(cmd)
- size_t _cmdlen = len(cmd)
-
- char *_inbuf = inbuf
- size_t _inbuf_len = len(inbuf)
-
- char *_outbuf
- size_t _outbuf_len
- char *_outs
- size_t _outs_len
-
- try:
- with nogil:
- ret = rados_mgr_command(self.cluster,
- <const char **>_cmd, _cmdlen,
- <const char*>_inbuf, _inbuf_len,
- &_outbuf, &_outbuf_len,
- &_outs, &_outs_len)
-
- my_outs = decode_cstr(_outs[:_outs_len])
- my_outbuf = _outbuf[:_outbuf_len]
- if _outs_len:
- rados_buffer_free(_outs)
- if _outbuf_len:
- rados_buffer_free(_outbuf)
- return (ret, my_outbuf, my_outs)
- finally:
- free(_cmd)
-
- def pg_command(self, pgid, cmd, inbuf, timeout=0):
- """
- pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
- returns (int ret, string outbuf, string outs)
- """
- # NOTE(sileht): timeout is ignored because C API doesn't provide
- # timeout argument, but we keep it for backward compat with old python binding
- self.require_state("connected")
-
- pgid = cstr(pgid, 'pgid')
- cmd = cstr_list(cmd, 'cmd')
- inbuf = cstr(inbuf, 'inbuf')
-
- cdef:
- char *_pgid = pgid
- char **_cmd = to_bytes_array(cmd)
- size_t _cmdlen = len(cmd)
-
- char *_inbuf = inbuf
- size_t _inbuf_len = len(inbuf)
-
- char *_outbuf
- size_t _outbuf_len
- char *_outs
- size_t _outs_len
-
- try:
- with nogil:
- ret = rados_pg_command(self.cluster, _pgid,
- <const char **>_cmd, _cmdlen,
- <const char *>_inbuf, _inbuf_len,
- &_outbuf, &_outbuf_len,
- &_outs, &_outs_len)
-
- my_outs = decode_cstr(_outs[:_outs_len])
- my_outbuf = _outbuf[:_outbuf_len]
- if _outs_len:
- rados_buffer_free(_outs)
- if _outbuf_len:
- rados_buffer_free(_outbuf)
- return (ret, my_outbuf, my_outs)
- finally:
- free(_cmd)
-
- def wait_for_latest_osdmap(self):
- self.require_state("connected")
- with nogil:
- ret = rados_wait_for_latest_osdmap(self.cluster)
- return ret
-
- def blacklist_add(self, client_address, expire_seconds=0):
- """
- Blacklist a client from the OSDs
-
- :param client_address: client address
- :type client_address: str
- :param expire_seconds: number of seconds to blacklist
- :type expire_seconds: int
-
- :raises: :class:`Error`
- """
- self.require_state("connected")
- client_address = cstr(client_address, 'client_address')
- cdef:
- uint32_t _expire_seconds = expire_seconds
- char *_client_address = client_address
-
- with nogil:
- ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
- if ret < 0:
- raise make_ex(ret, "error blacklisting client '%s'" % client_address)
-
- def monitor_log(self, level, callback, arg):
- if level not in MONITOR_LEVELS:
- raise LogicError("invalid monitor level " + level)
- if callback is not None and not callable(callback):
- raise LogicError("callback must be a callable function or None")
-
- level = cstr(level, 'level')
- cdef char *_level = level
-
- if callback is None:
- with nogil:
- r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
- self.monitor_callback = None
- self.monitor_callback2 = None
- return
-
- cb = (callback, arg)
- cdef PyObject* _arg = <PyObject*>cb
- with nogil:
- r = rados_monitor_log(self.cluster, <const char*>_level,
- <rados_log_callback_t>&__monitor_callback, _arg)
-
- if r:
- raise make_ex(r, 'error calling rados_monitor_log')
- # NOTE(sileht): Prevents the callback method from being garbage collected
- self.monitor_callback = cb
- self.monitor_callback2 = None
-
- def monitor_log2(self, level, callback, arg):
- if level not in MONITOR_LEVELS:
- raise LogicError("invalid monitor level " + level)
- if callback is not None and not callable(callback):
- raise LogicError("callback must be a callable function or None")
-
- level = cstr(level, 'level')
- cdef char *_level = level
-
- if callback is None:
- with nogil:
- r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
- self.monitor_callback = None
- self.monitor_callback2 = None
- return
-
- cb = (callback, arg)
- cdef PyObject* _arg = <PyObject*>cb
- with nogil:
- r = rados_monitor_log2(self.cluster, <const char*>_level,
- <rados_log_callback2_t>&__monitor_callback2, _arg)
-
- if r:
- raise make_ex(r, 'error calling rados_monitor_log')
- # NOTE(sileht): Prevents the callback method from being garbage collected
- self.monitor_callback = None
- self.monitor_callback2 = cb
-
-
-cdef class OmapIterator(object):
- """Omap iterator"""
-
- cdef public Ioctx ioctx
- cdef rados_omap_iter_t ctx
-
- def __cinit__(self, Ioctx ioctx):
- self.ioctx = ioctx
-
- def __iter__(self):
- return self
-
- def __next__(self):
- """
- Get the next key-value pair in the object
- :returns: next rados.OmapItem
- """
- cdef:
- char *key_ = NULL
- char *val_ = NULL
- size_t len_
-
- with nogil:
- ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
-
- if ret != 0:
- raise make_ex(ret, "error iterating over the omap")
- if key_ == NULL:
- raise StopIteration()
- key = decode_cstr(key_)
- val = None
- if val_ != NULL:
- val = val_[:len_]
- return (key, val)
-
- def __dealloc__(self):
- with nogil:
- rados_omap_get_end(self.ctx)
-
-
-cdef class ObjectIterator(object):
- """rados.Ioctx Object iterator"""
-
- cdef rados_list_ctx_t ctx
-
- cdef public object ioctx
-
- def __cinit__(self, Ioctx ioctx):
- self.ioctx = ioctx
-
- with nogil:
- ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
- if ret < 0:
- raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
- % self.ioctx.name)
-
- def __iter__(self):
- return self
-
- def __next__(self):
- """
- Get the next object name and locator in the pool
-
- :raises: StopIteration
- :returns: next rados.Ioctx Object
- """
- cdef:
- const char *key_ = NULL
- const char *locator_ = NULL
- const char *nspace_ = NULL
-
- with nogil:
- ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
-
- if ret < 0:
- raise StopIteration()
-
- key = decode_cstr(key_)
- locator = decode_cstr(locator_) if locator_ != NULL else None
- nspace = decode_cstr(nspace_) if nspace_ != NULL else None
- return Object(self.ioctx, key, locator, nspace)
-
- def __dealloc__(self):
- with nogil:
- rados_nobjects_list_close(self.ctx)
-
-
-cdef class XattrIterator(object):
- """Extended attribute iterator"""
-
- cdef rados_xattrs_iter_t it
- cdef char* _oid
-
- cdef public Ioctx ioctx
- cdef public object oid
-
- def __cinit__(self, Ioctx ioctx, oid):
- self.ioctx = ioctx
- self.oid = cstr(oid, 'oid')
- self._oid = self.oid
-
- with nogil:
- ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
- if ret != 0:
- raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
-
- def __iter__(self):
- return self
-
- def __next__(self):
- """
- Get the next xattr on the object
-
- :raises: StopIteration
- :returns: pair - of name and value of the next Xattr
- """
- cdef:
- const char *name_ = NULL
- const char *val_ = NULL
- size_t len_ = 0
-
- with nogil:
- ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
- if ret != 0:
- raise make_ex(ret, "error iterating over the extended attributes \
-in '%s'" % self.oid)
- if name_ == NULL:
- raise StopIteration()
- name = decode_cstr(name_)
- val = val_[:len_]
- return (name, val)
-
- def __dealloc__(self):
- with nogil:
- rados_getxattrs_end(self.it)
-
-
-cdef class SnapIterator(object):
- """Snapshot iterator"""
-
- cdef public Ioctx ioctx
-
- cdef rados_snap_t *snaps
- cdef int max_snap
- cdef int cur_snap
-
- def __cinit__(self, Ioctx ioctx):
- self.ioctx = ioctx
- # We don't know how big a buffer we need until we've called the
- # function. So use the exponential doubling strategy.
- cdef int num_snaps = 10
- while True:
- self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
- num_snaps *
- sizeof(rados_snap_t))
-
- with nogil:
- ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
- if ret >= 0:
- self.max_snap = ret
- break
- elif ret != -errno.ERANGE:
- raise make_ex(ret, "error calling rados_snap_list for \
-ioctx '%s'" % self.ioctx.name)
- num_snaps = num_snaps * 2
- self.cur_snap = 0
-
- def __iter__(self):
- return self
-
- def __next__(self):
- """
- Get the next Snapshot
-
- :raises: :class:`Error`, StopIteration
- :returns: Snap - next snapshot
- """
- if self.cur_snap >= self.max_snap:
- raise StopIteration
-
- cdef:
- rados_snap_t snap_id = self.snaps[self.cur_snap]
- int name_len = 10
- char *name = NULL
-
- try:
- while True:
- name = <char *>realloc_chk(name, name_len)
- with nogil:
- ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
- if ret == 0:
- break
- elif ret != -errno.ERANGE:
- raise make_ex(ret, "rados_snap_get_name error")
- else:
- name_len = name_len * 2
-
- snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
- self.cur_snap = self.cur_snap + 1
- return snap
- finally:
- free(name)
-
-
-cdef class Snap(object):
- """Snapshot object"""
- cdef public Ioctx ioctx
- cdef public object name
-
- # NOTE(sileht): old API was storing the ctypes object
- # instead of the value ....
- cdef public rados_snap_t snap_id
-
- def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
- self.ioctx = ioctx
- self.name = name
- self.snap_id = snap_id
-
- def __str__(self):
- return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
- % (str(self.ioctx), self.name, self.snap_id)
-
- def get_timestamp(self):
- """
- Find when a snapshot in the current pool occurred
-
- :raises: :class:`Error`
- :returns: datetime - the data and time the snapshot was created
- """
- cdef time_t snap_time
-
- with nogil:
- ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
- if ret != 0:
- raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
- return datetime.fromtimestamp(snap_time)
-
-
-cdef class Completion(object):
- """completion object"""
-
- cdef public:
- Ioctx ioctx
- object oncomplete
- object onsafe
-
- cdef:
- rados_callback_t complete_cb
- rados_callback_t safe_cb
- rados_completion_t rados_comp
- PyObject* buf
-
- def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
- self.oncomplete = oncomplete
- self.onsafe = onsafe
- self.ioctx = ioctx
-
- def is_safe(self):
- """
- Is an asynchronous operation safe?
-
- This does not imply that the safe callback has finished.
-
- :returns: True if the operation is safe
- """
- with nogil:
- ret = rados_aio_is_safe(self.rados_comp)
- return ret == 1
-
- def is_complete(self):
- """
- Has an asynchronous operation completed?
-
- This does not imply that the safe callback has finished.
-
- :returns: True if the operation is completed
- """
- with nogil:
- ret = rados_aio_is_complete(self.rados_comp)
- return ret == 1
-
- def wait_for_safe(self):
- """
- Wait for an asynchronous operation to be marked safe
-
- This does not imply that the safe callback has finished.
- """
- with nogil:
- rados_aio_wait_for_safe(self.rados_comp)
-
- def wait_for_complete(self):
- """
- Wait for an asynchronous operation to complete
-
- This does not imply that the complete callback has finished.
- """
- with nogil:
- rados_aio_wait_for_complete(self.rados_comp)
-
- def wait_for_safe_and_cb(self):
- """
- Wait for an asynchronous operation to be marked safe and for
- the safe callback to have returned
- """
- with nogil:
- rados_aio_wait_for_safe_and_cb(self.rados_comp)
-
- def wait_for_complete_and_cb(self):
- """
- Wait for an asynchronous operation to complete and for the
- complete callback to have returned
-
- :returns: whether the operation is completed
- """
- with nogil:
- ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
- return ret
-
- def get_return_value(self):
- """
- Get the return value of an asychronous operation
-
- The return value is set when the operation is complete or safe,
- whichever comes first.
-
- :returns: int - return value of the operation
- """
- with nogil:
- ret = rados_aio_get_return_value(self.rados_comp)
- return ret
-
- def __dealloc__(self):
- """
- Release a completion
-
- Call this when you no longer need the completion. It may not be
- freed immediately if the operation is not acked and committed.
- """
- ref.Py_XDECREF(self.buf)
- self.buf = NULL
- if self.rados_comp != NULL:
- with nogil:
- rados_aio_release(self.rados_comp)
- self.rados_comp = NULL
-
- def _complete(self):
- self.oncomplete(self)
- with self.ioctx.lock:
- if self.oncomplete:
- self.ioctx.complete_completions.remove(self)
-
- def _safe(self):
- self.onsafe(self)
- with self.ioctx.lock:
- if self.onsafe:
- self.ioctx.safe_completions.remove(self)
-
- def _cleanup(self):
- with self.ioctx.lock:
- if self.oncomplete:
- self.ioctx.complete_completions.remove(self)
- if self.onsafe:
- self.ioctx.safe_completions.remove(self)
-
-
-class OpCtx(object):
- def __enter__(self):
- return self.create()
-
- def __exit__(self, type, msg, traceback):
- self.release()
-
-
-cdef class WriteOp(object):
- cdef rados_write_op_t write_op
-
- def create(self):
- with nogil:
- self.write_op = rados_create_write_op()
- return self
-
- def release(self):
- with nogil:
- rados_release_write_op(self.write_op)
-
- @requires(('exclusive', opt(int)))
- def new(self, exclusive=None):
- """
- Create the object.
- """
-
- cdef:
- int _exclusive = exclusive
-
- with nogil:
- rados_write_op_create(self.write_op, _exclusive, NULL)
-
-
- def remove(self):
- """
- Remove object.
- """
- with nogil:
- rados_write_op_remove(self.write_op)
-
- @requires(('flags', int))
- def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
- """
- Set flags for the last operation added to this write_op.
- :para flags: flags to apply to the last operation
- :type flags: int
- """
-
- cdef:
- int _flags = flags
-
- with nogil:
- rados_write_op_set_flags(self.write_op, _flags)
-
- @requires(('to_write', bytes))
- def append(self, to_write):
- """
- Append data to an object synchronously
- :param to_write: data to write
- :type to_write: bytes
- """
-
- cdef:
- char *_to_write = to_write
- size_t length = len(to_write)
-
- with nogil:
- rados_write_op_append(self.write_op, _to_write, length)
-
- @requires(('to_write', bytes))
- def write_full(self, to_write):
- """
- Write whole object, atomically replacing it.
- :param to_write: data to write
- :type to_write: bytes
- """
-
- cdef:
- char *_to_write = to_write
- size_t length = len(to_write)
-
- with nogil:
- rados_write_op_write_full(self.write_op, _to_write, length)
-
- @requires(('to_write', bytes), ('offset', int))
- def write(self, to_write, offset=0):
- """
- Write to offset.
- :param to_write: data to write
- :type to_write: bytes
- :param offset: byte offset in the object to begin writing at
- :type offset: int
- """
-
- cdef:
- char *_to_write = to_write
- size_t length = len(to_write)
- uint64_t _offset = offset
-
- with nogil:
- rados_write_op_write(self.write_op, _to_write, length, _offset)
-
- @requires(('offset', int), ('length', int))
- def zero(self, offset, length):
- """
- Zero part of an object.
- :param offset: byte offset in the object to begin writing at
- :type offset: int
- :param offset: number of zero to write
- :type offset: int
- """
-
- cdef:
- size_t _length = length
- uint64_t _offset = offset
-
- with nogil:
- rados_write_op_zero(self.write_op, _length, _offset)
-
- @requires(('offset', int))
- def truncate(self, offset):
- """
- Truncate an object.
- :param offset: byte offset in the object to begin truncating at
- :type offset: int
- """
-
- cdef:
- uint64_t _offset = offset
-
- with nogil:
- rados_write_op_truncate(self.write_op, _offset)
-
-
-class WriteOpCtx(WriteOp, OpCtx):
- """write operation context manager"""
-
-
-cdef class ReadOp(object):
- cdef rados_read_op_t read_op
-
- def create(self):
- with nogil:
- self.read_op = rados_create_read_op()
- return self
-
- def release(self):
- with nogil:
- rados_release_read_op(self.read_op)
-
- @requires(('flags', int))
- def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
- """
- Set flags for the last operation added to this read_op.
- :para flags: flags to apply to the last operation
- :type flags: int
- """
-
- cdef:
- int _flags = flags
-
- with nogil:
- rados_read_op_set_flags(self.read_op, _flags)
-
-
-class ReadOpCtx(ReadOp, OpCtx):
- """read operation context manager"""
-
-
-cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
- """
- Callback to onsafe() for asynchronous operations
- """
- cdef object cb = <object>args
- cb._safe()
- return 0
-
-
-cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
- """
- Callback to oncomplete() for asynchronous operations
- """
- cdef object cb = <object>args
- cb._complete()
- return 0
-
-
-cdef class Ioctx(object):
- """rados.Ioctx object"""
- # NOTE(sileht): attributes declared in .pyd
-
- def __init__(self, name):
- self.name = name
- self.state = "open"
-
- self.locator_key = ""
- self.nspace = ""
- self.lock = threading.Lock()
- self.safe_completions = []
- self.complete_completions = []
-
- def __enter__(self):
- return self
-
- def __exit__(self, type_, value, traceback):
- self.close()
- return False
-
- def __dealloc__(self):
- self.close()
-
- def __track_completion(self, completion_obj):
- if completion_obj.oncomplete:
- with self.lock:
- self.complete_completions.append(completion_obj)
- if completion_obj.onsafe:
- with self.lock:
- self.safe_completions.append(completion_obj)
-
- def __get_completion(self, oncomplete, onsafe):
- """
- Constructs a completion to use with asynchronous operations
-
- :param oncomplete: what to do when the write is safe and complete in memory
- on all replicas
- :type oncomplete: completion
- :param onsafe: what to do when the write is safe and complete on storage
- on all replicas
- :type onsafe: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
-
- completion_obj = Completion(self, oncomplete, onsafe)
-
- cdef:
- rados_callback_t complete_cb = NULL
- rados_callback_t safe_cb = NULL
- rados_completion_t completion
- PyObject* p_completion_obj= <PyObject*>completion_obj
-
- if oncomplete:
- complete_cb = <rados_callback_t>&__aio_complete_cb
- if onsafe:
- safe_cb = <rados_callback_t>&__aio_safe_cb
-
- with nogil:
- ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
- &completion)
- if ret < 0:
- raise make_ex(ret, "error getting a completion")
-
- completion_obj.rados_comp = completion
- return completion_obj
-
- @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
- def aio_stat(self, object_name, oncomplete):
- """
- Asynchronously get object stats (size/mtime)
-
- oncomplete will be called with the returned size and mtime
- as well as the completion:
-
- oncomplete(completion, size, mtime)
-
- :param object_name: the name of the object to get stats from
- :type object_name: str
- :param oncomplete: what to do when the stat is complete
- :type oncomplete: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
-
- object_name = cstr(object_name, 'object_name')
-
- cdef:
- Completion completion
- char *_object_name = object_name
- uint64_t psize
- time_t pmtime
-
- def oncomplete_(completion_v):
- cdef Completion _completion_v = completion_v
- return_value = _completion_v.get_return_value()
- if return_value >= 0:
- return oncomplete(_completion_v, psize, time.localtime(pmtime))
- else:
- return oncomplete(_completion_v, None, None)
-
- completion = self.__get_completion(oncomplete_, None)
- self.__track_completion(completion)
- with nogil:
- ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
- &psize, &pmtime)
-
- if ret < 0:
- completion._cleanup()
- raise make_ex(ret, "error stating %s" % object_name)
- return completion
-
- @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
- ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
- def aio_write(self, object_name, to_write, offset=0,
- oncomplete=None, onsafe=None):
- """
- Write data to an object asynchronously
-
- Queues the write and returns.
-
- :param object_name: name of the object
- :type object_name: str
- :param to_write: data to write
- :type to_write: bytes
- :param offset: byte offset in the object to begin writing at
- :type offset: int
- :param oncomplete: what to do when the write is safe and complete in memory
- on all replicas
- :type oncomplete: completion
- :param onsafe: what to do when the write is safe and complete on storage
- on all replicas
- :type onsafe: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
-
- object_name = cstr(object_name, 'object_name')
-
- cdef:
- Completion completion
- char* _object_name = object_name
- char* _to_write = to_write
- size_t size = len(to_write)
- uint64_t _offset = offset
-
- completion = self.__get_completion(oncomplete, onsafe)
- self.__track_completion(completion)
- with nogil:
- ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
- _to_write, size, _offset)
- if ret < 0:
- completion._cleanup()
- raise make_ex(ret, "error writing object %s" % object_name)
- return completion
-
- @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
- ('onsafe', opt(Callable)))
- def aio_write_full(self, object_name, to_write,
- oncomplete=None, onsafe=None):
- """
- Asychronously write an entire object
-
- The object is filled with the provided data. If the object exists,
- it is atomically truncated and then written.
- Queues the write and returns.
-
- :param object_name: name of the object
- :type object_name: str
- :param to_write: data to write
- :type to_write: str
- :param oncomplete: what to do when the write is safe and complete in memory
- on all replicas
- :type oncomplete: completion
- :param onsafe: what to do when the write is safe and complete on storage
- on all replicas
- :type onsafe: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
-
- object_name = cstr(object_name, 'object_name')
-
- cdef:
- Completion completion
- char* _object_name = object_name
- char* _to_write = to_write
- size_t size = len(to_write)
-
- completion = self.__get_completion(oncomplete, onsafe)
- self.__track_completion(completion)
- with nogil:
- ret = rados_aio_write_full(self.io, _object_name,
- completion.rados_comp,
- _to_write, size)
- if ret < 0:
- completion._cleanup()
- raise make_ex(ret, "error writing object %s" % object_name)
- return completion
-
- @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
- ('onsafe', opt(Callable)))
- def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
- """
- Asychronously append data to an object
-
- Queues the write and returns.
-
- :param object_name: name of the object
- :type object_name: str
- :param to_append: data to append
- :type to_append: str
- :param offset: byte offset in the object to begin writing at
- :type offset: int
- :param oncomplete: what to do when the write is safe and complete in memory
- on all replicas
- :type oncomplete: completion
- :param onsafe: what to do when the write is safe and complete on storage
- on all replicas
- :type onsafe: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
- object_name = cstr(object_name, 'object_name')
-
- cdef:
- Completion completion
- char* _object_name = object_name
- char* _to_append = to_append
- size_t size = len(to_append)
-
- completion = self.__get_completion(oncomplete, onsafe)
- self.__track_completion(completion)
- with nogil:
- ret = rados_aio_append(self.io, _object_name,
- completion.rados_comp,
- _to_append, size)
- if ret < 0:
- completion._cleanup()
- raise make_ex(ret, "error appending object %s" % object_name)
- return completion
-
- def aio_flush(self):
- """
- Block until all pending writes in an io context are safe
-
- :raises: :class:`Error`
- """
- with nogil:
- ret = rados_aio_flush(self.io)
- if ret < 0:
- raise make_ex(ret, "error flushing")
-
- @requires(('object_name', str_type), ('length', int), ('offset', int),
- ('oncomplete', opt(Callable)))
- def aio_read(self, object_name, length, offset, oncomplete):
- """
- Asychronously read data from an object
-
- oncomplete will be called with the returned read value as
- well as the completion:
-
- oncomplete(completion, data_read)
-
- :param object_name: name of the object to read from
- :type object_name: str
- :param length: the number of bytes to read
- :type length: int
- :param offset: byte offset in the object to begin reading from
- :type offset: int
- :param oncomplete: what to do when the read is complete
- :type oncomplete: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
-
- object_name = cstr(object_name, 'object_name')
-
- cdef:
- Completion completion
- char* _object_name = object_name
- uint64_t _offset = offset
-
- char *ref_buf
- size_t _length = length
-
- def oncomplete_(completion_v):
- cdef Completion _completion_v = completion_v
- return_value = _completion_v.get_return_value()
- if return_value > 0 and return_value != length:
- _PyBytes_Resize(&_completion_v.buf, return_value)
- return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
-
- completion = self.__get_completion(oncomplete_, None)
- completion.buf = PyBytes_FromStringAndSize(NULL, length)
- ret_buf = PyBytes_AsString(completion.buf)
- self.__track_completion(completion)
- with nogil:
- ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
- ret_buf, _length, _offset)
- if ret < 0:
- completion._cleanup()
- raise make_ex(ret, "error reading %s" % object_name)
- return completion
-
- @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
- ('data', bytes), ('length', int),
- ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
- def aio_execute(self, object_name, cls, method, data,
- length=8192, oncomplete=None, onsafe=None):
- """
- Asynchronously execute an OSD class method on an object.
-
- oncomplete and onsafe will be called with the data returned from
- the plugin as well as the completion:
-
- oncomplete(completion, data)
- onsafe(completion, data)
-
- :param object_name: name of the object
- :type object_name: str
- :param cls: name of the object class
- :type cls: str
- :param method: name of the method
- :type method: str
- :param data: input data
- :type data: bytes
- :param length: size of output buffer in bytes (default=8192)
- :type length: int
- :param oncomplete: what to do when the execution is complete
- :type oncomplete: completion
- :param onsafe: what to do when the execution is safe and complete
- :type onsafe: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
-
- object_name = cstr(object_name, 'object_name')
- cls = cstr(cls, 'cls')
- method = cstr(method, 'method')
- cdef:
- Completion completion
- char *_object_name = object_name
- char *_cls = cls
- char *_method = method
- char *_data = data
- size_t _data_len = len(data)
-
- char *ref_buf
- size_t _length = length
-
- def oncomplete_(completion_v):
- cdef Completion _completion_v = completion_v
- return_value = _completion_v.get_return_value()
- if return_value > 0 and return_value != length:
- _PyBytes_Resize(&_completion_v.buf, return_value)
- return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
-
- def onsafe_(completion_v):
- cdef Completion _completion_v = completion_v
- return_value = _completion_v.get_return_value()
- return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
-
- completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
- completion.buf = PyBytes_FromStringAndSize(NULL, length)
- ret_buf = PyBytes_AsString(completion.buf)
- self.__track_completion(completion)
- with nogil:
- ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
- _cls, _method, _data, _data_len, ret_buf, _length)
- if ret < 0:
- completion._cleanup()
- raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
- return completion
-
- @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
- def aio_remove(self, object_name, oncomplete=None, onsafe=None):
- """
- Asychronously remove an object
-
- :param object_name: name of the object to remove
- :type object_name: str
- :param oncomplete: what to do when the remove is safe and complete in memory
- on all replicas
- :type oncomplete: completion
- :param onsafe: what to do when the remove is safe and complete on storage
- on all replicas
- :type onsafe: completion
-
- :raises: :class:`Error`
- :returns: completion object
- """
- object_name = cstr(object_name, 'object_name')
-
- cdef:
- Completion completion
- char* _object_name = object_name
-
- completion = self.__get_completion(oncomplete, onsafe)
- self.__track_completion(completion)
- with nogil:
- ret = rados_aio_remove(self.io, _object_name,
- completion.rados_comp)
- if ret < 0:
- completion._cleanup()
- raise make_ex(ret, "error removing %s" % object_name)
- return completion
-
- def require_ioctx_open(self):
- """
- Checks if the rados.Ioctx object state is 'open'
-
- :raises: IoctxStateError
- """
- if self.state != "open":
- raise IoctxStateError("The pool is %s" % self.state)
-
- def change_auid(self, auid):
- """
- Attempt to change an io context's associated auid "owner."
-
- Requires that you have write permission on both the current and new
- auid.
-
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
-
- cdef:
- uint64_t _auid = auid
-
- with nogil:
- ret = rados_ioctx_pool_set_auid(self.io, _auid)
- if ret < 0:
- raise make_ex(ret, "error changing auid of '%s' to %d"
- % (self.name, auid))
-
- @requires(('loc_key', str_type))
- def set_locator_key(self, loc_key):
- """
- Set the key for mapping objects to pgs within an io context.
-
- The key is used instead of the object name to determine which
- placement groups an object is put in. This affects all subsequent
- operations of the io context - until a different locator key is
- set, all objects in this io context will be placed in the same pg.
-
- :param loc_key: the key to use as the object locator, or NULL to discard
- any previously set key
- :type loc_key: str
-
- :raises: :class:`TypeError`
- """
- self.require_ioctx_open()
- cloc_key = cstr(loc_key, 'loc_key')
- cdef char *_loc_key = cloc_key
- with nogil:
- rados_ioctx_locator_set_key(self.io, _loc_key)
- self.locator_key = loc_key
-
- def get_locator_key(self):
- """
- Get the locator_key of context
-
- :returns: locator_key
- """
- return self.locator_key
-
- @requires(('snap_id', long))
- def set_read(self, snap_id):
- """
- Set the snapshot for reading objects.
-
- To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
-
- :param snap_id: the snapshot Id
- :type snap_id: int
-
- :raises: :class:`TypeError`
- """
- self.require_ioctx_open()
- cdef rados_snap_t _snap_id = snap_id
- with nogil:
- rados_ioctx_snap_set_read(self.io, _snap_id)
-
- @requires(('nspace', str_type))
- def set_namespace(self, nspace):
- """
- Set the namespace for objects within an io context.
-
- The namespace in addition to the object name fully identifies
- an object. This affects all subsequent operations of the io context
- - until a different namespace is set, all objects in this io context
- will be placed in the same namespace.
-
- :param nspace: the namespace to use, or None/"" for the default namespace
- :type nspace: str
-
- :raises: :class:`TypeError`
- """
- self.require_ioctx_open()
- if nspace is None:
- nspace = ""
- cnspace = cstr(nspace, 'nspace')
- cdef char *_nspace = cnspace
- with nogil:
- rados_ioctx_set_namespace(self.io, _nspace)
- self.nspace = nspace
-
- def get_namespace(self):
- """
- Get the namespace of context
-
- :returns: namespace
- """
- return self.nspace
-
- def close(self):
- """
- Close a rados.Ioctx object.
-
- This just tells librados that you no longer need to use the io context.
- It may not be freed immediately if there are pending asynchronous
- requests on it, but you should not use an io context again after
- calling this function on it.
- """
- if self.state == "open":
- self.require_ioctx_open()
- with nogil:
- rados_ioctx_destroy(self.io)
- self.state = "closed"
-
-
- @requires(('key', str_type), ('data', bytes))
- def write(self, key, data, offset=0):
- """
- Write data to an object synchronously
-
- :param key: name of the object
- :type key: str
- :param data: data to write
- :type data: bytes
- :param offset: byte offset in the object to begin writing at
- :type offset: int
-
- :raises: :class:`TypeError`
- :raises: :class:`LogicError`
- :returns: int - 0 on success
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- cdef:
- char *_key = key
- char *_data = data
- size_t length = len(data)
- uint64_t _offset = offset
-
- with nogil:
- ret = rados_write(self.io, _key, _data, length, _offset)
- if ret == 0:
- return ret
- elif ret < 0:
- raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
- % (self.name, key))
- else:
- raise LogicError("Ioctx.write(%s): rados_write \
-returned %d, but should return zero on success." % (self.name, ret))
-
- @requires(('key', str_type), ('data', bytes))
- def write_full(self, key, data):
- """
- Write an entire object synchronously.
-
- The object is filled with the provided data. If the object exists,
- it is atomically truncated and then written.
-
- :param key: name of the object
- :type key: str
- :param data: data to write
- :type data: bytes
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: int - 0 on success
- """
- self.require_ioctx_open()
- key = cstr(key, 'key')
- cdef:
- char *_key = key
- char *_data = data
- size_t length = len(data)
-
- with nogil:
- ret = rados_write_full(self.io, _key, _data, length)
- if ret == 0:
- return ret
- elif ret < 0:
- raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
- % (self.name, key))
- else:
- raise LogicError("Ioctx.write_full(%s): rados_write_full \
-returned %d, but should return zero on success." % (self.name, ret))
-
- @requires(('key', str_type), ('data', bytes))
- def append(self, key, data):
- """
- Append data to an object synchronously
-
- :param key: name of the object
- :type key: str
- :param data: data to write
- :type data: bytes
-
- :raises: :class:`TypeError`
- :raises: :class:`LogicError`
- :returns: int - 0 on success
- """
- self.require_ioctx_open()
- key = cstr(key, 'key')
- cdef:
- char *_key = key
- char *_data = data
- size_t length = len(data)
-
- with nogil:
- ret = rados_append(self.io, _key, _data, length)
- if ret == 0:
- return ret
- elif ret < 0:
- raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
- % (self.name, key))
- else:
- raise LogicError("Ioctx.append(%s): rados_append \
-returned %d, but should return zero on success." % (self.name, ret))
-
- @requires(('key', str_type))
- def read(self, key, length=8192, offset=0):
- """
- Read data from an object synchronously
-
- :param key: name of the object
- :type key: str
- :param length: the number of bytes to read (default=8192)
- :type length: int
- :param offset: byte offset in the object to begin reading at
- :type offset: int
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: str - data read from object
- """
- self.require_ioctx_open()
- key = cstr(key, 'key')
- cdef:
- char *_key = key
- char *ret_buf
- uint64_t _offset = offset
- size_t _length = length
- PyObject* ret_s = NULL
-
- ret_s = PyBytes_FromStringAndSize(NULL, length)
- try:
- ret_buf = PyBytes_AsString(ret_s)
- with nogil:
- ret = rados_read(self.io, _key, ret_buf, _length, _offset)
- if ret < 0:
- raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
-
- if ret != length:
- _PyBytes_Resize(&ret_s, ret)
-
- return <object>ret_s
- finally:
- # We DECREF unconditionally: the cast to object above will have
- # INCREFed if necessary. This also takes care of exceptions,
- # including if _PyString_Resize fails (that will free the string
- # itself and set ret_s to NULL, hence XDECREF).
- ref.Py_XDECREF(ret_s)
-
- @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
- def execute(self, key, cls, method, data, length=8192):
- """
- Execute an OSD class method on an object.
-
- :param key: name of the object
- :type key: str
- :param cls: name of the object class
- :type cls: str
- :param method: name of the method
- :type method: str
- :param data: input data
- :type data: bytes
- :param length: size of output buffer in bytes (default=8192)
- :type length: int
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: (ret, method output)
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- cls = cstr(cls, 'cls')
- method = cstr(method, 'method')
- cdef:
- char *_key = key
- char *_cls = cls
- char *_method = method
- char *_data = data
- size_t _data_len = len(data)
-
- char *ref_buf
- size_t _length = length
- PyObject* ret_s = NULL
-
- ret_s = PyBytes_FromStringAndSize(NULL, length)
- try:
- ret_buf = PyBytes_AsString(ret_s)
- with nogil:
- ret = rados_exec(self.io, _key, _cls, _method, _data,
- _data_len, ret_buf, _length)
- if ret < 0:
- raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
-
- if ret != length:
- _PyBytes_Resize(&ret_s, ret)
-
- return ret, <object>ret_s
- finally:
- # We DECREF unconditionally: the cast to object above will have
- # INCREFed if necessary. This also takes care of exceptions,
- # including if _PyString_Resize fails (that will free the string
- # itself and set ret_s to NULL, hence XDECREF).
- ref.Py_XDECREF(ret_s)
-
- def get_stats(self):
- """
- Get pool usage statistics
-
- :returns: dict - contains the following keys:
-
- - ``num_bytes`` (int) - size of pool in bytes
-
- - ``num_kb`` (int) - size of pool in kbytes
-
- - ``num_objects`` (int) - number of objects in the pool
-
- - ``num_object_clones`` (int) - number of object clones
-
- - ``num_object_copies`` (int) - number of object copies
-
- - ``num_objects_missing_on_primary`` (int) - number of objets
- missing on primary
-
- - ``num_objects_unfound`` (int) - number of unfound objects
-
- - ``num_objects_degraded`` (int) - number of degraded objects
-
- - ``num_rd`` (int) - bytes read
-
- - ``num_rd_kb`` (int) - kbytes read
-
- - ``num_wr`` (int) - bytes written
-
- - ``num_wr_kb`` (int) - kbytes written
- """
- self.require_ioctx_open()
- cdef rados_pool_stat_t stats
- with nogil:
- ret = rados_ioctx_pool_stat(self.io, &stats)
- if ret < 0:
- raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
- return {'num_bytes': stats.num_bytes,
- 'num_kb': stats.num_kb,
- 'num_objects': stats.num_objects,
- 'num_object_clones': stats.num_object_clones,
- 'num_object_copies': stats.num_object_copies,
- "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
- "num_objects_unfound": stats.num_objects_unfound,
- "num_objects_degraded": stats.num_objects_degraded,
- "num_rd": stats.num_rd,
- "num_rd_kb": stats.num_rd_kb,
- "num_wr": stats.num_wr,
- "num_wr_kb": stats.num_wr_kb}
-
- @requires(('key', str_type))
- def remove_object(self, key):
- """
- Delete an object
-
- This does not delete any snapshots of the object.
-
- :param key: the name of the object to delete
- :type key: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: bool - True on success
- """
- self.require_ioctx_open()
- key = cstr(key, 'key')
- cdef:
- char *_key = key
-
- with nogil:
- ret = rados_remove(self.io, _key)
- if ret < 0:
- raise make_ex(ret, "Failed to remove '%s'" % key)
- return True
-
- @requires(('key', str_type))
- def trunc(self, key, size):
- """
- Resize an object
-
- If this enlarges the object, the new area is logically filled with
- zeroes. If this shrinks the object, the excess data is removed.
-
- :param key: the name of the object to resize
- :type key: str
- :param size: the new size of the object in bytes
- :type size: int
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: int - 0 on success, otherwise raises error
- """
-
- self.require_ioctx_open()
- key = cstr(key, 'key')
- cdef:
- char *_key = key
- uint64_t _size = size
-
- with nogil:
- ret = rados_trunc(self.io, _key, _size)
- if ret < 0:
- raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
- return ret
-
- @requires(('key', str_type))
- def stat(self, key):
- """
- Get object stats (size/mtime)
-
- :param key: the name of the object to get stats from
- :type key: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: (size,timestamp)
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- cdef:
- char *_key = key
- uint64_t psize
- time_t pmtime
-
- with nogil:
- ret = rados_stat(self.io, _key, &psize, &pmtime)
- if ret < 0:
- raise make_ex(ret, "Failed to stat %r" % key)
- return psize, time.localtime(pmtime)
-
- @requires(('key', str_type), ('xattr_name', str_type))
- def get_xattr(self, key, xattr_name):
- """
- Get the value of an extended attribute on an object.
-
- :param key: the name of the object to get xattr from
- :type key: str
- :param xattr_name: which extended attribute to read
- :type xattr_name: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: str - value of the xattr
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- xattr_name = cstr(xattr_name, 'xattr_name')
- cdef:
- char *_key = key
- char *_xattr_name = xattr_name
- size_t ret_length = 4096
- char *ret_buf = NULL
-
- try:
- while ret_length < 4096 * 1024 * 1024:
- ret_buf = <char *>realloc_chk(ret_buf, ret_length)
- with nogil:
- ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
- if ret == -errno.ERANGE:
- ret_length *= 2
- elif ret < 0:
- raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
- else:
- break
- return ret_buf[:ret]
- finally:
- free(ret_buf)
-
- @requires(('oid', str_type))
- def get_xattrs(self, oid):
- """
- Start iterating over xattrs on an object.
-
- :param oid: the name of the object to get xattrs from
- :type oid: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: XattrIterator
- """
- self.require_ioctx_open()
- return XattrIterator(self, oid)
-
- @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
- def set_xattr(self, key, xattr_name, xattr_value):
- """
- Set an extended attribute on an object.
-
- :param key: the name of the object to set xattr to
- :type key: str
- :param xattr_name: which extended attribute to set
- :type xattr_name: str
- :param xattr_value: the value of the extended attribute
- :type xattr_value: bytes
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: bool - True on success, otherwise raise an error
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- xattr_name = cstr(xattr_name, 'xattr_name')
- cdef:
- char *_key = key
- char *_xattr_name = xattr_name
- char *_xattr_value = xattr_value
- size_t _xattr_value_len = len(xattr_value)
-
- with nogil:
- ret = rados_setxattr(self.io, _key, _xattr_name,
- _xattr_value, _xattr_value_len)
- if ret < 0:
- raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
- return True
-
- @requires(('key', str_type), ('xattr_name', str_type))
- def rm_xattr(self, key, xattr_name):
- """
- Removes an extended attribute on from an object.
-
- :param key: the name of the object to remove xattr from
- :type key: str
- :param xattr_name: which extended attribute to remove
- :type xattr_name: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: bool - True on success, otherwise raise an error
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- xattr_name = cstr(xattr_name, 'xattr_name')
- cdef:
- char *_key = key
- char *_xattr_name = xattr_name
-
- with nogil:
- ret = rados_rmxattr(self.io, _key, _xattr_name)
- if ret < 0:
- raise make_ex(ret, "Failed to delete key %r xattr %r" %
- (key, xattr_name))
- return True
-
- def list_objects(self):
- """
- Get ObjectIterator on rados.Ioctx object.
-
- :returns: ObjectIterator
- """
- self.require_ioctx_open()
- return ObjectIterator(self)
-
- def list_snaps(self):
- """
- Get SnapIterator on rados.Ioctx object.
-
- :returns: SnapIterator
- """
- self.require_ioctx_open()
- return SnapIterator(self)
-
- @requires(('snap_name', str_type))
- def create_snap(self, snap_name):
- """
- Create a pool-wide snapshot
-
- :param snap_name: the name of the snapshot
- :type snap_name: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
- snap_name = cstr(snap_name, 'snap_name')
- cdef char *_snap_name = snap_name
-
- with nogil:
- ret = rados_ioctx_snap_create(self.io, _snap_name)
- if ret != 0:
- raise make_ex(ret, "Failed to create snap %s" % snap_name)
-
- @requires(('snap_name', str_type))
- def remove_snap(self, snap_name):
- """
- Removes a pool-wide snapshot
-
- :param snap_name: the name of the snapshot
- :type snap_name: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
- snap_name = cstr(snap_name, 'snap_name')
- cdef char *_snap_name = snap_name
-
- with nogil:
- ret = rados_ioctx_snap_remove(self.io, _snap_name)
- if ret != 0:
- raise make_ex(ret, "Failed to remove snap %s" % snap_name)
-
- @requires(('snap_name', str_type))
- def lookup_snap(self, snap_name):
- """
- Get the id of a pool snapshot
-
- :param snap_name: the name of the snapshot to lookop
- :type snap_name: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- :returns: Snap - on success
- """
- self.require_ioctx_open()
- csnap_name = cstr(snap_name, 'snap_name')
- cdef:
- char *_snap_name = csnap_name
- rados_snap_t snap_id
-
- with nogil:
- ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
- if ret != 0:
- raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
- return Snap(self, snap_name, int(snap_id))
-
- @requires(('oid', str_type), ('snap_name', str_type))
- def snap_rollback(self, oid, snap_name):
- """
- Rollback an object to a snapshot
-
- :param oid: the name of the object
- :type oid: str
- :param snap_name: the name of the snapshot
- :type snap_name: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
- oid = cstr(oid, 'oid')
- snap_name = cstr(snap_name, 'snap_name')
- cdef:
- char *_snap_name = snap_name
- char *_oid = oid
-
- with nogil:
- ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
- if ret != 0:
- raise make_ex(ret, "Failed to rollback %s" % oid)
-
- def get_last_version(self):
- """
- Return the version of the last object read or written to.
-
- This exposes the internal version number of the last object read or
- written via this io context
-
- :returns: version of the last object used
- """
- self.require_ioctx_open()
- with nogil:
- ret = rados_get_last_version(self.io)
- return int(ret)
-
- def create_write_op(self):
- """
- create write operation object.
- need call release_write_op after use
- """
- return WriteOp().create()
-
- def create_read_op(self):
- """
- create read operation object.
- need call release_read_op after use
- """
- return ReadOp().create()
-
- def release_write_op(self, write_op):
- """
- release memory alloc by create_write_op
- """
- write_op.release()
-
- def release_read_op(self, read_op):
- """
- release memory alloc by create_read_op
- :para read_op: read_op object
- :type: int
- """
- read_op.release()
-
- @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
- def set_omap(self, write_op, keys, values):
- """
- set keys values to write_op
- :para write_op: write_operation object
- :type write_op: WriteOp
- :para keys: a tuple of keys
- :type keys: tuple
- :para values: a tuple of values
- :type values: tuple
- """
-
- if len(keys) != len(values):
- raise Error("Rados(): keys and values must have the same number of items")
-
- keys = cstr_list(keys, 'keys')
- cdef:
- WriteOp _write_op = write_op
- size_t key_num = len(keys)
- char **_keys = to_bytes_array(keys)
- char **_values = to_bytes_array(values)
- size_t *_lens = to_csize_t_array([len(v) for v in values])
-
- try:
- with nogil:
- rados_write_op_omap_set(_write_op.write_op,
- <const char**>_keys,
- <const char**>_values,
- <const size_t*>_lens, key_num)
- finally:
- free(_keys)
- free(_values)
- free(_lens)
-
- @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
- def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
- """
- excute the real write operation
- :para write_op: write operation object
- :type write_op: WriteOp
- :para oid: object name
- :type oid: str
- :para mtime: the time to set the mtime to, 0 for the current time
- :type mtime: int
- :para flags: flags to apply to the entire operation
- :type flags: int
- """
-
- oid = cstr(oid, 'oid')
- cdef:
- WriteOp _write_op = write_op
- char *_oid = oid
- time_t _mtime = mtime
- int _flags = flags
-
- with nogil:
- ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
- if ret != 0:
- raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
-
- @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
- def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
- """
- excute the real write operation asynchronously
- :para write_op: write operation object
- :type write_op: WriteOp
- :para oid: object name
- :type oid: str
- :param oncomplete: what to do when the remove is safe and complete in memory
- on all replicas
- :type oncomplete: completion
- :param onsafe: what to do when the remove is safe and complete on storage
- on all replicas
- :type onsafe: completion
- :para mtime: the time to set the mtime to, 0 for the current time
- :type mtime: int
- :para flags: flags to apply to the entire operation
- :type flags: int
-
- :raises: :class:`Error`
- :returns: completion object
- """
-
- oid = cstr(oid, 'oid')
- cdef:
- WriteOp _write_op = write_op
- char *_oid = oid
- Completion completion
- time_t _mtime = mtime
- int _flags = flags
-
- completion = self.__get_completion(oncomplete, onsafe)
- self.__track_completion(completion)
-
- with nogil:
- ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
- &_mtime, _flags)
- if ret != 0:
- completion._cleanup()
- raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
- return completion
-
- @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
- def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
- """
- excute the real read operation
- :para read_op: read operation object
- :type read_op: ReadOp
- :para oid: object name
- :type oid: str
- :para flag: flags to apply to the entire operation
- :type flag: int
- """
- oid = cstr(oid, 'oid')
- cdef:
- ReadOp _read_op = read_op
- char *_oid = oid
- int _flag = flag
-
- with nogil:
- ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
- if ret != 0:
- raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
-
- @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
- def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
- """
- excute the real read operation
- :para read_op: read operation object
- :type read_op: ReadOp
- :para oid: object name
- :type oid: str
- :param oncomplete: what to do when the remove is safe and complete in memory
- on all replicas
- :type oncomplete: completion
- :param onsafe: what to do when the remove is safe and complete on storage
- on all replicas
- :type onsafe: completion
- :para flag: flags to apply to the entire operation
- :type flag: int
- """
- oid = cstr(oid, 'oid')
- cdef:
- ReadOp _read_op = read_op
- char *_oid = oid
- Completion completion
- int _flag = flag
-
- completion = self.__get_completion(oncomplete, onsafe)
- self.__track_completion(completion)
-
- with nogil:
- ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
- if ret != 0:
- completion._cleanup()
- raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
- return completion
-
- @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
- def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
- """
- get the omap values
- :para read_op: read operation object
- :type read_op: ReadOp
- :para start_after: list keys starting after start_after
- :type start_after: str
- :para filter_prefix: list only keys beginning with filter_prefix
- :type filter_prefix: str
- :para max_return: list no more than max_return key/value pairs
- :type max_return: int
- :returns: an iterator over the requested omap values, return value from this action
- """
-
- start_after = cstr(start_after, 'start_after') if start_after else None
- filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
- cdef:
- char *_start_after = opt_str(start_after)
- char *_filter_prefix = opt_str(filter_prefix)
- ReadOp _read_op = read_op
- rados_omap_iter_t iter_addr = NULL
- int _max_return = max_return
- int prval = 0
-
- with nogil:
- rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
- _max_return, &iter_addr, NULL, &prval)
- it = OmapIterator(self)
- it.ctx = iter_addr
- return it, int(prval)
-
- @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
- def get_omap_keys(self, read_op, start_after, max_return):
- """
- get the omap keys
- :para read_op: read operation object
- :type read_op: ReadOp
- :para start_after: list keys starting after start_after
- :type start_after: str
- :para max_return: list no more than max_return key/value pairs
- :type max_return: int
- :returns: an iterator over the requested omap values, return value from this action
- """
- start_after = cstr(start_after, 'start_after') if start_after else None
- cdef:
- char *_start_after = opt_str(start_after)
- ReadOp _read_op = read_op
- rados_omap_iter_t iter_addr = NULL
- int _max_return = max_return
- int prval = 0
-
- with nogil:
- rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
- _max_return, &iter_addr, NULL, &prval)
- it = OmapIterator(self)
- it.ctx = iter_addr
- return it, int(prval)
-
- @requires(('read_op', ReadOp), ('keys', tuple))
- def get_omap_vals_by_keys(self, read_op, keys):
- """
- get the omap values by keys
- :para read_op: read operation object
- :type read_op: ReadOp
- :para keys: input key tuple
- :type keys: tuple
- :returns: an iterator over the requested omap values, return value from this action
- """
- keys = cstr_list(keys, 'keys')
- cdef:
- ReadOp _read_op = read_op
- rados_omap_iter_t iter_addr
- char **_keys = to_bytes_array(keys)
- size_t key_num = len(keys)
- int prval = 0
-
- try:
- with nogil:
- rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
- <const char**>_keys,
- key_num, &iter_addr, &prval)
- it = OmapIterator(self)
- it.ctx = iter_addr
- return it, int(prval)
- finally:
- free(_keys)
-
- @requires(('write_op', WriteOp), ('keys', tuple))
- def remove_omap_keys(self, write_op, keys):
- """
- remove omap keys specifiled
- :para write_op: write operation object
- :type write_op: WriteOp
- :para keys: input key tuple
- :type keys: tuple
- """
-
- keys = cstr_list(keys, 'keys')
- cdef:
- WriteOp _write_op = write_op
- size_t key_num = len(keys)
- char **_keys = to_bytes_array(keys)
-
- try:
- with nogil:
- rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
- finally:
- free(_keys)
-
- @requires(('write_op', WriteOp))
- def clear_omap(self, write_op):
- """
- Remove all key/value pairs from an object
- :para write_op: write operation object
- :type write_op: WriteOp
- """
-
- cdef:
- WriteOp _write_op = write_op
-
- with nogil:
- rados_write_op_omap_clear(_write_op.write_op)
-
- @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
- ('duration', opt(int)), ('flags', int))
- def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
-
- """
- Take an exclusive lock on an object
-
- :param key: name of the object
- :type key: str
- :param name: name of the lock
- :type name: str
- :param cookie: cookie of the lock
- :type cookie: str
- :param desc: description of the lock
- :type desc: str
- :param duration: duration of the lock in seconds
- :type duration: int
- :param flags: flags
- :type flags: int
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- name = cstr(name, 'name')
- cookie = cstr(cookie, 'cookie')
- desc = cstr(desc, 'desc')
-
- cdef:
- char* _key = key
- char* _name = name
- char* _cookie = cookie
- char* _desc = desc
- uint8_t _flags = flags
- timeval _duration
-
- if duration is None:
- with nogil:
- ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
- NULL, _flags)
- else:
- _duration.tv_sec = duration
- with nogil:
- ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
- &_duration, _flags)
-
- if ret < 0:
- raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
-
- @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
- ('desc', str_type), ('duration', opt(int)), ('flags', int))
- def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
-
- """
- Take a shared lock on an object
-
- :param key: name of the object
- :type key: str
- :param name: name of the lock
- :type name: str
- :param cookie: cookie of the lock
- :type cookie: str
- :param tag: tag of the lock
- :type tag: str
- :param desc: description of the lock
- :type desc: str
- :param duration: duration of the lock in seconds
- :type duration: int
- :param flags: flags
- :type flags: int
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- tag = cstr(tag, 'tag')
- name = cstr(name, 'name')
- cookie = cstr(cookie, 'cookie')
- desc = cstr(desc, 'desc')
-
- cdef:
- char* _key = key
- char* _tag = tag
- char* _name = name
- char* _cookie = cookie
- char* _desc = desc
- uint8_t _flags = flags
- timeval _duration
-
- if duration is None:
- with nogil:
- ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
- NULL, _flags)
- else:
- _duration.tv_sec = duration
- with nogil:
- ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
- &_duration, _flags)
- if ret < 0:
- raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
-
- @requires(('key', str_type), ('name', str_type), ('cookie', str_type))
- def unlock(self, key, name, cookie):
-
- """
- Release a shared or exclusive lock on an object
-
- :param key: name of the object
- :type key: str
- :param name: name of the lock
- :type name: str
- :param cookie: cookie of the lock
- :type cookie: str
-
- :raises: :class:`TypeError`
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
-
- key = cstr(key, 'key')
- name = cstr(name, 'name')
- cookie = cstr(cookie, 'cookie')
-
- cdef:
- char* _key = key
- char* _name = name
- char* _cookie = cookie
-
- with nogil:
- ret = rados_unlock(self.io, _key, _name, _cookie)
- if ret < 0:
- raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
-
- def application_enable(self, app_name, force=False):
- """
- Enable an application on an OSD pool
-
- :param app_name: application name
- :type app_name: str
- :param force: False if only a single app should exist per pool
- :type expire_seconds: boool
-
- :raises: :class:`Error`
- """
- app_name = cstr(app_name, 'app_name')
- cdef:
- char *_app_name = app_name
- int _force = (1 if force else 0)
-
- with nogil:
- ret = rados_application_enable(self.io, _app_name, _force)
- if ret < 0:
- raise make_ex(ret, "error enabling application")
-
- def application_list(self):
- """
- Returns a list of enabled applications
-
- :returns: list of app name string
- """
- cdef:
- size_t length = 128
- char *apps = NULL
-
- try:
- while True:
- apps = <char *>realloc_chk(apps, length)
- with nogil:
- ret = rados_application_list(self.io, apps, &length)
- if ret == 0:
- return [decode_cstr(app) for app in
- apps[:length].split(b'\0') if app]
- elif ret == -errno.ENOENT:
- return None
- elif ret == -errno.ERANGE:
- pass
- else:
- raise make_ex(ret, "error listing applications")
- finally:
- free(apps)
-
- def application_metadata_set(self, app_name, key, value):
- """
- Sets application metadata on an OSD pool
-
- :param app_name: application name
- :type app_name: str
- :param key: metadata key
- :type key: str
- :param value: metadata value
- :type value: str
-
- :raises: :class:`Error`
- """
- app_name = cstr(app_name, 'app_name')
- key = cstr(key, 'key')
- value = cstr(value, 'value')
- cdef:
- char *_app_name = app_name
- char *_key = key
- char *_value = value
-
- with nogil:
- ret = rados_application_metadata_set(self.io, _app_name, _key,
- _value)
- if ret < 0:
- raise make_ex(ret, "error setting application metadata")
-
- def application_metadata_remove(self, app_name, key):
- """
- Remove application metadata from an OSD pool
-
- :param app_name: application name
- :type app_name: str
- :param key: metadata key
- :type key: str
-
- :raises: :class:`Error`
- """
- app_name = cstr(app_name, 'app_name')
- key = cstr(key, 'key')
- cdef:
- char *_app_name = app_name
- char *_key = key
-
- with nogil:
- ret = rados_application_metadata_remove(self.io, _app_name, _key)
- if ret < 0:
- raise make_ex(ret, "error removing application metadata")
-
- def application_metadata_list(self, app_name):
- """
- Returns a list of enabled applications
-
- :param app_name: application name
- :type app_name: str
- :returns: list of key/value tuples
- """
- app_name = cstr(app_name, 'app_name')
- cdef:
- char *_app_name = app_name
- size_t key_length = 128
- size_t val_length = 128
- char *c_keys = NULL
- char *c_vals = NULL
-
- try:
- while True:
- c_keys = <char *>realloc_chk(c_keys, key_length)
- c_vals = <char *>realloc_chk(c_vals, val_length)
- with nogil:
- ret = rados_application_metadata_list(self.io, _app_name,
- c_keys, &key_length,
- c_vals, &val_length)
- if ret == 0:
- keys = [decode_cstr(key) for key in
- c_keys[:key_length].split(b'\0') if key]
- vals = [decode_cstr(val) for val in
- c_vals[:val_length].split(b'\0') if val]
- return zip(keys, vals)
- elif ret == -errno.ERANGE:
- pass
- else:
- raise make_ex(ret, "error listing application metadata")
- finally:
- free(c_keys)
- free(c_vals)
-
-
-def set_object_locator(func):
- def retfunc(self, *args, **kwargs):
- if self.locator_key is not None:
- old_locator = self.ioctx.get_locator_key()
- self.ioctx.set_locator_key(self.locator_key)
- retval = func(self, *args, **kwargs)
- self.ioctx.set_locator_key(old_locator)
- return retval
- else:
- return func(self, *args, **kwargs)
- return retfunc
-
-
-def set_object_namespace(func):
- def retfunc(self, *args, **kwargs):
- if self.nspace is None:
- raise LogicError("Namespace not set properly in context")
- old_nspace = self.ioctx.get_namespace()
- self.ioctx.set_namespace(self.nspace)
- retval = func(self, *args, **kwargs)
- self.ioctx.set_namespace(old_nspace)
- return retval
- return retfunc
-
-
-class Object(object):
- """Rados object wrapper, makes the object look like a file"""
- def __init__(self, ioctx, key, locator_key=None, nspace=None):
- self.key = key
- self.ioctx = ioctx
- self.offset = 0
- self.state = "exists"
- self.locator_key = locator_key
- self.nspace = "" if nspace is None else nspace
-
- def __str__(self):
- return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
- (str(self.ioctx), self.key, "--default--"
- if self.nspace is "" else self.nspace, self.locator_key)
-
- def require_object_exists(self):
- if self.state != "exists":
- raise ObjectStateError("The object is %s" % self.state)
-
- @set_object_locator
- @set_object_namespace
- def read(self, length=1024 * 1024):
- self.require_object_exists()
- ret = self.ioctx.read(self.key, length, self.offset)
- self.offset += len(ret)
- return ret
-
- @set_object_locator
- @set_object_namespace
- def write(self, string_to_write):
- self.require_object_exists()
- ret = self.ioctx.write(self.key, string_to_write, self.offset)
- if ret == 0:
- self.offset += len(string_to_write)
- return ret
-
- @set_object_locator
- @set_object_namespace
- def remove(self):
- self.require_object_exists()
- self.ioctx.remove_object(self.key)
- self.state = "removed"
-
- @set_object_locator
- @set_object_namespace
- def stat(self):
- self.require_object_exists()
- return self.ioctx.stat(self.key)
-
- def seek(self, position):
- self.require_object_exists()
- self.offset = position
-
- @set_object_locator
- @set_object_namespace
- def get_xattr(self, xattr_name):
- self.require_object_exists()
- return self.ioctx.get_xattr(self.key, xattr_name)
-
- @set_object_locator
- @set_object_namespace
- def get_xattrs(self):
- self.require_object_exists()
- return self.ioctx.get_xattrs(self.key)
-
- @set_object_locator
- @set_object_namespace
- def set_xattr(self, xattr_name, xattr_value):
- self.require_object_exists()
- return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
-
- @set_object_locator
- @set_object_namespace
- def rm_xattr(self, xattr_name):
- self.require_object_exists()
- return self.ioctx.rm_xattr(self.key, xattr_name)
-
-MONITOR_LEVELS = [
- "debug",
- "info",
- "warn", "warning",
- "err", "error",
- "sec",
- ]
-
-
-class MonitorLog(object):
- # NOTE(sileht): Keep this class for backward compat
- # method moved to Rados.monitor_log()
- """
- For watching cluster log messages. Instantiate an object and keep
- it around while callback is periodically called. Construct with
- 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
- arg will be passed to the callback.
-
- callback will be called with:
- arg (given to __init__)
- line (the full line, including timestamp, who, level, msg)
- who (which entity issued the log message)
- timestamp_sec (sec of a struct timespec)
- timestamp_nsec (sec of a struct timespec)
- seq (sequence number)
- level (string representing the level of the log message)
- msg (the message itself)
- callback's return value is ignored
- """
- def __init__(self, cluster, level, callback, arg):
- self.level = level
- self.callback = callback
- self.arg = arg
- self.cluster = cluster
- self.cluster.monitor_log(level, callback, arg)
-