Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / pybind / cephfs / cephfs.pyx
1 """
2 This module is a thin wrapper around libcephfs.
3 """
4
5 from cpython cimport PyObject, ref, exc
6 from libc cimport errno
7 from libc.stdint cimport *
8 from libc.stdlib cimport malloc, realloc, free
9
10 cimport rados
11
12 from collections import namedtuple
13 from datetime import datetime
14 import errno
15 import os
16 import sys
17
18 # Are we running Python 2.x
19 if sys.version_info[0] < 3:
20     str_type = basestring
21 else:
22     str_type = str
23
24
25 cdef extern from "Python.h":
26     # These are in cpython/string.pxd, but use "object" types instead of
27     # PyObject*, which invokes assumptions in cpython that we need to
28     # legitimately break to implement zero-copy string buffers in Image.read().
29     # This is valid use of the Python API and documented as a special case.
30     PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
31     char* PyBytes_AsString(PyObject *string) except NULL
32     int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
33     void PyEval_InitThreads()
34
35
36 cdef extern from "sys/statvfs.h":
37     cdef struct statvfs:
38         unsigned long int f_bsize
39         unsigned long int f_frsize
40         unsigned long int f_blocks
41         unsigned long int f_bfree
42         unsigned long int f_bavail
43         unsigned long int f_files
44         unsigned long int f_ffree
45         unsigned long int f_favail
46         unsigned long int f_fsid
47         unsigned long int f_flag
48         unsigned long int f_namemax
49         unsigned long int f_padding[32]
50
51
52 cdef extern from "dirent.h":
53     cdef struct dirent:
54         long int d_ino
55         unsigned long int d_off
56         unsigned short int d_reclen
57         unsigned char d_type
58         char d_name[256]
59
60
61 cdef extern from "time.h":
62     ctypedef long int time_t
63
64 cdef extern from "time.h":
65     cdef struct timespec:
66         time_t      tv_sec
67         long int    tv_nsec
68
69 cdef extern from "sys/types.h":
70     ctypedef unsigned long mode_t
71
72 cdef extern from "cephfs/ceph_statx.h":
73     cdef struct statx "ceph_statx":
74         uint32_t    stx_mask
75         uint32_t    stx_blksize
76         uint32_t    stx_nlink
77         uint32_t    stx_uid
78         uint32_t    stx_gid
79         uint16_t    stx_mode
80         uint64_t    stx_ino
81         uint64_t    stx_size
82         uint64_t    stx_blocks
83         uint64_t    stx_dev
84         uint64_t    stx_rdev
85         timespec    stx_atime
86         timespec    stx_ctime
87         timespec    stx_mtime
88         timespec    stx_btime
89         uint64_t    stx_version
90
91 cdef extern from "cephfs/libcephfs.h" nogil:
92     cdef struct ceph_mount_info:
93         pass
94
95     cdef struct ceph_dir_result:
96         pass
97
98     ctypedef void* rados_t
99
100     const char *ceph_version(int *major, int *minor, int *patch)
101
102     int ceph_create(ceph_mount_info **cmount, const char * const id)
103     int ceph_create_from_rados(ceph_mount_info **cmount, rados_t cluster)
104     int ceph_init(ceph_mount_info *cmount)
105     void ceph_shutdown(ceph_mount_info *cmount)
106
107     int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
108     int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
109     int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
110     int ceph_conf_set(ceph_mount_info *cmount, const char *option, const char *value)
111
112     int ceph_mount(ceph_mount_info *cmount, const char *root)
113     int ceph_unmount(ceph_mount_info *cmount)
114     int ceph_fstatx(ceph_mount_info *cmount, int fd, statx *stx, unsigned want, unsigned flags)
115     int ceph_statx(ceph_mount_info *cmount, const char *path, statx *stx, unsigned want, unsigned flags)
116     int ceph_statfs(ceph_mount_info *cmount, const char *path, statvfs *stbuf)
117
118     int ceph_mds_command(ceph_mount_info *cmount, const char *mds_spec, const char **cmd, size_t cmdlen,
119                          const char *inbuf, size_t inbuflen, char **outbuf, size_t *outbuflen,
120                          char **outs, size_t *outslen)
121     int ceph_rename(ceph_mount_info *cmount, const char *from_, const char *to)
122     int ceph_link(ceph_mount_info *cmount, const char *existing, const char *newname)
123     int ceph_unlink(ceph_mount_info *cmount, const char *path)
124     int ceph_symlink(ceph_mount_info *cmount, const char *existing, const char *newname)
125     int ceph_readlink(ceph_mount_info *cmount, const char *path, char *buf, int64_t size)
126     int ceph_setxattr(ceph_mount_info *cmount, const char *path, const char *name,
127                       const void *value, size_t size, int flags)
128     int ceph_getxattr(ceph_mount_info *cmount, const char *path, const char *name,
129                       void *value, size_t size)
130     int ceph_write(ceph_mount_info *cmount, int fd, const char *buf, int64_t size, int64_t offset)
131     int ceph_read(ceph_mount_info *cmount, int fd, char *buf, int64_t size, int64_t offset)
132     int ceph_flock(ceph_mount_info *cmount, int fd, int operation, uint64_t owner)
133     int ceph_close(ceph_mount_info *cmount, int fd)
134     int ceph_open(ceph_mount_info *cmount, const char *path, int flags, mode_t mode)
135     int ceph_mkdir(ceph_mount_info *cmount, const char *path, mode_t mode)
136     int ceph_mkdirs(ceph_mount_info *cmount, const char *path, mode_t mode)
137     int ceph_closedir(ceph_mount_info *cmount, ceph_dir_result *dirp)
138     int ceph_opendir(ceph_mount_info *cmount, const char *name, ceph_dir_result **dirpp)
139     int ceph_chdir(ceph_mount_info *cmount, const char *path)
140     dirent * ceph_readdir(ceph_mount_info *cmount, ceph_dir_result *dirp)
141     int ceph_rmdir(ceph_mount_info *cmount, const char *path)
142     const char* ceph_getcwd(ceph_mount_info *cmount)
143     int ceph_sync_fs(ceph_mount_info *cmount)
144     int ceph_fsync(ceph_mount_info *cmount, int fd, int syncdataonly)
145     int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
146     void ceph_buffer_free(char *buf)
147
148
149
150 class Error(Exception):
151     pass
152
153
154 class OSError(Error):
155     def __init__(self, errno, strerror):
156         self.errno = errno
157         self.strerror = strerror
158
159     def __str__(self):
160         return '[Errno {0}] {1}'.format(self.errno, self.strerror)
161
162
163 class PermissionError(OSError):
164     pass
165
166
167 class ObjectNotFound(OSError):
168     pass
169
170
171 class NoData(OSError):
172     pass
173
174
175 class ObjectExists(OSError):
176     pass
177
178
179 class IOError(OSError):
180     pass
181
182
183 class NoSpace(OSError):
184     pass
185
186
187 class InvalidValue(OSError):
188     pass
189
190
191 class OperationNotSupported(OSError):
192     pass
193
194
195 class LibCephFSStateError(Error):
196     pass
197
198
199 class WouldBlock(OSError):
200     pass
201
202
203 class OutOfRange(OSError):
204     pass
205
206
207 IF UNAME_SYSNAME == "FreeBSD":
208     cdef errno_to_exception =  {
209         errno.EPERM      : PermissionError,
210         errno.ENOENT     : ObjectNotFound,
211         errno.EIO        : IOError,
212         errno.ENOSPC     : NoSpace,
213         errno.EEXIST     : ObjectExists,
214         errno.ENOATTR    : NoData,
215         errno.EINVAL     : InvalidValue,
216         errno.EOPNOTSUPP : OperationNotSupported,
217         errno.ERANGE     : OutOfRange,
218         errno.EWOULDBLOCK: WouldBlock,
219     }
220 ELSE:
221     cdef errno_to_exception =  {
222         errno.EPERM      : PermissionError,
223         errno.ENOENT     : ObjectNotFound,
224         errno.EIO        : IOError,
225         errno.ENOSPC     : NoSpace,
226         errno.EEXIST     : ObjectExists,
227         errno.ENODATA    : NoData,
228         errno.EINVAL     : InvalidValue,
229         errno.EOPNOTSUPP : OperationNotSupported,
230         errno.ERANGE     : OutOfRange,
231         errno.EWOULDBLOCK: WouldBlock,
232     }
233
234
235 cdef make_ex(ret, msg):
236     """
237     Translate a librados return code into an exception.
238
239     :param ret: the return code
240     :type ret: int
241     :param msg: the error message to use
242     :type msg: str
243     :returns: a subclass of :class:`Error`
244     """
245     ret = abs(ret)
246     if ret in errno_to_exception:
247         return errno_to_exception[ret](ret, msg)
248     else:
249         return Error(ret, msg + (": error code %d" % ret))
250
251
252 class DirEntry(namedtuple('DirEntry',
253                ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
254     DT_DIR = 0x4
255     DT_REG = 0xA
256     DT_LNK = 0xC
257     def is_dir(self):
258         return self.d_type == self.DT_DIR
259
260     def is_symbol_file(self):
261         return self.d_type == self.DT_LNK
262
263     def is_file(self):
264         return self.d_type == self.DT_REG
265
266 StatResult = namedtuple('StatResult',
267                         ["st_dev", "st_ino", "st_mode", "st_nlink", "st_uid",
268                          "st_gid", "st_rdev", "st_size", "st_blksize",
269                          "st_blocks", "st_atime", "st_mtime", "st_ctime"])
270
271 cdef class DirResult(object):
272     cdef ceph_dir_result *handler
273
274
275 def cstr(val, name, encoding="utf-8", opt=False):
276     """
277     Create a byte string from a Python string
278
279     :param basestring val: Python string
280     :param str name: Name of the string parameter, for exceptions
281     :param str encoding: Encoding to use
282     :param bool opt: If True, None is allowed
283     :rtype: bytes
284     :raises: :class:`InvalidArgument`
285     """
286     if opt and val is None:
287         return None
288     if isinstance(val, bytes):
289         return val
290     elif isinstance(val, unicode):
291         return val.encode(encoding)
292     else:
293         raise TypeError('%s must be a string' % name)
294
295
296 def cstr_list(list_str, name, encoding="utf-8"):
297     return [cstr(s, name) for s in list_str]
298
299
300 def decode_cstr(val, encoding="utf-8"):
301     """
302     Decode a byte string into a Python string.
303
304     :param bytes val: byte string
305     :rtype: unicode or None
306     """
307     if val is None:
308         return None
309
310     return val.decode(encoding)
311
312
313 cdef char* opt_str(s) except? NULL:
314     if s is None:
315         return NULL
316     return s
317
318
319 cdef char ** to_bytes_array(list_bytes):
320     cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
321     if ret == NULL:
322         raise MemoryError("malloc failed")
323     for i in xrange(len(list_bytes)):
324         ret[i] = <char *>list_bytes[i]
325     return ret
326
327
328 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
329     cdef void *ret = realloc(ptr, size)
330     if ret == NULL:
331         raise MemoryError("realloc failed")
332     return ret
333
334
335 cdef class LibCephFS(object):
336     """libcephfs python wrapper"""
337
338     cdef public object state
339     cdef ceph_mount_info *cluster
340
341     def require_state(self, *args):
342         if self.state in args:
343             return
344         raise LibCephFSStateError("You cannot perform that operation on a "
345                                   "CephFS object in state %s." % (self.state))
346
347     def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
348         """Create a libcephfs wrapper
349
350         :param conf dict opt: settings overriding the default ones and conffile
351         :param conffile str opt: the path to ceph.conf to override the default settings
352         :auth_id str opt: the id used to authenticate the client entity
353         :rados_inst Rados opt: a rados.Rados instance
354         """
355         PyEval_InitThreads()
356         self.state = "uninitialized"
357         if rados_inst is not None:
358             if auth_id is not None or conffile is not None or conf is not None:
359                 raise make_ex(errno.EINVAL,
360                               "May not pass RADOS instance as well as other configuration")
361
362             self.create_with_rados(rados_inst)
363         else:
364             self.create(conf, conffile, auth_id)
365
366     def create_with_rados(self, rados.Rados rados_inst):
367         cdef int ret
368         with nogil:
369             ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
370         if ret != 0:
371             raise Error("libcephfs_initialize failed with error code: %d" % ret)
372         self.state = "configuring"
373
374     def create(self, conf=None, conffile=None, auth_id=None):
375         if conf is not None and not isinstance(conf, dict):
376             raise TypeError("conf must be dict or None")
377         cstr(conffile, 'configfile', opt=True)
378         auth_id = cstr(auth_id, 'auth_id', opt=True)
379
380         cdef:
381             char* _auth_id = opt_str(auth_id)
382             int ret
383
384         with nogil:
385             ret = ceph_create(&self.cluster, <const char*>_auth_id)
386         if ret != 0:
387             raise Error("libcephfs_initialize failed with error code: %d" % ret)
388
389         self.state = "configuring"
390         if conffile is not None:
391             # read the default conf file when '' is given
392             if conffile == '':
393                 conffile = None
394             self.conf_read_file(conffile)
395         if conf is not None:
396             for key, value in conf.iteritems():
397                 self.conf_set(key, value)
398
399     def conf_read_file(self, conffile=None):
400         conffile = cstr(conffile, 'conffile', opt=True)
401         cdef:
402             char *_conffile = opt_str(conffile)
403         with nogil:
404             ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
405         if ret != 0:
406             raise make_ex(ret, "error calling conf_read_file")
407
408     def conf_parse_argv(self, argv):
409         self.require_state("configuring")
410         cargv = cstr_list(argv, 'argv')
411         cdef:
412             int _argc = len(argv)
413             char **_argv = to_bytes_array(cargv)
414
415         try:
416             with nogil:
417                 ret = ceph_conf_parse_argv(self.cluster, _argc,
418                                            <const char **>_argv)
419             if ret != 0:
420                 raise make_ex(ret, "error calling conf_parse_argv")
421         finally:
422             free(_argv)
423
424     def shutdown(self):
425         """
426         Unmount and destroy the ceph mount handle.
427         """
428         if self.state in ["initialized", "mounted"]:
429             with nogil:
430                 ceph_shutdown(self.cluster)
431             self.state = "shutdown"
432
433     def __enter__(self):
434         self.mount()
435         return self
436
437     def __exit__(self, type_, value, traceback):
438         self.shutdown()
439         return False
440
441     def __dealloc__(self):
442         self.shutdown()
443
444     def version(self):
445         """
446         Get the version number of the ``libcephfs`` C library.
447
448         :returns: a tuple of ``(major, minor, extra)`` components of the
449                   libcephfs version
450         """
451         cdef:
452             int major = 0
453             int minor = 0
454             int extra = 0
455         with nogil:
456             ceph_version(&major, &minor, &extra)
457         return (major, minor, extra)
458
459     def conf_get(self, option):
460         self.require_state("configuring", "initialized", "mounted")
461
462         option = cstr(option, 'option')
463         cdef:
464             char *_option = option
465             size_t length = 20
466             char *ret_buf = NULL
467
468         try:
469             while True:
470                 ret_buf = <char *>realloc_chk(ret_buf, length)
471                 with nogil:
472                     ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
473                 if ret == 0:
474                     return decode_cstr(ret_buf)
475                 elif ret == -errno.ENAMETOOLONG:
476                     length = length * 2
477                 elif ret == -errno.ENOENT:
478                     return None
479                 else:
480                     raise make_ex(ret, "error calling conf_get")
481         finally:
482             free(ret_buf)
483
484     def conf_set(self, option, val):
485         self.require_state("configuring", "initialized", "mounted")
486
487         option = cstr(option, 'option')
488         val = cstr(val, 'val')
489         cdef:
490             char *_option = option
491             char *_val = val
492
493         with nogil:
494             ret = ceph_conf_set(self.cluster, _option, _val)
495         if ret != 0:
496             raise make_ex(ret, "error calling conf_set")
497
498     def init(self):
499         self.require_state("configuring")
500         with nogil:
501             ret = ceph_init(self.cluster)
502         if ret != 0:
503             raise make_ex(ret, "error calling ceph_init")
504         self.state = "initialized"
505
506     def mount(self):
507         if self.state == "configuring":
508             self.init()
509         self.require_state("initialized")
510         with nogil:
511             ret = ceph_mount(self.cluster, "/")
512         if ret != 0:
513             raise make_ex(ret, "error calling ceph_mount")
514         self.state = "mounted"
515
516     def unmount(self):
517         self.require_state("mounted")
518         with nogil:
519             ret = ceph_unmount(self.cluster)
520         if ret != 0:
521             raise make_ex(ret, "error calling ceph_unmount")
522         self.state = "initialized"
523
524     def statfs(self, path):
525         self.require_state("mounted")
526         path = cstr(path, 'path')
527         cdef:
528             char* _path = path
529             statvfs statbuf
530
531         with nogil:
532             ret = ceph_statfs(self.cluster, _path, &statbuf)
533         if ret < 0:
534             raise make_ex(ret, "statfs failed: %s" % path)
535         return {'f_bsize': statbuf.f_bsize,
536                 'f_frsize': statbuf.f_frsize,
537                 'f_blocks': statbuf.f_blocks,
538                 'f_bfree': statbuf.f_bfree,
539                 'f_bavail': statbuf.f_bavail,
540                 'f_files': statbuf.f_files,
541                 'f_ffree': statbuf.f_ffree,
542                 'f_favail': statbuf.f_favail,
543                 'f_fsid': statbuf.f_fsid,
544                 'f_flag': statbuf.f_flag,
545                 'f_namemax': statbuf.f_namemax}
546
547     def sync_fs(self):
548         self.require_state("mounted")
549         with nogil:
550             ret = ceph_sync_fs(self.cluster)
551         if ret < 0:
552             raise make_ex(ret, "sync_fs failed")
553
554     def fsync(self, int fd, int syncdataonly):
555         self.require_state("mounted")
556         with nogil:
557             ret = ceph_fsync(self.cluster, fd, syncdataonly)
558         if ret < 0:
559             raise make_ex(ret, "fsync failed")
560
561     def getcwd(self):
562         self.require_state("mounted")
563         with nogil:
564             ret = ceph_getcwd(self.cluster)
565         return ret
566
567     def chdir(self, path):
568         self.require_state("mounted")
569
570         path = cstr(path, 'path')
571         cdef char* _path = path
572         with nogil:
573             ret = ceph_chdir(self.cluster, _path)
574         if ret < 0:
575             raise make_ex(ret, "chdir failed")
576
577     def opendir(self, path):
578         self.require_state("mounted")
579
580         path = cstr(path, 'path')
581         cdef:
582             char* _path = path
583             ceph_dir_result *dir_handler
584         with nogil:
585             ret = ceph_opendir(self.cluster, _path, &dir_handler);
586         if ret < 0:
587             raise make_ex(ret, "opendir failed")
588         d = DirResult()
589         d.handler = dir_handler
590         return d
591
592     def readdir(self, DirResult dir_handler):
593         self.require_state("mounted")
594
595         cdef ceph_dir_result *_dir_handler = dir_handler.handler
596         with nogil:
597             dirent = ceph_readdir(self.cluster, _dir_handler)
598         if not dirent:
599             return None
600
601         return DirEntry(d_ino=dirent.d_ino,
602                         d_off=dirent.d_off,
603                         d_reclen=dirent.d_reclen,
604                         d_type=dirent.d_type,
605                         d_name=dirent.d_name)
606
607     def closedir(self, DirResult dir_handler):
608         self.require_state("mounted")
609         cdef:
610             ceph_dir_result *_dir_handler = dir_handler.handler
611
612         with nogil:
613             ret = ceph_closedir(self.cluster, _dir_handler)
614         if ret < 0:
615             raise make_ex(ret, "closedir failed")
616
617     def mkdir(self, path, mode):
618         self.require_state("mounted")
619         path = cstr(path, 'path')
620         if not isinstance(mode, int):
621             raise TypeError('mode must be an int')
622         cdef:
623             char* _path = path
624             int _mode = mode
625         with nogil:
626             ret = ceph_mkdir(self.cluster, _path, _mode)
627         if ret < 0:
628             raise make_ex(ret, "error in mkdir '%s'" % path)
629
630     def mkdirs(self, path, mode):
631         self.require_state("mounted")
632         path = cstr(path, 'path')
633         if not isinstance(mode, int):
634             raise TypeError('mode must be an int')
635         cdef:
636             char* _path = path
637             int _mode = mode
638
639         with nogil:
640             ret = ceph_mkdirs(self.cluster, _path, _mode)
641         if ret < 0:
642             raise make_ex(ret, "error in mkdirs '%s'" % path)
643
644     def rmdir(self, path):
645         self.require_state("mounted")
646         path = cstr(path, 'path')
647         cdef char* _path = path
648         ret = ceph_rmdir(self.cluster, _path)
649         if ret < 0:
650             raise make_ex(ret, "error in rmdir '%s'" % path)
651
652     def open(self, path, flags, mode=0):
653         self.require_state("mounted")
654         path = cstr(path, 'path')
655
656         if not isinstance(mode, int):
657             raise TypeError('mode must be an int')
658         if isinstance(flags, str):
659             cephfs_flags = 0
660             if flags == '':
661                 cephfs_flags = os.O_RDONLY
662             else:
663                 access_flags = 0;
664                 for c in flags:
665                     if c == 'r':
666                         access_flags = 1;
667                     elif c == 'w':
668                         access_flags = 2;
669                         cephfs_flags |= os.O_TRUNC | os.O_CREAT
670                     elif access_flags > 0 and c == '+':
671                         access_flags = 3;
672                     else:
673                         raise make_ex(errno.EOPNOTSUPP,
674                                       "open flags doesn't support %s" % c)
675
676                 if access_flags == 1:
677                     cephfs_flags |= os.O_RDONLY;
678                 elif access_flags == 2:
679                     cephfs_flags |= os.O_WRONLY;
680                 else:
681                     cephfs_flags |= os.O_RDWR;
682
683         elif isinstance(flags, int):
684             cephfs_flags = flags
685         else:
686             raise TypeError("flags must be a string or an integer")
687
688         cdef:
689             char* _path = path
690             int _flags = cephfs_flags
691             int _mode = mode
692
693         with nogil:
694             ret = ceph_open(self.cluster, _path, _flags, _mode)
695         if ret < 0:
696             raise make_ex(ret, "error in open '%s'" % path)
697         return ret
698
699     def close(self, fd):
700         self.require_state("mounted")
701         if not isinstance(fd, int):
702             raise TypeError('fd must be an int')
703         cdef int _fd = fd
704         with nogil:
705             ret = ceph_close(self.cluster, _fd)
706         if ret < 0:
707             raise make_ex(ret, "error in close")
708
709     def read(self, fd, offset, l):
710         self.require_state("mounted")
711         if not isinstance(offset, int):
712             raise TypeError('offset must be an int')
713         if not isinstance(l, int):
714             raise TypeError('l must be an int')
715         if not isinstance(fd, int):
716             raise TypeError('fd must be an int')
717         cdef:
718             int _fd = fd
719             int64_t _offset = offset
720             int64_t _length = l
721
722             char *ret_buf
723             PyObject* ret_s = NULL
724
725         ret_s = PyBytes_FromStringAndSize(NULL, _length)
726         try:
727             ret_buf = PyBytes_AsString(ret_s)
728             with nogil:
729                 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
730             if ret < 0:
731                 raise make_ex(ret, "error in read")
732
733             if ret != _length:
734                 _PyBytes_Resize(&ret_s, ret)
735
736             return <object>ret_s
737         finally:
738             # We DECREF unconditionally: the cast to object above will have
739             # INCREFed if necessary. This also takes care of exceptions,
740             # including if _PyString_Resize fails (that will free the string
741             # itself and set ret_s to NULL, hence XDECREF).
742             ref.Py_XDECREF(ret_s)
743
744     def write(self, fd, buf, offset):
745         self.require_state("mounted")
746         if not isinstance(fd, int):
747             raise TypeError('fd must be an int')
748         if not isinstance(buf, bytes):
749             raise TypeError('buf must be a bytes')
750         if not isinstance(offset, int):
751             raise TypeError('offset must be an int')
752
753         cdef:
754             int _fd = fd
755             char *_data = buf
756             int64_t _offset = offset
757
758             size_t length = len(buf)
759
760         with nogil:
761             ret = ceph_write(self.cluster, _fd, _data, length, _offset)
762         if ret < 0:
763             raise make_ex(ret, "error in write")
764         return ret
765
766     def flock(self, fd, operation, owner):
767         self.require_state("mounted")
768         if not isinstance(fd, int):
769             raise TypeError('fd must be an int')
770         if not isinstance(operation, int):
771             raise TypeError('operation must be an int')
772
773         cdef:
774             int _fd = fd
775             int _op = operation
776             uint64_t _owner = owner
777
778         with nogil:
779             ret = ceph_flock(self.cluster, _fd, _op, _owner)
780         if ret < 0:
781             raise make_ex(ret, "error in write")
782         return ret
783
784     def getxattr(self, path, name, size=255):
785         self.require_state("mounted")
786
787         path = cstr(path, 'path')
788         name = cstr(name, 'name')
789
790         cdef:
791             char* _path = path
792             char* _name = name
793
794             size_t ret_length = size
795             char *ret_buf = NULL
796
797         try:
798             ret_buf = <char *>realloc_chk(ret_buf, ret_length)
799             with nogil:
800                 ret = ceph_getxattr(self.cluster, _path, _name, ret_buf,
801                                     ret_length)
802
803             if ret < 0:
804                 raise make_ex(ret, "error in getxattr")
805
806             return ret_buf[:ret]
807         finally:
808             free(ret_buf)
809
810     def setxattr(self, path, name, value, flags):
811         self.require_state("mounted")
812
813         name = cstr(name, 'name')
814         path = cstr(path, 'path')
815         if not isinstance(flags, int):
816             raise TypeError('flags must be a int')
817         if not isinstance(value, bytes):
818             raise TypeError('value must be a bytes')
819
820         cdef:
821             char *_path = path
822             char *_name = name
823             char *_value = value
824             size_t _value_len = len(value)
825             int _flags = flags
826
827         with nogil:
828             ret = ceph_setxattr(self.cluster, _path, _name,
829                                 _value, _value_len, _flags)
830         if ret < 0:
831             raise make_ex(ret, "error in setxattr")
832
833     def stat(self, path):
834         self.require_state("mounted")
835         path = cstr(path, 'path')
836
837         cdef:
838             char* _path = path
839             statx stx
840
841         with nogil:
842             # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
843             ret = ceph_statx(self.cluster, _path, &stx, 0x7ffu, 0)
844         if ret < 0:
845             raise make_ex(ret, "error in stat: %s" % path)
846         return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
847                           st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
848                           st_uid=stx.stx_uid, st_gid=stx.stx_gid,
849                           st_rdev=stx.stx_rdev, st_size=stx.stx_size,
850                           st_blksize=stx.stx_blksize,
851                           st_blocks=stx.stx_blocks,
852                           st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
853                           st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
854                           st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
855
856     def fstat(self, fd):
857         self.require_state("mounted")
858         if not isinstance(fd, int):
859             raise TypeError('fd must be an int')
860
861         cdef:
862             int _fd = fd
863             statx stx
864
865         with nogil:
866             # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
867             ret = ceph_fstatx(self.cluster, _fd, &stx, 0x7ffu, 0)
868         if ret < 0:
869             raise make_ex(ret, "error in fsat")
870         return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
871                           st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
872                           st_uid=stx.stx_uid, st_gid=stx.stx_gid,
873                           st_rdev=stx.stx_rdev, st_size=stx.stx_size,
874                           st_blksize=stx.stx_blksize,
875                           st_blocks=stx.stx_blocks,
876                           st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
877                           st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
878                           st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
879
880     def symlink(self, existing, newname):
881         self.require_state("mounted")
882         existing = cstr(existing, 'existing')
883         newname = cstr(newname, 'newname')
884         cdef:
885             char* _existing = existing
886             char* _newname = newname
887
888         with nogil:
889             ret = ceph_symlink(self.cluster, _existing, _newname)
890         if ret < 0:
891             raise make_ex(ret, "error in symlink")
892     
893     def link(self, existing, newname):
894         self.require_state("mounted")
895         existing = cstr(existing, 'existing')
896         newname = cstr(newname, 'newname')
897         cdef:
898             char* _existing = existing
899             char* _newname = newname
900         
901         with nogil:
902             ret = ceph_link(self.cluster, _existing, _newname)
903         if ret < 0:
904             raise make_ex(ret, "error in link")    
905     
906     def readlink(self, path, size):
907         self.require_state("mounted")
908         path = cstr(path, 'path')
909
910         cdef:
911             char* _path = path
912             int64_t _size = size
913             char *buf = NULL
914
915         try:
916             buf = <char *>realloc_chk(buf, _size)
917             with nogil:
918                 ret = ceph_readlink(self.cluster, _path, buf, _size)
919             if ret < 0:
920                 raise make_ex(ret, "error in readlink")
921             return buf
922         finally:
923             free(buf)
924
925     def unlink(self, path):
926         self.require_state("mounted")
927         path = cstr(path, 'path')
928         cdef char* _path = path
929         with nogil:
930             ret = ceph_unlink(self.cluster, _path)
931         if ret < 0:
932             raise make_ex(ret, "error in unlink: %s" % path)
933
934     def rename(self, src, dst):
935         self.require_state("mounted")
936
937         src = cstr(src, 'source')
938         dst = cstr(dst, 'destination')
939
940         cdef:
941             char* _src = src
942             char* _dst = dst
943
944         with nogil:
945             ret = ceph_rename(self.cluster, _src, _dst)
946         if ret < 0:
947             raise make_ex(ret, "error in rename '%s' to '%s'" % (src, dst))
948
949     def mds_command(self, mds_spec, args, input_data):
950         """
951         :return 3-tuple of output status int, output status string, output data
952         """
953         mds_spec = cstr(mds_spec, 'mds_spec')
954         args = cstr_list(args, 'args')
955         input_data = cstr(input_data, 'input_data')
956
957         cdef:
958             char *_mds_spec = opt_str(mds_spec)
959             char **_cmd = to_bytes_array(args)
960             size_t _cmdlen = len(args)
961
962             char *_inbuf = input_data
963             size_t _inbuf_len = len(input_data)
964
965             char *_outbuf
966             size_t _outbuf_len
967             char *_outs
968             size_t _outs_len
969
970         try:
971             with nogil:
972                 ret = ceph_mds_command(self.cluster, _mds_spec,
973                                        <const char **>_cmd, _cmdlen,
974                                        <const char*>_inbuf, _inbuf_len,
975                                        &_outbuf, &_outbuf_len,
976                                        &_outs, &_outs_len)
977             if ret == 0:
978                 my_outs = decode_cstr(_outs[:_outs_len])
979                 my_outbuf = _outbuf[:_outbuf_len]
980                 if _outs_len:
981                     ceph_buffer_free(_outs)
982                 if _outbuf_len:
983                     ceph_buffer_free(_outbuf)
984                 return (ret, my_outbuf, my_outs)
985             else:
986                 return (ret, b"", "")
987         finally:
988             free(_cmd)