X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fpybind%2Frados%2Frados.pyx;fp=src%2Fceph%2Fsrc%2Fpybind%2Frados%2Frados.pyx;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=113180191dc9b4a02cbd99cd396eb21ef2d9f6cc;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/pybind/rados/rados.pyx b/src/ceph/src/pybind/rados/rados.pyx deleted file mode 100644 index 1131801..0000000 --- a/src/ceph/src/pybind/rados/rados.pyx +++ /dev/null @@ -1,3852 +0,0 @@ -# cython: embedsignature=True -""" -This module is a thin wrapper around librados. - -Error codes from librados are turned into exceptions that subclass -:class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError` -(the base class of all rados exceptions), :class:`PermissionError` -and :class:`IOError`, in addition to those documented for the -method. -""" -# Copyright 2011 Josh Durgin -# Copyright 2011, Hannu Valtonen -# Copyright 2015 Hector Martin -# Copyright 2016 Mehdi Abaakouk - -from cpython cimport PyObject, ref -from cpython.pycapsule cimport * -from libc cimport errno -from libc.stdint cimport * -from libc.stdlib cimport malloc, realloc, free - -import sys -import threading -import time - -from collections import Callable -from datetime import datetime -from functools import partial, wraps -from itertools import chain - -# Are we running Python 2.x -if sys.version_info[0] < 3: - str_type = basestring -else: - str_type = str - - -cdef extern from "Python.h": - # These are in cpython/string.pxd, but use "object" types instead of - # PyObject*, which invokes assumptions in cpython that we need to - # legitimately break to implement zero-copy string buffers in Ioctx.read(). - # This is valid use of the Python API and documented as a special case. - PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL - char* PyBytes_AsString(PyObject *string) except NULL - int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1 - void PyEval_InitThreads() - - -cdef extern from "time.h": - ctypedef long int time_t - ctypedef long int suseconds_t - - -cdef extern from "sys/time.h": - cdef struct timeval: - time_t tv_sec - suseconds_t tv_usec - - -cdef extern from "rados/rados_types.h" nogil: - cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES" - - -cdef extern from "rados/librados.h" nogil: - enum: - _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL" - _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK" - _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM" - _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL" - _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED" - _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED" - _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE" - - - enum: - _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG" - _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS" - _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS" - _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES" - _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE" - _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS" - _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY" - _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE" - _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT" - - cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD" - - ctypedef void* rados_t - ctypedef void* rados_config_t - ctypedef void* rados_ioctx_t - ctypedef void* rados_xattrs_iter_t - ctypedef void* rados_omap_iter_t - ctypedef void* rados_list_ctx_t - ctypedef uint64_t rados_snap_t - ctypedef void *rados_write_op_t - ctypedef void *rados_read_op_t - ctypedef void *rados_completion_t - ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg) - ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who, - uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg) - ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name, - uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg) - - - cdef struct rados_cluster_stat_t: - uint64_t kb - uint64_t kb_used - uint64_t kb_avail - uint64_t num_objects - - cdef struct rados_pool_stat_t: - uint64_t num_bytes - uint64_t num_kb - uint64_t num_objects - uint64_t num_object_clones - uint64_t num_object_copies - uint64_t num_objects_missing_on_primary - uint64_t num_objects_unfound - uint64_t num_objects_degraded - uint64_t num_rd - uint64_t num_rd_kb - uint64_t num_wr - uint64_t num_wr_kb - - void rados_buffer_free(char *buf) - - void rados_version(int *major, int *minor, int *extra) - int rados_create2(rados_t *pcluster, const char *const clustername, - const char * const name, uint64_t flags) - int rados_create_with_context(rados_t *cluster, rados_config_t cct) - int rados_connect(rados_t cluster) - void rados_shutdown(rados_t cluster) - int rados_conf_read_file(rados_t cluster, const char *path) - int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv) - int rados_conf_parse_env(rados_t cluster, const char *var) - int rados_conf_set(rados_t cluster, char *option, const char *value) - int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len) - - int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats) - int64_t rados_pool_lookup(rados_t cluster, const char *pool_name) - int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen) - int rados_pool_create(rados_t cluster, const char *pool_name) - int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid) - int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num) - int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num) - int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier) - int rados_pool_list(rados_t cluster, char *buf, size_t len) - int rados_pool_delete(rados_t cluster, const char *pool_name) - int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len) - - int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result) - int rados_cluster_fsid(rados_t cluster, char *buf, size_t len) - int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds) - int rados_application_enable(rados_ioctx_t io, const char *app_name, - int force) - int rados_application_list(rados_ioctx_t io, char *values, - size_t *values_len) - int rados_application_metadata_get(rados_ioctx_t io, const char *app_name, - const char *key, char *value, - size_t *value_len) - int rados_application_metadata_set(rados_ioctx_t io, const char *app_name, - const char *key, const char *value) - int rados_application_metadata_remove(rados_ioctx_t io, - const char *app_name, const char *key) - int rados_application_metadata_list(rados_ioctx_t io, - const char *app_name, char *keys, - size_t *key_len, char *values, - size_t *value_len) - int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen) - int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen, - const char *inbuf, size_t inbuflen, - char **outbuf, size_t *outbuflen, - char **outs, size_t *outslen) - int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen, - const char *inbuf, size_t inbuflen, - char **outbuf, size_t *outbuflen, - char **outs, size_t *outslen) - int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen, - const char *inbuf, size_t inbuflen, - char **outbuf, size_t *outbuflen, - char **outs, size_t *outslen) - int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen, - const char *inbuf, size_t inbuflen, - char **outbuf, size_t *outbuflen, - char **outs, size_t *outslen) - int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen, - const char *inbuf, size_t inbuflen, - char **outbuf, size_t *outbuflen, - char **outs, size_t *outslen) - int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg) - int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg) - - int rados_wait_for_latest_osdmap(rados_t cluster) - - int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx) - void rados_ioctx_destroy(rados_ioctx_t io) - int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid) - void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key) - void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace) - - uint64_t rados_get_last_version(rados_ioctx_t io) - int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime) - int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off) - int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len) - int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len) - int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off) - int rados_remove(rados_ioctx_t io, const char *oid) - int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size) - int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len) - int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len) - int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name) - int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter) - int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len) - void rados_getxattrs_end(rados_xattrs_iter_t iter) - - int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx) - int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace) - void rados_nobjects_list_close(rados_list_ctx_t ctx) - - int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname) - int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname) - int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname) - int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id) - int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen) - void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap) - int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen) - int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t) - - int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name, - const char * cookie, const char * desc, - timeval * duration, uint8_t flags) - int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name, - const char * cookie, const char * tag, const char * desc, - timeval * duration, uint8_t flags) - int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie) - - rados_write_op_t rados_create_write_op() - void rados_release_write_op(rados_write_op_t write_op) - - rados_read_op_t rados_create_read_op() - void rados_release_read_op(rados_read_op_t read_op) - - int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc) - void rados_aio_release(rados_completion_t c) - int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime) - int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off) - int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len) - int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len) - int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion) - int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off) - int rados_aio_flush(rados_ioctx_t io) - - int rados_aio_get_return_value(rados_completion_t c) - int rados_aio_wait_for_complete_and_cb(rados_completion_t c) - int rados_aio_wait_for_safe_and_cb(rados_completion_t c) - int rados_aio_wait_for_complete(rados_completion_t c) - int rados_aio_wait_for_safe(rados_completion_t c) - int rados_aio_is_complete(rados_completion_t c) - int rados_aio_is_safe(rados_completion_t c) - - int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method, - const char * in_buf, size_t in_len, char * buf, size_t out_len) - int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method, - const char * in_buf, size_t in_len, char * buf, size_t out_len) - - int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags) - int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags) - void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num) - void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len) - void rados_write_op_omap_clear(rados_write_op_t write_op) - void rados_write_op_set_flags(rados_write_op_t write_op, int flags) - - void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category) - void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len) - void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len) - void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset) - void rados_write_op_remove(rados_write_op_t write_op) - void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset) - void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len) - - void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval) - void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval) - void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval) - int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags) - int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags) - void rados_read_op_set_flags(rados_read_op_t read_op, int flags) - int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len) - void rados_omap_get_end(rados_omap_iter_t iter) - - -LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL -LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK -LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM -LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL -LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED -LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED -LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE - -LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD - -LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG -LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS -LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS -LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES -LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE -LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS -LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY - -LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8') - -LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE -LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT - -ANONYMOUS_AUID = 0xffffffffffffffff -ADMIN_AUID = 0 - - -class Error(Exception): - """ `Error` class, derived from `Exception` """ - pass - - -class InvalidArgumentError(Error): - pass - - -class OSError(Error): - """ `OSError` class, derived from `Error` """ - def __init__(self, message, errno=None): - super(OSError, self).__init__(message) - self.errno = errno - - def __str__(self): - msg = super(OSError, self).__str__() - if self.errno is None: - return msg - return '[errno {0}] {1}'.format(self.errno, msg) - - def __reduce__(self): - return (self.__class__, (self.message, self.errno)) - -class InterruptedOrTimeoutError(OSError): - """ `InterruptedOrTimeoutError` class, derived from `OSError` """ - pass - - -class PermissionError(OSError): - """ `PermissionError` class, derived from `OSError` """ - pass - - -class PermissionDeniedError(OSError): - """ deal with EACCES related. """ - pass - - -class ObjectNotFound(OSError): - """ `ObjectNotFound` class, derived from `OSError` """ - pass - - -class NoData(OSError): - """ `NoData` class, derived from `OSError` """ - pass - - -class ObjectExists(OSError): - """ `ObjectExists` class, derived from `OSError` """ - pass - - -class ObjectBusy(OSError): - """ `ObjectBusy` class, derived from `IOError` """ - pass - - -class IOError(OSError): - """ `ObjectBusy` class, derived from `OSError` """ - pass - - -class NoSpace(OSError): - """ `NoSpace` class, derived from `OSError` """ - pass - - -class RadosStateError(Error): - """ `RadosStateError` class, derived from `Error` """ - pass - - -class IoctxStateError(Error): - """ `IoctxStateError` class, derived from `Error` """ - pass - - -class ObjectStateError(Error): - """ `ObjectStateError` class, derived from `Error` """ - pass - - -class LogicError(Error): - """ `` class, derived from `Error` """ - pass - - -class TimedOut(OSError): - """ `TimedOut` class, derived from `OSError` """ - pass - - -IF UNAME_SYSNAME == "FreeBSD": - cdef errno_to_exception = { - errno.EPERM : PermissionError, - errno.ENOENT : ObjectNotFound, - errno.EIO : IOError, - errno.ENOSPC : NoSpace, - errno.EEXIST : ObjectExists, - errno.EBUSY : ObjectBusy, - errno.ENOATTR : NoData, - errno.EINTR : InterruptedOrTimeoutError, - errno.ETIMEDOUT : TimedOut, - errno.EACCES : PermissionDeniedError, - errno.EINVAL : InvalidArgumentError, - } -ELSE: - cdef errno_to_exception = { - errno.EPERM : PermissionError, - errno.ENOENT : ObjectNotFound, - errno.EIO : IOError, - errno.ENOSPC : NoSpace, - errno.EEXIST : ObjectExists, - errno.EBUSY : ObjectBusy, - errno.ENODATA : NoData, - errno.EINTR : InterruptedOrTimeoutError, - errno.ETIMEDOUT : TimedOut, - errno.EACCES : PermissionDeniedError, - errno.EINVAL : InvalidArgumentError, - } - - -cdef make_ex(ret, msg): - """ - Translate a librados return code into an exception. - - :param ret: the return code - :type ret: int - :param msg: the error message to use - :type msg: str - :returns: a subclass of :class:`Error` - """ - ret = abs(ret) - if ret in errno_to_exception: - return errno_to_exception[ret](msg, errno=ret) - else: - return OSError(msg, errno=ret) - - -# helper to specify an optional argument, where in addition to `cls`, `None` -# is also acceptable -def opt(cls): - return (cls, None) - - -# validate argument types of an instance method -# kwargs is an un-ordered dict, so use args instead -def requires(*types): - def is_type_of(v, t): - if t is None: - return v is None - else: - return isinstance(v, t) - - def check_type(val, arg_name, arg_type): - if isinstance(arg_type, tuple): - if any(is_type_of(val, t) for t in arg_type): - return - type_names = ' or '.join('None' if t is None else t.__name__ - for t in arg_type) - raise TypeError('%s must be %s' % (arg_name, type_names)) - else: - if is_type_of(val, arg_type): - return - assert(arg_type is not None) - raise TypeError('%s must be %s' % (arg_name, arg_type.__name__)) - - def wrapper(f): - # FIXME(sileht): this stop with - # AttributeError: 'method_descriptor' object has no attribute '__module__' - # @wraps(f) - def validate_func(*args, **kwargs): - # ignore the `self` arg - pos_args = zip(args[1:], types) - named_args = ((kwargs[name], (name, spec)) for name, spec in types - if name in kwargs) - for arg_val, (arg_name, arg_type) in chain(pos_args, named_args): - check_type(arg_val, arg_name, arg_type) - return f(*args, **kwargs) - return validate_func - return wrapper - - -def cstr(val, name, encoding="utf-8", opt=False): - """ - Create a byte string from a Python string - - :param basestring val: Python string - :param str name: Name of the string parameter, for exceptions - :param str encoding: Encoding to use - :param bool opt: If True, None is allowed - :rtype: bytes - :raises: :class:`InvalidArgument` - """ - if opt and val is None: - return None - if isinstance(val, bytes): - return val - elif isinstance(val, unicode): - return val.encode(encoding) - else: - raise TypeError('%s must be a string' % name) - - -def cstr_list(list_str, name, encoding="utf-8"): - return [cstr(s, name) for s in list_str] - - -def decode_cstr(val, encoding="utf-8"): - """ - Decode a byte string into a Python string. - - :param bytes val: byte string - :rtype: unicode or None - """ - if val is None: - return None - - return val.decode(encoding) - - -cdef char* opt_str(s) except? NULL: - if s is None: - return NULL - return s - - -cdef void* realloc_chk(void* ptr, size_t size) except NULL: - cdef void *ret = realloc(ptr, size) - if ret == NULL: - raise MemoryError("realloc failed") - return ret - - -cdef size_t * to_csize_t_array(list_int): - cdef size_t *ret = malloc(len(list_int) * sizeof(size_t)) - if ret == NULL: - raise MemoryError("malloc failed") - for i in xrange(len(list_int)): - ret[i] = list_int[i] - return ret - - -cdef char ** to_bytes_array(list_bytes): - cdef char **ret = malloc(len(list_bytes) * sizeof(char *)) - if ret == NULL: - raise MemoryError("malloc failed") - for i in xrange(len(list_bytes)): - ret[i] = list_bytes[i] - return ret - - - -cdef int __monitor_callback(void *arg, const char *line, const char *who, - uint64_t sec, uint64_t nsec, uint64_t seq, - const char *level, const char *msg) with gil: - cdef object cb_info = arg - cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg) - return 0 - -cdef int __monitor_callback2(void *arg, const char *line, const char *channel, - const char *who, - const char *name, - uint64_t sec, uint64_t nsec, uint64_t seq, - const char *level, const char *msg) with gil: - cdef object cb_info = arg - cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg) - return 0 - - -class Version(object): - """ Version information """ - def __init__(self, major, minor, extra): - self.major = major - self.minor = minor - self.extra = extra - - def __str__(self): - return "%d.%d.%d" % (self.major, self.minor, self.extra) - - -cdef class Rados(object): - """This class wraps librados functions""" - # NOTE(sileht): attributes declared in .pyd - - def __init__(self, *args, **kwargs): - PyEval_InitThreads() - self.__setup(*args, **kwargs) - - @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)), - ('conffile', opt(str_type))) - def __setup(self, rados_id=None, name=None, clustername=None, - conf_defaults=None, conffile=None, conf=None, flags=0, - context=None): - self.monitor_callback = None - self.monitor_callback2 = None - self.parsed_args = [] - self.conf_defaults = conf_defaults - self.conffile = conffile - self.rados_id = rados_id - - if rados_id and name: - raise Error("Rados(): can't supply both rados_id and name") - elif rados_id: - name = 'client.' + rados_id - elif name is None: - name = 'client.admin' - if clustername is None: - clustername = '' - - name = cstr(name, 'name') - clustername = cstr(clustername, 'clustername') - cdef: - char *_name = name - char *_clustername = clustername - int _flags = flags - int ret - - if context: - # Unpack void* (aka rados_config_t) from capsule - rados_config = PyCapsule_GetPointer(context, NULL) - with nogil: - ret = rados_create_with_context(&self.cluster, rados_config) - else: - with nogil: - ret = rados_create2(&self.cluster, _clustername, _name, _flags) - if ret != 0: - raise Error("rados_initialize failed with error code: %d" % ret) - - self.state = "configuring" - # order is important: conf_defaults, then conffile, then conf - if conf_defaults: - for key, value in conf_defaults.items(): - self.conf_set(key, value) - if conffile is not None: - # read the default conf file when '' is given - if conffile == '': - conffile = None - self.conf_read_file(conffile) - if conf: - for key, value in conf.items(): - self.conf_set(key, value) - - def require_state(self, *args): - """ - Checks if the Rados object is in a special state - - :raises: RadosStateError - """ - if self.state in args: - return - raise RadosStateError("You cannot perform that operation on a \ -Rados object in state %s." % self.state) - - def shutdown(self): - """ - Disconnects from the cluster. Call this explicitly when a - Rados.connect()ed object is no longer used. - """ - if self.state != "shutdown": - with nogil: - rados_shutdown(self.cluster) - self.state = "shutdown" - - def __enter__(self): - self.connect() - return self - - def __exit__(self, type_, value, traceback): - self.shutdown() - return False - - def version(self): - """ - Get the version number of the ``librados`` C library. - - :returns: a tuple of ``(major, minor, extra)`` components of the - librados version - """ - cdef int major = 0 - cdef int minor = 0 - cdef int extra = 0 - with nogil: - rados_version(&major, &minor, &extra) - return Version(major, minor, extra) - - @requires(('path', opt(str_type))) - def conf_read_file(self, path=None): - """ - Configure the cluster handle using a Ceph config file. - - :param path: path to the config file - :type path: str - """ - self.require_state("configuring", "connected") - path = cstr(path, 'path', opt=True) - cdef: - char *_path = opt_str(path) - with nogil: - ret = rados_conf_read_file(self.cluster, _path) - if ret != 0: - raise make_ex(ret, "error calling conf_read_file") - - def conf_parse_argv(self, args): - """ - Parse known arguments from args, and remove; returned - args contain only those unknown to ceph - """ - self.require_state("configuring", "connected") - if not args: - return - - cargs = cstr_list(args, 'args') - cdef: - int _argc = len(args) - char **_argv = to_bytes_array(cargs) - char **_remargv = NULL - - try: - _remargv = malloc(_argc * sizeof(char *)) - with nogil: - ret = rados_conf_parse_argv_remainder(self.cluster, _argc, - _argv, - _remargv) - if ret: - raise make_ex(ret, "error calling conf_parse_argv_remainder") - - # _remargv was allocated with fixed argc; collapse return - # list to eliminate any missing args - retargs = [decode_cstr(a) for a in _remargv[:_argc] - if a != NULL] - self.parsed_args = args - return retargs - finally: - free(_argv) - free(_remargv) - - def conf_parse_env(self, var='CEPH_ARGS'): - """ - Parse known arguments from an environment variable, normally - CEPH_ARGS. - """ - self.require_state("configuring", "connected") - if not var: - return - - var = cstr(var, 'var') - cdef: - char *_var = var - with nogil: - ret = rados_conf_parse_env(self.cluster, _var) - if ret != 0: - raise make_ex(ret, "error calling conf_parse_env") - - @requires(('option', str_type)) - def conf_get(self, option): - """ - Get the value of a configuration option - - :param option: which option to read - :type option: str - - :returns: str - value of the option or None - :raises: :class:`TypeError` - """ - self.require_state("configuring", "connected") - option = cstr(option, 'option') - cdef: - char *_option = option - size_t length = 20 - char *ret_buf = NULL - - try: - while True: - ret_buf = realloc_chk(ret_buf, length) - with nogil: - ret = rados_conf_get(self.cluster, _option, ret_buf, length) - if ret == 0: - return decode_cstr(ret_buf) - elif ret == -errno.ENAMETOOLONG: - length = length * 2 - elif ret == -errno.ENOENT: - return None - else: - raise make_ex(ret, "error calling conf_get") - finally: - free(ret_buf) - - @requires(('option', str_type), ('val', str_type)) - def conf_set(self, option, val): - """ - Set the value of a configuration option - - :param option: which option to set - :type option: str - :param option: value of the option - :type option: str - - :raises: :class:`TypeError`, :class:`ObjectNotFound` - """ - self.require_state("configuring", "connected") - option = cstr(option, 'option') - val = cstr(val, 'val') - cdef: - char *_option = option - char *_val = val - - with nogil: - ret = rados_conf_set(self.cluster, _option, _val) - if ret != 0: - raise make_ex(ret, "error calling conf_set") - - def ping_monitor(self, mon_id): - """ - Ping a monitor to assess liveness - - May be used as a simply way to assess liveness, or to obtain - information about the monitor in a simple way even in the - absence of quorum. - - :param mon_id: the ID portion of the monitor's name (i.e., mon.) - :type mon_id: str - :returns: the string reply from the monitor - """ - - self.require_state("configuring", "connected") - - mon_id = cstr(mon_id, 'mon_id') - cdef: - char *_mon_id = mon_id - size_t outstrlen = 0 - char *outstr - - with nogil: - ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen) - - if ret != 0: - raise make_ex(ret, "error calling ping_monitor") - - if outstrlen: - my_outstr = outstr[:outstrlen] - rados_buffer_free(outstr) - return decode_cstr(my_outstr) - - def connect(self, timeout=0): - """ - Connect to the cluster. Use shutdown() to release resources. - """ - self.require_state("configuring") - # NOTE(sileht): timeout was supported by old python API, - # but this is not something available in C API, so ignore - # for now and remove it later - with nogil: - ret = rados_connect(self.cluster) - if ret != 0: - raise make_ex(ret, "error connecting to the cluster") - self.state = "connected" - - def get_cluster_stats(self): - """ - Read usage info about the cluster - - This tells you total space, space used, space available, and number - of objects. These are not updated immediately when data is written, - they are eventually consistent. - - :returns: dict - contains the following keys: - - - ``kb`` (int) - total space - - - ``kb_used`` (int) - space used - - - ``kb_avail`` (int) - free space available - - - ``num_objects`` (int) - number of objects - - """ - cdef: - rados_cluster_stat_t stats - - with nogil: - ret = rados_cluster_stat(self.cluster, &stats) - - if ret < 0: - raise make_ex( - ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id) - return {'kb': stats.kb, - 'kb_used': stats.kb_used, - 'kb_avail': stats.kb_avail, - 'num_objects': stats.num_objects} - - @requires(('pool_name', str_type)) - def pool_exists(self, pool_name): - """ - Checks if a given pool exists. - - :param pool_name: name of the pool to check - :type pool_name: str - - :raises: :class:`TypeError`, :class:`Error` - :returns: bool - whether the pool exists, false otherwise. - """ - self.require_state("connected") - - pool_name = cstr(pool_name, 'pool_name') - cdef: - char *_pool_name = pool_name - - with nogil: - ret = rados_pool_lookup(self.cluster, _pool_name) - if ret >= 0: - return True - elif ret == -errno.ENOENT: - return False - else: - raise make_ex(ret, "error looking up pool '%s'" % pool_name) - - @requires(('pool_name', str_type)) - def pool_lookup(self, pool_name): - """ - Returns a pool's ID based on its name. - - :param pool_name: name of the pool to look up - :type pool_name: str - - :raises: :class:`TypeError`, :class:`Error` - :returns: int - pool ID, or None if it doesn't exist - """ - self.require_state("connected") - pool_name = cstr(pool_name, 'pool_name') - cdef: - char *_pool_name = pool_name - - with nogil: - ret = rados_pool_lookup(self.cluster, _pool_name) - if ret >= 0: - return int(ret) - elif ret == -errno.ENOENT: - return None - else: - raise make_ex(ret, "error looking up pool '%s'" % pool_name) - - @requires(('pool_id', int)) - def pool_reverse_lookup(self, pool_id): - """ - Returns a pool's name based on its ID. - - :param pool_id: ID of the pool to look up - :type pool_id: int - - :raises: :class:`TypeError`, :class:`Error` - :returns: string - pool name, or None if it doesn't exist - """ - self.require_state("connected") - cdef: - int64_t _pool_id = pool_id - size_t size = 512 - char *name = NULL - - try: - while True: - name = realloc_chk(name, size) - with nogil: - ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size) - if ret >= 0: - break - elif ret != -errno.ERANGE and size <= 4096: - size *= 2 - elif ret == -errno.ENOENT: - return None - elif ret < 0: - raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id) - - return decode_cstr(name) - - finally: - free(name) - - @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int))) - def create_pool(self, pool_name, auid=None, crush_rule=None): - """ - Create a pool: - - with default settings: if auid=None and crush_rule=None - - owned by a specific auid: auid given and crush_rule=None - - with a specific CRUSH rule: if auid=None and crush_rule given - - with a specific CRUSH rule and auid: if auid and crush_rule given - - :param pool_name: name of the pool to create - :type pool_name: str - :param auid: the id of the owner of the new pool - :type auid: int - :param crush_rule: rule to use for placement in the new pool - :type crush_rule: int - - :raises: :class:`TypeError`, :class:`Error` - """ - self.require_state("connected") - - pool_name = cstr(pool_name, 'pool_name') - cdef: - char *_pool_name = pool_name - uint8_t _crush_rule - uint64_t _auid - - if auid is None and crush_rule is None: - with nogil: - ret = rados_pool_create(self.cluster, _pool_name) - elif auid is None: - _crush_rule = crush_rule - with nogil: - ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule) - elif crush_rule is None: - _auid = auid - with nogil: - ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid) - else: - _auid = auid - _crush_rule = crush_rule - with nogil: - ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule) - if ret < 0: - raise make_ex(ret, "error creating pool '%s'" % pool_name) - - @requires(('pool_id', int)) - def get_pool_base_tier(self, pool_id): - """ - Get base pool - - :returns: base pool, or pool_id if tiering is not configured for the pool - """ - self.require_state("connected") - cdef: - int64_t base_tier = 0 - int64_t _pool_id = pool_id - - with nogil: - ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier) - if ret < 0: - raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id) - return int(base_tier) - - @requires(('pool_name', str_type)) - def delete_pool(self, pool_name): - """ - Delete a pool and all data inside it. - - The pool is removed from the cluster immediately, - but the actual data is deleted in the background. - - :param pool_name: name of the pool to delete - :type pool_name: str - - :raises: :class:`TypeError`, :class:`Error` - """ - self.require_state("connected") - - pool_name = cstr(pool_name, 'pool_name') - cdef: - char *_pool_name = pool_name - - with nogil: - ret = rados_pool_delete(self.cluster, _pool_name) - if ret < 0: - raise make_ex(ret, "error deleting pool '%s'" % pool_name) - - @requires(('pool_id', int)) - def get_inconsistent_pgs(self, pool_id): - """ - List inconsistent placement groups in the given pool - - :param pool_id: ID of the pool in which PGs are listed - :type pool_id: int - :returns: list - inconsistent placement groups - """ - self.require_state("connected") - cdef: - int64_t pool = pool_id - size_t size = 512 - char *pgs = NULL - - try: - while True: - pgs = realloc_chk(pgs, size); - with nogil: - ret = rados_inconsistent_pg_list(self.cluster, pool, - pgs, size) - if ret > size: - size *= 2 - elif ret >= 0: - break - else: - raise make_ex(ret, "error calling inconsistent_pg_list") - return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg] - finally: - free(pgs) - - def list_pools(self): - """ - Gets a list of pool names. - - :returns: list - of pool names. - """ - self.require_state("connected") - cdef: - size_t size = 512 - char *c_names = NULL - - try: - while True: - c_names = realloc_chk(c_names, size) - with nogil: - ret = rados_pool_list(self.cluster, c_names, size) - if ret > size: - size *= 2 - elif ret >= 0: - break - return [name for name in decode_cstr(c_names[:ret]).split('\0') - if name] - finally: - free(c_names) - - def get_fsid(self): - """ - Get the fsid of the cluster as a hexadecimal string. - - :raises: :class:`Error` - :returns: str - cluster fsid - """ - self.require_state("connected") - cdef: - char *ret_buf - size_t buf_len = 37 - PyObject* ret_s = NULL - - ret_s = PyBytes_FromStringAndSize(NULL, buf_len) - try: - ret_buf = PyBytes_AsString(ret_s) - with nogil: - ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len) - if ret < 0: - raise make_ex(ret, "error getting cluster fsid") - if ret != buf_len: - _PyBytes_Resize(&ret_s, ret) - return ret_s - finally: - # We DECREF unconditionally: the cast to object above will have - # INCREFed if necessary. This also takes care of exceptions, - # including if _PyString_Resize fails (that will free the string - # itself and set ret_s to NULL, hence XDECREF). - ref.Py_XDECREF(ret_s) - - @requires(('ioctx_name', str_type)) - def open_ioctx(self, ioctx_name): - """ - Create an io context - - The io context allows you to perform operations within a particular - pool. - - :param ioctx_name: name of the pool - :type ioctx_name: str - - :raises: :class:`TypeError`, :class:`Error` - :returns: Ioctx - Rados Ioctx object - """ - self.require_state("connected") - ioctx_name = cstr(ioctx_name, 'ioctx_name') - cdef: - rados_ioctx_t ioctx - char *_ioctx_name = ioctx_name - with nogil: - ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx) - if ret < 0: - raise make_ex(ret, "error opening pool '%s'" % ioctx_name) - io = Ioctx(ioctx_name) - io.io = ioctx - return io - - def mon_command(self, cmd, inbuf, timeout=0, target=None): - """ - mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen) - returns (int ret, string outbuf, string outs) - """ - # NOTE(sileht): timeout is ignored because C API doesn't provide - # timeout argument, but we keep it for backward compat with old python binding - - self.require_state("connected") - cmd = cstr_list(cmd, 'c') - - if isinstance(target, int): - # NOTE(sileht): looks weird but test_monmap_dump pass int - target = str(target) - - target = cstr(target, 'target', opt=True) - inbuf = cstr(inbuf, 'inbuf') - - cdef: - char *_target = opt_str(target) - char **_cmd = to_bytes_array(cmd) - size_t _cmdlen = len(cmd) - - char *_inbuf = inbuf - size_t _inbuf_len = len(inbuf) - - char *_outbuf - size_t _outbuf_len - char *_outs - size_t _outs_len - - try: - if target: - with nogil: - ret = rados_mon_command_target(self.cluster, _target, - _cmd, _cmdlen, - _inbuf, _inbuf_len, - &_outbuf, &_outbuf_len, - &_outs, &_outs_len) - else: - with nogil: - ret = rados_mon_command(self.cluster, - _cmd, _cmdlen, - _inbuf, _inbuf_len, - &_outbuf, &_outbuf_len, - &_outs, &_outs_len) - - my_outs = decode_cstr(_outs[:_outs_len]) - my_outbuf = _outbuf[:_outbuf_len] - if _outs_len: - rados_buffer_free(_outs) - if _outbuf_len: - rados_buffer_free(_outbuf) - return (ret, my_outbuf, my_outs) - finally: - free(_cmd) - - def osd_command(self, osdid, cmd, inbuf, timeout=0): - """ - osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen) - returns (int ret, string outbuf, string outs) - """ - # NOTE(sileht): timeout is ignored because C API doesn't provide - # timeout argument, but we keep it for backward compat with old python binding - self.require_state("connected") - - cmd = cstr_list(cmd, 'cmd') - inbuf = cstr(inbuf, 'inbuf') - - cdef: - int _osdid = osdid - char **_cmd = to_bytes_array(cmd) - size_t _cmdlen = len(cmd) - - char *_inbuf = inbuf - size_t _inbuf_len = len(inbuf) - - char *_outbuf - size_t _outbuf_len - char *_outs - size_t _outs_len - - try: - with nogil: - ret = rados_osd_command(self.cluster, _osdid, - _cmd, _cmdlen, - _inbuf, _inbuf_len, - &_outbuf, &_outbuf_len, - &_outs, &_outs_len) - - my_outs = decode_cstr(_outs[:_outs_len]) - my_outbuf = _outbuf[:_outbuf_len] - if _outs_len: - rados_buffer_free(_outs) - if _outbuf_len: - rados_buffer_free(_outbuf) - return (ret, my_outbuf, my_outs) - finally: - free(_cmd) - - def mgr_command(self, cmd, inbuf, timeout=0): - """ - returns (int ret, string outbuf, string outs) - """ - # NOTE(sileht): timeout is ignored because C API doesn't provide - # timeout argument, but we keep it for backward compat with old python binding - self.require_state("connected") - - cmd = cstr_list(cmd, 'cmd') - inbuf = cstr(inbuf, 'inbuf') - - cdef: - char **_cmd = to_bytes_array(cmd) - size_t _cmdlen = len(cmd) - - char *_inbuf = inbuf - size_t _inbuf_len = len(inbuf) - - char *_outbuf - size_t _outbuf_len - char *_outs - size_t _outs_len - - try: - with nogil: - ret = rados_mgr_command(self.cluster, - _cmd, _cmdlen, - _inbuf, _inbuf_len, - &_outbuf, &_outbuf_len, - &_outs, &_outs_len) - - my_outs = decode_cstr(_outs[:_outs_len]) - my_outbuf = _outbuf[:_outbuf_len] - if _outs_len: - rados_buffer_free(_outs) - if _outbuf_len: - rados_buffer_free(_outbuf) - return (ret, my_outbuf, my_outs) - finally: - free(_cmd) - - def pg_command(self, pgid, cmd, inbuf, timeout=0): - """ - pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen) - returns (int ret, string outbuf, string outs) - """ - # NOTE(sileht): timeout is ignored because C API doesn't provide - # timeout argument, but we keep it for backward compat with old python binding - self.require_state("connected") - - pgid = cstr(pgid, 'pgid') - cmd = cstr_list(cmd, 'cmd') - inbuf = cstr(inbuf, 'inbuf') - - cdef: - char *_pgid = pgid - char **_cmd = to_bytes_array(cmd) - size_t _cmdlen = len(cmd) - - char *_inbuf = inbuf - size_t _inbuf_len = len(inbuf) - - char *_outbuf - size_t _outbuf_len - char *_outs - size_t _outs_len - - try: - with nogil: - ret = rados_pg_command(self.cluster, _pgid, - _cmd, _cmdlen, - _inbuf, _inbuf_len, - &_outbuf, &_outbuf_len, - &_outs, &_outs_len) - - my_outs = decode_cstr(_outs[:_outs_len]) - my_outbuf = _outbuf[:_outbuf_len] - if _outs_len: - rados_buffer_free(_outs) - if _outbuf_len: - rados_buffer_free(_outbuf) - return (ret, my_outbuf, my_outs) - finally: - free(_cmd) - - def wait_for_latest_osdmap(self): - self.require_state("connected") - with nogil: - ret = rados_wait_for_latest_osdmap(self.cluster) - return ret - - def blacklist_add(self, client_address, expire_seconds=0): - """ - Blacklist a client from the OSDs - - :param client_address: client address - :type client_address: str - :param expire_seconds: number of seconds to blacklist - :type expire_seconds: int - - :raises: :class:`Error` - """ - self.require_state("connected") - client_address = cstr(client_address, 'client_address') - cdef: - uint32_t _expire_seconds = expire_seconds - char *_client_address = client_address - - with nogil: - ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds) - if ret < 0: - raise make_ex(ret, "error blacklisting client '%s'" % client_address) - - def monitor_log(self, level, callback, arg): - if level not in MONITOR_LEVELS: - raise LogicError("invalid monitor level " + level) - if callback is not None and not callable(callback): - raise LogicError("callback must be a callable function or None") - - level = cstr(level, 'level') - cdef char *_level = level - - if callback is None: - with nogil: - r = rados_monitor_log(self.cluster, _level, NULL, NULL) - self.monitor_callback = None - self.monitor_callback2 = None - return - - cb = (callback, arg) - cdef PyObject* _arg = cb - with nogil: - r = rados_monitor_log(self.cluster, _level, - &__monitor_callback, _arg) - - if r: - raise make_ex(r, 'error calling rados_monitor_log') - # NOTE(sileht): Prevents the callback method from being garbage collected - self.monitor_callback = cb - self.monitor_callback2 = None - - def monitor_log2(self, level, callback, arg): - if level not in MONITOR_LEVELS: - raise LogicError("invalid monitor level " + level) - if callback is not None and not callable(callback): - raise LogicError("callback must be a callable function or None") - - level = cstr(level, 'level') - cdef char *_level = level - - if callback is None: - with nogil: - r = rados_monitor_log2(self.cluster, _level, NULL, NULL) - self.monitor_callback = None - self.monitor_callback2 = None - return - - cb = (callback, arg) - cdef PyObject* _arg = cb - with nogil: - r = rados_monitor_log2(self.cluster, _level, - &__monitor_callback2, _arg) - - if r: - raise make_ex(r, 'error calling rados_monitor_log') - # NOTE(sileht): Prevents the callback method from being garbage collected - self.monitor_callback = None - self.monitor_callback2 = cb - - -cdef class OmapIterator(object): - """Omap iterator""" - - cdef public Ioctx ioctx - cdef rados_omap_iter_t ctx - - def __cinit__(self, Ioctx ioctx): - self.ioctx = ioctx - - def __iter__(self): - return self - - def __next__(self): - """ - Get the next key-value pair in the object - :returns: next rados.OmapItem - """ - cdef: - char *key_ = NULL - char *val_ = NULL - size_t len_ - - with nogil: - ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_) - - if ret != 0: - raise make_ex(ret, "error iterating over the omap") - if key_ == NULL: - raise StopIteration() - key = decode_cstr(key_) - val = None - if val_ != NULL: - val = val_[:len_] - return (key, val) - - def __dealloc__(self): - with nogil: - rados_omap_get_end(self.ctx) - - -cdef class ObjectIterator(object): - """rados.Ioctx Object iterator""" - - cdef rados_list_ctx_t ctx - - cdef public object ioctx - - def __cinit__(self, Ioctx ioctx): - self.ioctx = ioctx - - with nogil: - ret = rados_nobjects_list_open(ioctx.io, &self.ctx) - if ret < 0: - raise make_ex(ret, "error iterating over the objects in ioctx '%s'" - % self.ioctx.name) - - def __iter__(self): - return self - - def __next__(self): - """ - Get the next object name and locator in the pool - - :raises: StopIteration - :returns: next rados.Ioctx Object - """ - cdef: - const char *key_ = NULL - const char *locator_ = NULL - const char *nspace_ = NULL - - with nogil: - ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_) - - if ret < 0: - raise StopIteration() - - key = decode_cstr(key_) - locator = decode_cstr(locator_) if locator_ != NULL else None - nspace = decode_cstr(nspace_) if nspace_ != NULL else None - return Object(self.ioctx, key, locator, nspace) - - def __dealloc__(self): - with nogil: - rados_nobjects_list_close(self.ctx) - - -cdef class XattrIterator(object): - """Extended attribute iterator""" - - cdef rados_xattrs_iter_t it - cdef char* _oid - - cdef public Ioctx ioctx - cdef public object oid - - def __cinit__(self, Ioctx ioctx, oid): - self.ioctx = ioctx - self.oid = cstr(oid, 'oid') - self._oid = self.oid - - with nogil: - ret = rados_getxattrs(ioctx.io, self._oid, &self.it) - if ret != 0: - raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid) - - def __iter__(self): - return self - - def __next__(self): - """ - Get the next xattr on the object - - :raises: StopIteration - :returns: pair - of name and value of the next Xattr - """ - cdef: - const char *name_ = NULL - const char *val_ = NULL - size_t len_ = 0 - - with nogil: - ret = rados_getxattrs_next(self.it, &name_, &val_, &len_) - if ret != 0: - raise make_ex(ret, "error iterating over the extended attributes \ -in '%s'" % self.oid) - if name_ == NULL: - raise StopIteration() - name = decode_cstr(name_) - val = val_[:len_] - return (name, val) - - def __dealloc__(self): - with nogil: - rados_getxattrs_end(self.it) - - -cdef class SnapIterator(object): - """Snapshot iterator""" - - cdef public Ioctx ioctx - - cdef rados_snap_t *snaps - cdef int max_snap - cdef int cur_snap - - def __cinit__(self, Ioctx ioctx): - self.ioctx = ioctx - # We don't know how big a buffer we need until we've called the - # function. So use the exponential doubling strategy. - cdef int num_snaps = 10 - while True: - self.snaps = realloc_chk(self.snaps, - num_snaps * - sizeof(rados_snap_t)) - - with nogil: - ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps) - if ret >= 0: - self.max_snap = ret - break - elif ret != -errno.ERANGE: - raise make_ex(ret, "error calling rados_snap_list for \ -ioctx '%s'" % self.ioctx.name) - num_snaps = num_snaps * 2 - self.cur_snap = 0 - - def __iter__(self): - return self - - def __next__(self): - """ - Get the next Snapshot - - :raises: :class:`Error`, StopIteration - :returns: Snap - next snapshot - """ - if self.cur_snap >= self.max_snap: - raise StopIteration - - cdef: - rados_snap_t snap_id = self.snaps[self.cur_snap] - int name_len = 10 - char *name = NULL - - try: - while True: - name = realloc_chk(name, name_len) - with nogil: - ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len) - if ret == 0: - break - elif ret != -errno.ERANGE: - raise make_ex(ret, "rados_snap_get_name error") - else: - name_len = name_len * 2 - - snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id) - self.cur_snap = self.cur_snap + 1 - return snap - finally: - free(name) - - -cdef class Snap(object): - """Snapshot object""" - cdef public Ioctx ioctx - cdef public object name - - # NOTE(sileht): old API was storing the ctypes object - # instead of the value .... - cdef public rados_snap_t snap_id - - def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id): - self.ioctx = ioctx - self.name = name - self.snap_id = snap_id - - def __str__(self): - return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \ - % (str(self.ioctx), self.name, self.snap_id) - - def get_timestamp(self): - """ - Find when a snapshot in the current pool occurred - - :raises: :class:`Error` - :returns: datetime - the data and time the snapshot was created - """ - cdef time_t snap_time - - with nogil: - ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time) - if ret != 0: - raise make_ex(ret, "rados_ioctx_snap_get_stamp error") - return datetime.fromtimestamp(snap_time) - - -cdef class Completion(object): - """completion object""" - - cdef public: - Ioctx ioctx - object oncomplete - object onsafe - - cdef: - rados_callback_t complete_cb - rados_callback_t safe_cb - rados_completion_t rados_comp - PyObject* buf - - def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe): - self.oncomplete = oncomplete - self.onsafe = onsafe - self.ioctx = ioctx - - def is_safe(self): - """ - Is an asynchronous operation safe? - - This does not imply that the safe callback has finished. - - :returns: True if the operation is safe - """ - with nogil: - ret = rados_aio_is_safe(self.rados_comp) - return ret == 1 - - def is_complete(self): - """ - Has an asynchronous operation completed? - - This does not imply that the safe callback has finished. - - :returns: True if the operation is completed - """ - with nogil: - ret = rados_aio_is_complete(self.rados_comp) - return ret == 1 - - def wait_for_safe(self): - """ - Wait for an asynchronous operation to be marked safe - - This does not imply that the safe callback has finished. - """ - with nogil: - rados_aio_wait_for_safe(self.rados_comp) - - def wait_for_complete(self): - """ - Wait for an asynchronous operation to complete - - This does not imply that the complete callback has finished. - """ - with nogil: - rados_aio_wait_for_complete(self.rados_comp) - - def wait_for_safe_and_cb(self): - """ - Wait for an asynchronous operation to be marked safe and for - the safe callback to have returned - """ - with nogil: - rados_aio_wait_for_safe_and_cb(self.rados_comp) - - def wait_for_complete_and_cb(self): - """ - Wait for an asynchronous operation to complete and for the - complete callback to have returned - - :returns: whether the operation is completed - """ - with nogil: - ret = rados_aio_wait_for_complete_and_cb(self.rados_comp) - return ret - - def get_return_value(self): - """ - Get the return value of an asychronous operation - - The return value is set when the operation is complete or safe, - whichever comes first. - - :returns: int - return value of the operation - """ - with nogil: - ret = rados_aio_get_return_value(self.rados_comp) - return ret - - def __dealloc__(self): - """ - Release a completion - - Call this when you no longer need the completion. It may not be - freed immediately if the operation is not acked and committed. - """ - ref.Py_XDECREF(self.buf) - self.buf = NULL - if self.rados_comp != NULL: - with nogil: - rados_aio_release(self.rados_comp) - self.rados_comp = NULL - - def _complete(self): - self.oncomplete(self) - with self.ioctx.lock: - if self.oncomplete: - self.ioctx.complete_completions.remove(self) - - def _safe(self): - self.onsafe(self) - with self.ioctx.lock: - if self.onsafe: - self.ioctx.safe_completions.remove(self) - - def _cleanup(self): - with self.ioctx.lock: - if self.oncomplete: - self.ioctx.complete_completions.remove(self) - if self.onsafe: - self.ioctx.safe_completions.remove(self) - - -class OpCtx(object): - def __enter__(self): - return self.create() - - def __exit__(self, type, msg, traceback): - self.release() - - -cdef class WriteOp(object): - cdef rados_write_op_t write_op - - def create(self): - with nogil: - self.write_op = rados_create_write_op() - return self - - def release(self): - with nogil: - rados_release_write_op(self.write_op) - - @requires(('exclusive', opt(int))) - def new(self, exclusive=None): - """ - Create the object. - """ - - cdef: - int _exclusive = exclusive - - with nogil: - rados_write_op_create(self.write_op, _exclusive, NULL) - - - def remove(self): - """ - Remove object. - """ - with nogil: - rados_write_op_remove(self.write_op) - - @requires(('flags', int)) - def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG): - """ - Set flags for the last operation added to this write_op. - :para flags: flags to apply to the last operation - :type flags: int - """ - - cdef: - int _flags = flags - - with nogil: - rados_write_op_set_flags(self.write_op, _flags) - - @requires(('to_write', bytes)) - def append(self, to_write): - """ - Append data to an object synchronously - :param to_write: data to write - :type to_write: bytes - """ - - cdef: - char *_to_write = to_write - size_t length = len(to_write) - - with nogil: - rados_write_op_append(self.write_op, _to_write, length) - - @requires(('to_write', bytes)) - def write_full(self, to_write): - """ - Write whole object, atomically replacing it. - :param to_write: data to write - :type to_write: bytes - """ - - cdef: - char *_to_write = to_write - size_t length = len(to_write) - - with nogil: - rados_write_op_write_full(self.write_op, _to_write, length) - - @requires(('to_write', bytes), ('offset', int)) - def write(self, to_write, offset=0): - """ - Write to offset. - :param to_write: data to write - :type to_write: bytes - :param offset: byte offset in the object to begin writing at - :type offset: int - """ - - cdef: - char *_to_write = to_write - size_t length = len(to_write) - uint64_t _offset = offset - - with nogil: - rados_write_op_write(self.write_op, _to_write, length, _offset) - - @requires(('offset', int), ('length', int)) - def zero(self, offset, length): - """ - Zero part of an object. - :param offset: byte offset in the object to begin writing at - :type offset: int - :param offset: number of zero to write - :type offset: int - """ - - cdef: - size_t _length = length - uint64_t _offset = offset - - with nogil: - rados_write_op_zero(self.write_op, _length, _offset) - - @requires(('offset', int)) - def truncate(self, offset): - """ - Truncate an object. - :param offset: byte offset in the object to begin truncating at - :type offset: int - """ - - cdef: - uint64_t _offset = offset - - with nogil: - rados_write_op_truncate(self.write_op, _offset) - - -class WriteOpCtx(WriteOp, OpCtx): - """write operation context manager""" - - -cdef class ReadOp(object): - cdef rados_read_op_t read_op - - def create(self): - with nogil: - self.read_op = rados_create_read_op() - return self - - def release(self): - with nogil: - rados_release_read_op(self.read_op) - - @requires(('flags', int)) - def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG): - """ - Set flags for the last operation added to this read_op. - :para flags: flags to apply to the last operation - :type flags: int - """ - - cdef: - int _flags = flags - - with nogil: - rados_read_op_set_flags(self.read_op, _flags) - - -class ReadOpCtx(ReadOp, OpCtx): - """read operation context manager""" - - -cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil: - """ - Callback to onsafe() for asynchronous operations - """ - cdef object cb = args - cb._safe() - return 0 - - -cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil: - """ - Callback to oncomplete() for asynchronous operations - """ - cdef object cb = args - cb._complete() - return 0 - - -cdef class Ioctx(object): - """rados.Ioctx object""" - # NOTE(sileht): attributes declared in .pyd - - def __init__(self, name): - self.name = name - self.state = "open" - - self.locator_key = "" - self.nspace = "" - self.lock = threading.Lock() - self.safe_completions = [] - self.complete_completions = [] - - def __enter__(self): - return self - - def __exit__(self, type_, value, traceback): - self.close() - return False - - def __dealloc__(self): - self.close() - - def __track_completion(self, completion_obj): - if completion_obj.oncomplete: - with self.lock: - self.complete_completions.append(completion_obj) - if completion_obj.onsafe: - with self.lock: - self.safe_completions.append(completion_obj) - - def __get_completion(self, oncomplete, onsafe): - """ - Constructs a completion to use with asynchronous operations - - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - - completion_obj = Completion(self, oncomplete, onsafe) - - cdef: - rados_callback_t complete_cb = NULL - rados_callback_t safe_cb = NULL - rados_completion_t completion - PyObject* p_completion_obj= completion_obj - - if oncomplete: - complete_cb = &__aio_complete_cb - if onsafe: - safe_cb = &__aio_safe_cb - - with nogil: - ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb, - &completion) - if ret < 0: - raise make_ex(ret, "error getting a completion") - - completion_obj.rados_comp = completion - return completion_obj - - @requires(('object_name', str_type), ('oncomplete', opt(Callable))) - def aio_stat(self, object_name, oncomplete): - """ - Asynchronously get object stats (size/mtime) - - oncomplete will be called with the returned size and mtime - as well as the completion: - - oncomplete(completion, size, mtime) - - :param object_name: the name of the object to get stats from - :type object_name: str - :param oncomplete: what to do when the stat is complete - :type oncomplete: completion - - :raises: :class:`Error` - :returns: completion object - """ - - object_name = cstr(object_name, 'object_name') - - cdef: - Completion completion - char *_object_name = object_name - uint64_t psize - time_t pmtime - - def oncomplete_(completion_v): - cdef Completion _completion_v = completion_v - return_value = _completion_v.get_return_value() - if return_value >= 0: - return oncomplete(_completion_v, psize, time.localtime(pmtime)) - else: - return oncomplete(_completion_v, None, None) - - completion = self.__get_completion(oncomplete_, None) - self.__track_completion(completion) - with nogil: - ret = rados_aio_stat(self.io, _object_name, completion.rados_comp, - &psize, &pmtime) - - if ret < 0: - completion._cleanup() - raise make_ex(ret, "error stating %s" % object_name) - return completion - - @requires(('object_name', str_type), ('to_write', bytes), ('offset', int), - ('oncomplete', opt(Callable)), ('onsafe', opt(Callable))) - def aio_write(self, object_name, to_write, offset=0, - oncomplete=None, onsafe=None): - """ - Write data to an object asynchronously - - Queues the write and returns. - - :param object_name: name of the object - :type object_name: str - :param to_write: data to write - :type to_write: bytes - :param offset: byte offset in the object to begin writing at - :type offset: int - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - - object_name = cstr(object_name, 'object_name') - - cdef: - Completion completion - char* _object_name = object_name - char* _to_write = to_write - size_t size = len(to_write) - uint64_t _offset = offset - - completion = self.__get_completion(oncomplete, onsafe) - self.__track_completion(completion) - with nogil: - ret = rados_aio_write(self.io, _object_name, completion.rados_comp, - _to_write, size, _offset) - if ret < 0: - completion._cleanup() - raise make_ex(ret, "error writing object %s" % object_name) - return completion - - @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)), - ('onsafe', opt(Callable))) - def aio_write_full(self, object_name, to_write, - oncomplete=None, onsafe=None): - """ - Asychronously write an entire object - - The object is filled with the provided data. If the object exists, - it is atomically truncated and then written. - Queues the write and returns. - - :param object_name: name of the object - :type object_name: str - :param to_write: data to write - :type to_write: str - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - - object_name = cstr(object_name, 'object_name') - - cdef: - Completion completion - char* _object_name = object_name - char* _to_write = to_write - size_t size = len(to_write) - - completion = self.__get_completion(oncomplete, onsafe) - self.__track_completion(completion) - with nogil: - ret = rados_aio_write_full(self.io, _object_name, - completion.rados_comp, - _to_write, size) - if ret < 0: - completion._cleanup() - raise make_ex(ret, "error writing object %s" % object_name) - return completion - - @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)), - ('onsafe', opt(Callable))) - def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None): - """ - Asychronously append data to an object - - Queues the write and returns. - - :param object_name: name of the object - :type object_name: str - :param to_append: data to append - :type to_append: str - :param offset: byte offset in the object to begin writing at - :type offset: int - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - object_name = cstr(object_name, 'object_name') - - cdef: - Completion completion - char* _object_name = object_name - char* _to_append = to_append - size_t size = len(to_append) - - completion = self.__get_completion(oncomplete, onsafe) - self.__track_completion(completion) - with nogil: - ret = rados_aio_append(self.io, _object_name, - completion.rados_comp, - _to_append, size) - if ret < 0: - completion._cleanup() - raise make_ex(ret, "error appending object %s" % object_name) - return completion - - def aio_flush(self): - """ - Block until all pending writes in an io context are safe - - :raises: :class:`Error` - """ - with nogil: - ret = rados_aio_flush(self.io) - if ret < 0: - raise make_ex(ret, "error flushing") - - @requires(('object_name', str_type), ('length', int), ('offset', int), - ('oncomplete', opt(Callable))) - def aio_read(self, object_name, length, offset, oncomplete): - """ - Asychronously read data from an object - - oncomplete will be called with the returned read value as - well as the completion: - - oncomplete(completion, data_read) - - :param object_name: name of the object to read from - :type object_name: str - :param length: the number of bytes to read - :type length: int - :param offset: byte offset in the object to begin reading from - :type offset: int - :param oncomplete: what to do when the read is complete - :type oncomplete: completion - - :raises: :class:`Error` - :returns: completion object - """ - - object_name = cstr(object_name, 'object_name') - - cdef: - Completion completion - char* _object_name = object_name - uint64_t _offset = offset - - char *ref_buf - size_t _length = length - - def oncomplete_(completion_v): - cdef Completion _completion_v = completion_v - return_value = _completion_v.get_return_value() - if return_value > 0 and return_value != length: - _PyBytes_Resize(&_completion_v.buf, return_value) - return oncomplete(_completion_v, _completion_v.buf if return_value >= 0 else None) - - completion = self.__get_completion(oncomplete_, None) - completion.buf = PyBytes_FromStringAndSize(NULL, length) - ret_buf = PyBytes_AsString(completion.buf) - self.__track_completion(completion) - with nogil: - ret = rados_aio_read(self.io, _object_name, completion.rados_comp, - ret_buf, _length, _offset) - if ret < 0: - completion._cleanup() - raise make_ex(ret, "error reading %s" % object_name) - return completion - - @requires(('object_name', str_type), ('cls', str_type), ('method', str_type), - ('data', bytes), ('length', int), - ('oncomplete', opt(Callable)), ('onsafe', opt(Callable))) - def aio_execute(self, object_name, cls, method, data, - length=8192, oncomplete=None, onsafe=None): - """ - Asynchronously execute an OSD class method on an object. - - oncomplete and onsafe will be called with the data returned from - the plugin as well as the completion: - - oncomplete(completion, data) - onsafe(completion, data) - - :param object_name: name of the object - :type object_name: str - :param cls: name of the object class - :type cls: str - :param method: name of the method - :type method: str - :param data: input data - :type data: bytes - :param length: size of output buffer in bytes (default=8192) - :type length: int - :param oncomplete: what to do when the execution is complete - :type oncomplete: completion - :param onsafe: what to do when the execution is safe and complete - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - - object_name = cstr(object_name, 'object_name') - cls = cstr(cls, 'cls') - method = cstr(method, 'method') - cdef: - Completion completion - char *_object_name = object_name - char *_cls = cls - char *_method = method - char *_data = data - size_t _data_len = len(data) - - char *ref_buf - size_t _length = length - - def oncomplete_(completion_v): - cdef Completion _completion_v = completion_v - return_value = _completion_v.get_return_value() - if return_value > 0 and return_value != length: - _PyBytes_Resize(&_completion_v.buf, return_value) - return oncomplete(_completion_v, _completion_v.buf if return_value >= 0 else None) - - def onsafe_(completion_v): - cdef Completion _completion_v = completion_v - return_value = _completion_v.get_return_value() - return onsafe(_completion_v, _completion_v.buf if return_value >= 0 else None) - - completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None) - completion.buf = PyBytes_FromStringAndSize(NULL, length) - ret_buf = PyBytes_AsString(completion.buf) - self.__track_completion(completion) - with nogil: - ret = rados_aio_exec(self.io, _object_name, completion.rados_comp, - _cls, _method, _data, _data_len, ret_buf, _length) - if ret < 0: - completion._cleanup() - raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name)) - return completion - - @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable))) - def aio_remove(self, object_name, oncomplete=None, onsafe=None): - """ - Asychronously remove an object - - :param object_name: name of the object to remove - :type object_name: str - :param oncomplete: what to do when the remove is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the remove is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - object_name = cstr(object_name, 'object_name') - - cdef: - Completion completion - char* _object_name = object_name - - completion = self.__get_completion(oncomplete, onsafe) - self.__track_completion(completion) - with nogil: - ret = rados_aio_remove(self.io, _object_name, - completion.rados_comp) - if ret < 0: - completion._cleanup() - raise make_ex(ret, "error removing %s" % object_name) - return completion - - def require_ioctx_open(self): - """ - Checks if the rados.Ioctx object state is 'open' - - :raises: IoctxStateError - """ - if self.state != "open": - raise IoctxStateError("The pool is %s" % self.state) - - def change_auid(self, auid): - """ - Attempt to change an io context's associated auid "owner." - - Requires that you have write permission on both the current and new - auid. - - :raises: :class:`Error` - """ - self.require_ioctx_open() - - cdef: - uint64_t _auid = auid - - with nogil: - ret = rados_ioctx_pool_set_auid(self.io, _auid) - if ret < 0: - raise make_ex(ret, "error changing auid of '%s' to %d" - % (self.name, auid)) - - @requires(('loc_key', str_type)) - def set_locator_key(self, loc_key): - """ - Set the key for mapping objects to pgs within an io context. - - The key is used instead of the object name to determine which - placement groups an object is put in. This affects all subsequent - operations of the io context - until a different locator key is - set, all objects in this io context will be placed in the same pg. - - :param loc_key: the key to use as the object locator, or NULL to discard - any previously set key - :type loc_key: str - - :raises: :class:`TypeError` - """ - self.require_ioctx_open() - cloc_key = cstr(loc_key, 'loc_key') - cdef char *_loc_key = cloc_key - with nogil: - rados_ioctx_locator_set_key(self.io, _loc_key) - self.locator_key = loc_key - - def get_locator_key(self): - """ - Get the locator_key of context - - :returns: locator_key - """ - return self.locator_key - - @requires(('snap_id', long)) - def set_read(self, snap_id): - """ - Set the snapshot for reading objects. - - To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD) - - :param snap_id: the snapshot Id - :type snap_id: int - - :raises: :class:`TypeError` - """ - self.require_ioctx_open() - cdef rados_snap_t _snap_id = snap_id - with nogil: - rados_ioctx_snap_set_read(self.io, _snap_id) - - @requires(('nspace', str_type)) - def set_namespace(self, nspace): - """ - Set the namespace for objects within an io context. - - The namespace in addition to the object name fully identifies - an object. This affects all subsequent operations of the io context - - until a different namespace is set, all objects in this io context - will be placed in the same namespace. - - :param nspace: the namespace to use, or None/"" for the default namespace - :type nspace: str - - :raises: :class:`TypeError` - """ - self.require_ioctx_open() - if nspace is None: - nspace = "" - cnspace = cstr(nspace, 'nspace') - cdef char *_nspace = cnspace - with nogil: - rados_ioctx_set_namespace(self.io, _nspace) - self.nspace = nspace - - def get_namespace(self): - """ - Get the namespace of context - - :returns: namespace - """ - return self.nspace - - def close(self): - """ - Close a rados.Ioctx object. - - This just tells librados that you no longer need to use the io context. - It may not be freed immediately if there are pending asynchronous - requests on it, but you should not use an io context again after - calling this function on it. - """ - if self.state == "open": - self.require_ioctx_open() - with nogil: - rados_ioctx_destroy(self.io) - self.state = "closed" - - - @requires(('key', str_type), ('data', bytes)) - def write(self, key, data, offset=0): - """ - Write data to an object synchronously - - :param key: name of the object - :type key: str - :param data: data to write - :type data: bytes - :param offset: byte offset in the object to begin writing at - :type offset: int - - :raises: :class:`TypeError` - :raises: :class:`LogicError` - :returns: int - 0 on success - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - cdef: - char *_key = key - char *_data = data - size_t length = len(data) - uint64_t _offset = offset - - with nogil: - ret = rados_write(self.io, _key, _data, length, _offset) - if ret == 0: - return ret - elif ret < 0: - raise make_ex(ret, "Ioctx.write(%s): failed to write %s" - % (self.name, key)) - else: - raise LogicError("Ioctx.write(%s): rados_write \ -returned %d, but should return zero on success." % (self.name, ret)) - - @requires(('key', str_type), ('data', bytes)) - def write_full(self, key, data): - """ - Write an entire object synchronously. - - The object is filled with the provided data. If the object exists, - it is atomically truncated and then written. - - :param key: name of the object - :type key: str - :param data: data to write - :type data: bytes - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: int - 0 on success - """ - self.require_ioctx_open() - key = cstr(key, 'key') - cdef: - char *_key = key - char *_data = data - size_t length = len(data) - - with nogil: - ret = rados_write_full(self.io, _key, _data, length) - if ret == 0: - return ret - elif ret < 0: - raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s" - % (self.name, key)) - else: - raise LogicError("Ioctx.write_full(%s): rados_write_full \ -returned %d, but should return zero on success." % (self.name, ret)) - - @requires(('key', str_type), ('data', bytes)) - def append(self, key, data): - """ - Append data to an object synchronously - - :param key: name of the object - :type key: str - :param data: data to write - :type data: bytes - - :raises: :class:`TypeError` - :raises: :class:`LogicError` - :returns: int - 0 on success - """ - self.require_ioctx_open() - key = cstr(key, 'key') - cdef: - char *_key = key - char *_data = data - size_t length = len(data) - - with nogil: - ret = rados_append(self.io, _key, _data, length) - if ret == 0: - return ret - elif ret < 0: - raise make_ex(ret, "Ioctx.append(%s): failed to append %s" - % (self.name, key)) - else: - raise LogicError("Ioctx.append(%s): rados_append \ -returned %d, but should return zero on success." % (self.name, ret)) - - @requires(('key', str_type)) - def read(self, key, length=8192, offset=0): - """ - Read data from an object synchronously - - :param key: name of the object - :type key: str - :param length: the number of bytes to read (default=8192) - :type length: int - :param offset: byte offset in the object to begin reading at - :type offset: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: str - data read from object - """ - self.require_ioctx_open() - key = cstr(key, 'key') - cdef: - char *_key = key - char *ret_buf - uint64_t _offset = offset - size_t _length = length - PyObject* ret_s = NULL - - ret_s = PyBytes_FromStringAndSize(NULL, length) - try: - ret_buf = PyBytes_AsString(ret_s) - with nogil: - ret = rados_read(self.io, _key, ret_buf, _length, _offset) - if ret < 0: - raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key)) - - if ret != length: - _PyBytes_Resize(&ret_s, ret) - - return ret_s - finally: - # We DECREF unconditionally: the cast to object above will have - # INCREFed if necessary. This also takes care of exceptions, - # including if _PyString_Resize fails (that will free the string - # itself and set ret_s to NULL, hence XDECREF). - ref.Py_XDECREF(ret_s) - - @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes)) - def execute(self, key, cls, method, data, length=8192): - """ - Execute an OSD class method on an object. - - :param key: name of the object - :type key: str - :param cls: name of the object class - :type cls: str - :param method: name of the method - :type method: str - :param data: input data - :type data: bytes - :param length: size of output buffer in bytes (default=8192) - :type length: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: (ret, method output) - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - cls = cstr(cls, 'cls') - method = cstr(method, 'method') - cdef: - char *_key = key - char *_cls = cls - char *_method = method - char *_data = data - size_t _data_len = len(data) - - char *ref_buf - size_t _length = length - PyObject* ret_s = NULL - - ret_s = PyBytes_FromStringAndSize(NULL, length) - try: - ret_buf = PyBytes_AsString(ret_s) - with nogil: - ret = rados_exec(self.io, _key, _cls, _method, _data, - _data_len, ret_buf, _length) - if ret < 0: - raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key)) - - if ret != length: - _PyBytes_Resize(&ret_s, ret) - - return ret, ret_s - finally: - # We DECREF unconditionally: the cast to object above will have - # INCREFed if necessary. This also takes care of exceptions, - # including if _PyString_Resize fails (that will free the string - # itself and set ret_s to NULL, hence XDECREF). - ref.Py_XDECREF(ret_s) - - def get_stats(self): - """ - Get pool usage statistics - - :returns: dict - contains the following keys: - - - ``num_bytes`` (int) - size of pool in bytes - - - ``num_kb`` (int) - size of pool in kbytes - - - ``num_objects`` (int) - number of objects in the pool - - - ``num_object_clones`` (int) - number of object clones - - - ``num_object_copies`` (int) - number of object copies - - - ``num_objects_missing_on_primary`` (int) - number of objets - missing on primary - - - ``num_objects_unfound`` (int) - number of unfound objects - - - ``num_objects_degraded`` (int) - number of degraded objects - - - ``num_rd`` (int) - bytes read - - - ``num_rd_kb`` (int) - kbytes read - - - ``num_wr`` (int) - bytes written - - - ``num_wr_kb`` (int) - kbytes written - """ - self.require_ioctx_open() - cdef rados_pool_stat_t stats - with nogil: - ret = rados_ioctx_pool_stat(self.io, &stats) - if ret < 0: - raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name) - return {'num_bytes': stats.num_bytes, - 'num_kb': stats.num_kb, - 'num_objects': stats.num_objects, - 'num_object_clones': stats.num_object_clones, - 'num_object_copies': stats.num_object_copies, - "num_objects_missing_on_primary": stats.num_objects_missing_on_primary, - "num_objects_unfound": stats.num_objects_unfound, - "num_objects_degraded": stats.num_objects_degraded, - "num_rd": stats.num_rd, - "num_rd_kb": stats.num_rd_kb, - "num_wr": stats.num_wr, - "num_wr_kb": stats.num_wr_kb} - - @requires(('key', str_type)) - def remove_object(self, key): - """ - Delete an object - - This does not delete any snapshots of the object. - - :param key: the name of the object to delete - :type key: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: bool - True on success - """ - self.require_ioctx_open() - key = cstr(key, 'key') - cdef: - char *_key = key - - with nogil: - ret = rados_remove(self.io, _key) - if ret < 0: - raise make_ex(ret, "Failed to remove '%s'" % key) - return True - - @requires(('key', str_type)) - def trunc(self, key, size): - """ - Resize an object - - If this enlarges the object, the new area is logically filled with - zeroes. If this shrinks the object, the excess data is removed. - - :param key: the name of the object to resize - :type key: str - :param size: the new size of the object in bytes - :type size: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: int - 0 on success, otherwise raises error - """ - - self.require_ioctx_open() - key = cstr(key, 'key') - cdef: - char *_key = key - uint64_t _size = size - - with nogil: - ret = rados_trunc(self.io, _key, _size) - if ret < 0: - raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key)) - return ret - - @requires(('key', str_type)) - def stat(self, key): - """ - Get object stats (size/mtime) - - :param key: the name of the object to get stats from - :type key: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: (size,timestamp) - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - cdef: - char *_key = key - uint64_t psize - time_t pmtime - - with nogil: - ret = rados_stat(self.io, _key, &psize, &pmtime) - if ret < 0: - raise make_ex(ret, "Failed to stat %r" % key) - return psize, time.localtime(pmtime) - - @requires(('key', str_type), ('xattr_name', str_type)) - def get_xattr(self, key, xattr_name): - """ - Get the value of an extended attribute on an object. - - :param key: the name of the object to get xattr from - :type key: str - :param xattr_name: which extended attribute to read - :type xattr_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: str - value of the xattr - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - xattr_name = cstr(xattr_name, 'xattr_name') - cdef: - char *_key = key - char *_xattr_name = xattr_name - size_t ret_length = 4096 - char *ret_buf = NULL - - try: - while ret_length < 4096 * 1024 * 1024: - ret_buf = realloc_chk(ret_buf, ret_length) - with nogil: - ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length) - if ret == -errno.ERANGE: - ret_length *= 2 - elif ret < 0: - raise make_ex(ret, "Failed to get xattr %r" % xattr_name) - else: - break - return ret_buf[:ret] - finally: - free(ret_buf) - - @requires(('oid', str_type)) - def get_xattrs(self, oid): - """ - Start iterating over xattrs on an object. - - :param oid: the name of the object to get xattrs from - :type oid: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: XattrIterator - """ - self.require_ioctx_open() - return XattrIterator(self, oid) - - @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes)) - def set_xattr(self, key, xattr_name, xattr_value): - """ - Set an extended attribute on an object. - - :param key: the name of the object to set xattr to - :type key: str - :param xattr_name: which extended attribute to set - :type xattr_name: str - :param xattr_value: the value of the extended attribute - :type xattr_value: bytes - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: bool - True on success, otherwise raise an error - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - xattr_name = cstr(xattr_name, 'xattr_name') - cdef: - char *_key = key - char *_xattr_name = xattr_name - char *_xattr_value = xattr_value - size_t _xattr_value_len = len(xattr_value) - - with nogil: - ret = rados_setxattr(self.io, _key, _xattr_name, - _xattr_value, _xattr_value_len) - if ret < 0: - raise make_ex(ret, "Failed to set xattr %r" % xattr_name) - return True - - @requires(('key', str_type), ('xattr_name', str_type)) - def rm_xattr(self, key, xattr_name): - """ - Removes an extended attribute on from an object. - - :param key: the name of the object to remove xattr from - :type key: str - :param xattr_name: which extended attribute to remove - :type xattr_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: bool - True on success, otherwise raise an error - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - xattr_name = cstr(xattr_name, 'xattr_name') - cdef: - char *_key = key - char *_xattr_name = xattr_name - - with nogil: - ret = rados_rmxattr(self.io, _key, _xattr_name) - if ret < 0: - raise make_ex(ret, "Failed to delete key %r xattr %r" % - (key, xattr_name)) - return True - - def list_objects(self): - """ - Get ObjectIterator on rados.Ioctx object. - - :returns: ObjectIterator - """ - self.require_ioctx_open() - return ObjectIterator(self) - - def list_snaps(self): - """ - Get SnapIterator on rados.Ioctx object. - - :returns: SnapIterator - """ - self.require_ioctx_open() - return SnapIterator(self) - - @requires(('snap_name', str_type)) - def create_snap(self, snap_name): - """ - Create a pool-wide snapshot - - :param snap_name: the name of the snapshot - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - snap_name = cstr(snap_name, 'snap_name') - cdef char *_snap_name = snap_name - - with nogil: - ret = rados_ioctx_snap_create(self.io, _snap_name) - if ret != 0: - raise make_ex(ret, "Failed to create snap %s" % snap_name) - - @requires(('snap_name', str_type)) - def remove_snap(self, snap_name): - """ - Removes a pool-wide snapshot - - :param snap_name: the name of the snapshot - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - snap_name = cstr(snap_name, 'snap_name') - cdef char *_snap_name = snap_name - - with nogil: - ret = rados_ioctx_snap_remove(self.io, _snap_name) - if ret != 0: - raise make_ex(ret, "Failed to remove snap %s" % snap_name) - - @requires(('snap_name', str_type)) - def lookup_snap(self, snap_name): - """ - Get the id of a pool snapshot - - :param snap_name: the name of the snapshot to lookop - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: Snap - on success - """ - self.require_ioctx_open() - csnap_name = cstr(snap_name, 'snap_name') - cdef: - char *_snap_name = csnap_name - rados_snap_t snap_id - - with nogil: - ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id) - if ret != 0: - raise make_ex(ret, "Failed to lookup snap %s" % snap_name) - return Snap(self, snap_name, int(snap_id)) - - @requires(('oid', str_type), ('snap_name', str_type)) - def snap_rollback(self, oid, snap_name): - """ - Rollback an object to a snapshot - - :param oid: the name of the object - :type oid: str - :param snap_name: the name of the snapshot - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - oid = cstr(oid, 'oid') - snap_name = cstr(snap_name, 'snap_name') - cdef: - char *_snap_name = snap_name - char *_oid = oid - - with nogil: - ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name) - if ret != 0: - raise make_ex(ret, "Failed to rollback %s" % oid) - - def get_last_version(self): - """ - Return the version of the last object read or written to. - - This exposes the internal version number of the last object read or - written via this io context - - :returns: version of the last object used - """ - self.require_ioctx_open() - with nogil: - ret = rados_get_last_version(self.io) - return int(ret) - - def create_write_op(self): - """ - create write operation object. - need call release_write_op after use - """ - return WriteOp().create() - - def create_read_op(self): - """ - create read operation object. - need call release_read_op after use - """ - return ReadOp().create() - - def release_write_op(self, write_op): - """ - release memory alloc by create_write_op - """ - write_op.release() - - def release_read_op(self, read_op): - """ - release memory alloc by create_read_op - :para read_op: read_op object - :type: int - """ - read_op.release() - - @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple)) - def set_omap(self, write_op, keys, values): - """ - set keys values to write_op - :para write_op: write_operation object - :type write_op: WriteOp - :para keys: a tuple of keys - :type keys: tuple - :para values: a tuple of values - :type values: tuple - """ - - if len(keys) != len(values): - raise Error("Rados(): keys and values must have the same number of items") - - keys = cstr_list(keys, 'keys') - cdef: - WriteOp _write_op = write_op - size_t key_num = len(keys) - char **_keys = to_bytes_array(keys) - char **_values = to_bytes_array(values) - size_t *_lens = to_csize_t_array([len(v) for v in values]) - - try: - with nogil: - rados_write_op_omap_set(_write_op.write_op, - _keys, - _values, - _lens, key_num) - finally: - free(_keys) - free(_values) - free(_lens) - - @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int))) - def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG): - """ - excute the real write operation - :para write_op: write operation object - :type write_op: WriteOp - :para oid: object name - :type oid: str - :para mtime: the time to set the mtime to, 0 for the current time - :type mtime: int - :para flags: flags to apply to the entire operation - :type flags: int - """ - - oid = cstr(oid, 'oid') - cdef: - WriteOp _write_op = write_op - char *_oid = oid - time_t _mtime = mtime - int _flags = flags - - with nogil: - ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags) - if ret != 0: - raise make_ex(ret, "Failed to operate write op for oid %s" % oid) - - @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int))) - def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG): - """ - excute the real write operation asynchronously - :para write_op: write operation object - :type write_op: WriteOp - :para oid: object name - :type oid: str - :param oncomplete: what to do when the remove is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the remove is safe and complete on storage - on all replicas - :type onsafe: completion - :para mtime: the time to set the mtime to, 0 for the current time - :type mtime: int - :para flags: flags to apply to the entire operation - :type flags: int - - :raises: :class:`Error` - :returns: completion object - """ - - oid = cstr(oid, 'oid') - cdef: - WriteOp _write_op = write_op - char *_oid = oid - Completion completion - time_t _mtime = mtime - int _flags = flags - - completion = self.__get_completion(oncomplete, onsafe) - self.__track_completion(completion) - - with nogil: - ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid, - &_mtime, _flags) - if ret != 0: - completion._cleanup() - raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid) - return completion - - @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int))) - def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG): - """ - excute the real read operation - :para read_op: read operation object - :type read_op: ReadOp - :para oid: object name - :type oid: str - :para flag: flags to apply to the entire operation - :type flag: int - """ - oid = cstr(oid, 'oid') - cdef: - ReadOp _read_op = read_op - char *_oid = oid - int _flag = flag - - with nogil: - ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag) - if ret != 0: - raise make_ex(ret, "Failed to operate read op for oid %s" % oid) - - @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int))) - def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG): - """ - excute the real read operation - :para read_op: read operation object - :type read_op: ReadOp - :para oid: object name - :type oid: str - :param oncomplete: what to do when the remove is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the remove is safe and complete on storage - on all replicas - :type onsafe: completion - :para flag: flags to apply to the entire operation - :type flag: int - """ - oid = cstr(oid, 'oid') - cdef: - ReadOp _read_op = read_op - char *_oid = oid - Completion completion - int _flag = flag - - completion = self.__get_completion(oncomplete, onsafe) - self.__track_completion(completion) - - with nogil: - ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag) - if ret != 0: - completion._cleanup() - raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid) - return completion - - @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int)) - def get_omap_vals(self, read_op, start_after, filter_prefix, max_return): - """ - get the omap values - :para read_op: read operation object - :type read_op: ReadOp - :para start_after: list keys starting after start_after - :type start_after: str - :para filter_prefix: list only keys beginning with filter_prefix - :type filter_prefix: str - :para max_return: list no more than max_return key/value pairs - :type max_return: int - :returns: an iterator over the requested omap values, return value from this action - """ - - start_after = cstr(start_after, 'start_after') if start_after else None - filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None - cdef: - char *_start_after = opt_str(start_after) - char *_filter_prefix = opt_str(filter_prefix) - ReadOp _read_op = read_op - rados_omap_iter_t iter_addr = NULL - int _max_return = max_return - int prval = 0 - - with nogil: - rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix, - _max_return, &iter_addr, NULL, &prval) - it = OmapIterator(self) - it.ctx = iter_addr - return it, int(prval) - - @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int)) - def get_omap_keys(self, read_op, start_after, max_return): - """ - get the omap keys - :para read_op: read operation object - :type read_op: ReadOp - :para start_after: list keys starting after start_after - :type start_after: str - :para max_return: list no more than max_return key/value pairs - :type max_return: int - :returns: an iterator over the requested omap values, return value from this action - """ - start_after = cstr(start_after, 'start_after') if start_after else None - cdef: - char *_start_after = opt_str(start_after) - ReadOp _read_op = read_op - rados_omap_iter_t iter_addr = NULL - int _max_return = max_return - int prval = 0 - - with nogil: - rados_read_op_omap_get_keys2(_read_op.read_op, _start_after, - _max_return, &iter_addr, NULL, &prval) - it = OmapIterator(self) - it.ctx = iter_addr - return it, int(prval) - - @requires(('read_op', ReadOp), ('keys', tuple)) - def get_omap_vals_by_keys(self, read_op, keys): - """ - get the omap values by keys - :para read_op: read operation object - :type read_op: ReadOp - :para keys: input key tuple - :type keys: tuple - :returns: an iterator over the requested omap values, return value from this action - """ - keys = cstr_list(keys, 'keys') - cdef: - ReadOp _read_op = read_op - rados_omap_iter_t iter_addr - char **_keys = to_bytes_array(keys) - size_t key_num = len(keys) - int prval = 0 - - try: - with nogil: - rados_read_op_omap_get_vals_by_keys(_read_op.read_op, - _keys, - key_num, &iter_addr, &prval) - it = OmapIterator(self) - it.ctx = iter_addr - return it, int(prval) - finally: - free(_keys) - - @requires(('write_op', WriteOp), ('keys', tuple)) - def remove_omap_keys(self, write_op, keys): - """ - remove omap keys specifiled - :para write_op: write operation object - :type write_op: WriteOp - :para keys: input key tuple - :type keys: tuple - """ - - keys = cstr_list(keys, 'keys') - cdef: - WriteOp _write_op = write_op - size_t key_num = len(keys) - char **_keys = to_bytes_array(keys) - - try: - with nogil: - rados_write_op_omap_rm_keys(_write_op.write_op, _keys, key_num) - finally: - free(_keys) - - @requires(('write_op', WriteOp)) - def clear_omap(self, write_op): - """ - Remove all key/value pairs from an object - :para write_op: write operation object - :type write_op: WriteOp - """ - - cdef: - WriteOp _write_op = write_op - - with nogil: - rados_write_op_omap_clear(_write_op.write_op) - - @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type), - ('duration', opt(int)), ('flags', int)) - def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0): - - """ - Take an exclusive lock on an object - - :param key: name of the object - :type key: str - :param name: name of the lock - :type name: str - :param cookie: cookie of the lock - :type cookie: str - :param desc: description of the lock - :type desc: str - :param duration: duration of the lock in seconds - :type duration: int - :param flags: flags - :type flags: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - name = cstr(name, 'name') - cookie = cstr(cookie, 'cookie') - desc = cstr(desc, 'desc') - - cdef: - char* _key = key - char* _name = name - char* _cookie = cookie - char* _desc = desc - uint8_t _flags = flags - timeval _duration - - if duration is None: - with nogil: - ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc, - NULL, _flags) - else: - _duration.tv_sec = duration - with nogil: - ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc, - &_duration, _flags) - - if ret < 0: - raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key)) - - @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type), - ('desc', str_type), ('duration', opt(int)), ('flags', int)) - def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0): - - """ - Take a shared lock on an object - - :param key: name of the object - :type key: str - :param name: name of the lock - :type name: str - :param cookie: cookie of the lock - :type cookie: str - :param tag: tag of the lock - :type tag: str - :param desc: description of the lock - :type desc: str - :param duration: duration of the lock in seconds - :type duration: int - :param flags: flags - :type flags: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - tag = cstr(tag, 'tag') - name = cstr(name, 'name') - cookie = cstr(cookie, 'cookie') - desc = cstr(desc, 'desc') - - cdef: - char* _key = key - char* _tag = tag - char* _name = name - char* _cookie = cookie - char* _desc = desc - uint8_t _flags = flags - timeval _duration - - if duration is None: - with nogil: - ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc, - NULL, _flags) - else: - _duration.tv_sec = duration - with nogil: - ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc, - &_duration, _flags) - if ret < 0: - raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key)) - - @requires(('key', str_type), ('name', str_type), ('cookie', str_type)) - def unlock(self, key, name, cookie): - - """ - Release a shared or exclusive lock on an object - - :param key: name of the object - :type key: str - :param name: name of the lock - :type name: str - :param cookie: cookie of the lock - :type cookie: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - - key = cstr(key, 'key') - name = cstr(name, 'name') - cookie = cstr(cookie, 'cookie') - - cdef: - char* _key = key - char* _name = name - char* _cookie = cookie - - with nogil: - ret = rados_unlock(self.io, _key, _name, _cookie) - if ret < 0: - raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key)) - - def application_enable(self, app_name, force=False): - """ - Enable an application on an OSD pool - - :param app_name: application name - :type app_name: str - :param force: False if only a single app should exist per pool - :type expire_seconds: boool - - :raises: :class:`Error` - """ - app_name = cstr(app_name, 'app_name') - cdef: - char *_app_name = app_name - int _force = (1 if force else 0) - - with nogil: - ret = rados_application_enable(self.io, _app_name, _force) - if ret < 0: - raise make_ex(ret, "error enabling application") - - def application_list(self): - """ - Returns a list of enabled applications - - :returns: list of app name string - """ - cdef: - size_t length = 128 - char *apps = NULL - - try: - while True: - apps = realloc_chk(apps, length) - with nogil: - ret = rados_application_list(self.io, apps, &length) - if ret == 0: - return [decode_cstr(app) for app in - apps[:length].split(b'\0') if app] - elif ret == -errno.ENOENT: - return None - elif ret == -errno.ERANGE: - pass - else: - raise make_ex(ret, "error listing applications") - finally: - free(apps) - - def application_metadata_set(self, app_name, key, value): - """ - Sets application metadata on an OSD pool - - :param app_name: application name - :type app_name: str - :param key: metadata key - :type key: str - :param value: metadata value - :type value: str - - :raises: :class:`Error` - """ - app_name = cstr(app_name, 'app_name') - key = cstr(key, 'key') - value = cstr(value, 'value') - cdef: - char *_app_name = app_name - char *_key = key - char *_value = value - - with nogil: - ret = rados_application_metadata_set(self.io, _app_name, _key, - _value) - if ret < 0: - raise make_ex(ret, "error setting application metadata") - - def application_metadata_remove(self, app_name, key): - """ - Remove application metadata from an OSD pool - - :param app_name: application name - :type app_name: str - :param key: metadata key - :type key: str - - :raises: :class:`Error` - """ - app_name = cstr(app_name, 'app_name') - key = cstr(key, 'key') - cdef: - char *_app_name = app_name - char *_key = key - - with nogil: - ret = rados_application_metadata_remove(self.io, _app_name, _key) - if ret < 0: - raise make_ex(ret, "error removing application metadata") - - def application_metadata_list(self, app_name): - """ - Returns a list of enabled applications - - :param app_name: application name - :type app_name: str - :returns: list of key/value tuples - """ - app_name = cstr(app_name, 'app_name') - cdef: - char *_app_name = app_name - size_t key_length = 128 - size_t val_length = 128 - char *c_keys = NULL - char *c_vals = NULL - - try: - while True: - c_keys = realloc_chk(c_keys, key_length) - c_vals = realloc_chk(c_vals, val_length) - with nogil: - ret = rados_application_metadata_list(self.io, _app_name, - c_keys, &key_length, - c_vals, &val_length) - if ret == 0: - keys = [decode_cstr(key) for key in - c_keys[:key_length].split(b'\0') if key] - vals = [decode_cstr(val) for val in - c_vals[:val_length].split(b'\0') if val] - return zip(keys, vals) - elif ret == -errno.ERANGE: - pass - else: - raise make_ex(ret, "error listing application metadata") - finally: - free(c_keys) - free(c_vals) - - -def set_object_locator(func): - def retfunc(self, *args, **kwargs): - if self.locator_key is not None: - old_locator = self.ioctx.get_locator_key() - self.ioctx.set_locator_key(self.locator_key) - retval = func(self, *args, **kwargs) - self.ioctx.set_locator_key(old_locator) - return retval - else: - return func(self, *args, **kwargs) - return retfunc - - -def set_object_namespace(func): - def retfunc(self, *args, **kwargs): - if self.nspace is None: - raise LogicError("Namespace not set properly in context") - old_nspace = self.ioctx.get_namespace() - self.ioctx.set_namespace(self.nspace) - retval = func(self, *args, **kwargs) - self.ioctx.set_namespace(old_nspace) - return retval - return retfunc - - -class Object(object): - """Rados object wrapper, makes the object look like a file""" - def __init__(self, ioctx, key, locator_key=None, nspace=None): - self.key = key - self.ioctx = ioctx - self.offset = 0 - self.state = "exists" - self.locator_key = locator_key - self.nspace = "" if nspace is None else nspace - - def __str__(self): - return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \ - (str(self.ioctx), self.key, "--default--" - if self.nspace is "" else self.nspace, self.locator_key) - - def require_object_exists(self): - if self.state != "exists": - raise ObjectStateError("The object is %s" % self.state) - - @set_object_locator - @set_object_namespace - def read(self, length=1024 * 1024): - self.require_object_exists() - ret = self.ioctx.read(self.key, length, self.offset) - self.offset += len(ret) - return ret - - @set_object_locator - @set_object_namespace - def write(self, string_to_write): - self.require_object_exists() - ret = self.ioctx.write(self.key, string_to_write, self.offset) - if ret == 0: - self.offset += len(string_to_write) - return ret - - @set_object_locator - @set_object_namespace - def remove(self): - self.require_object_exists() - self.ioctx.remove_object(self.key) - self.state = "removed" - - @set_object_locator - @set_object_namespace - def stat(self): - self.require_object_exists() - return self.ioctx.stat(self.key) - - def seek(self, position): - self.require_object_exists() - self.offset = position - - @set_object_locator - @set_object_namespace - def get_xattr(self, xattr_name): - self.require_object_exists() - return self.ioctx.get_xattr(self.key, xattr_name) - - @set_object_locator - @set_object_namespace - def get_xattrs(self): - self.require_object_exists() - return self.ioctx.get_xattrs(self.key) - - @set_object_locator - @set_object_namespace - def set_xattr(self, xattr_name, xattr_value): - self.require_object_exists() - return self.ioctx.set_xattr(self.key, xattr_name, xattr_value) - - @set_object_locator - @set_object_namespace - def rm_xattr(self, xattr_name): - self.require_object_exists() - return self.ioctx.rm_xattr(self.key, xattr_name) - -MONITOR_LEVELS = [ - "debug", - "info", - "warn", "warning", - "err", "error", - "sec", - ] - - -class MonitorLog(object): - # NOTE(sileht): Keep this class for backward compat - # method moved to Rados.monitor_log() - """ - For watching cluster log messages. Instantiate an object and keep - it around while callback is periodically called. Construct with - 'level' to monitor 'level' messages (one of MONITOR_LEVELS). - arg will be passed to the callback. - - callback will be called with: - arg (given to __init__) - line (the full line, including timestamp, who, level, msg) - who (which entity issued the log message) - timestamp_sec (sec of a struct timespec) - timestamp_nsec (sec of a struct timespec) - seq (sequence number) - level (string representing the level of the log message) - msg (the message itself) - callback's return value is ignored - """ - def __init__(self, cluster, level, callback, arg): - self.level = level - self.callback = callback - self.arg = arg - self.cluster = cluster - self.cluster.monitor_log(level, callback, arg) -