remove ceph code
[stor4nfv.git] / src / ceph / src / pybind / ceph_volume_client.py
diff --git a/src/ceph/src/pybind/ceph_volume_client.py b/src/ceph/src/pybind/ceph_volume_client.py
deleted file mode 100644 (file)
index ac01807..0000000
+++ /dev/null
@@ -1,1399 +0,0 @@
-"""
-Copyright (C) 2015 Red Hat, Inc.
-
-LGPL2.  See file COPYING.
-"""
-
-from contextlib import contextmanager
-import errno
-import fcntl
-import json
-import logging
-import os
-import re
-import struct
-import sys
-import threading
-import time
-import uuid
-
-from ceph_argparse import json_command
-
-import cephfs
-import rados
-
-
-class RadosError(Exception):
-    """
-    Something went wrong talking to Ceph with librados
-    """
-    pass
-
-
-RADOS_TIMEOUT = 10
-
-log = logging.getLogger(__name__)
-
-
-# Reserved volume group name which we use in paths for volumes
-# that are not assigned to a group (i.e. created with group=None)
-NO_GROUP_NAME = "_nogroup"
-
-# Filename extensions for meta files.
-META_FILE_EXT = ".meta"
-
-class VolumePath(object):
-    """
-    Identify a volume's path as group->volume
-    The Volume ID is a unique identifier, but this is a much more
-    helpful thing to pass around.
-    """
-    def __init__(self, group_id, volume_id):
-        self.group_id = group_id
-        self.volume_id = volume_id
-        assert self.group_id != NO_GROUP_NAME
-        assert self.volume_id != "" and self.volume_id is not None
-
-    def __str__(self):
-        return "{0}/{1}".format(self.group_id, self.volume_id)
-
-
-class ClusterTimeout(Exception):
-    """
-    Exception indicating that we timed out trying to talk to the Ceph cluster,
-    either to the mons, or to any individual daemon that the mons indicate ought
-    to be up but isn't responding to us.
-    """
-    pass
-
-
-class ClusterError(Exception):
-    """
-    Exception indicating that the cluster returned an error to a command that
-    we thought should be successful based on our last knowledge of the cluster
-    state.
-    """
-    def __init__(self, action, result_code, result_str):
-        self._action = action
-        self._result_code = result_code
-        self._result_str = result_str
-
-    def __str__(self):
-        return "Error {0} (\"{1}\") while {2}".format(
-            self._result_code, self._result_str, self._action)
-
-
-class RankEvicter(threading.Thread):
-    """
-    Thread for evicting client(s) from a particular MDS daemon instance.
-
-    This is more complex than simply sending a command, because we have to
-    handle cases where MDS daemons might not be fully up yet, and/or might
-    be transiently unresponsive to commands.
-    """
-    class GidGone(Exception):
-        pass
-
-    POLL_PERIOD = 5
-
-    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
-        """
-        :param client_spec: list of strings, used as filter arguments to "session evict"
-                            pass ["id=123"] to evict a single client with session id 123.
-        """
-        self.rank = rank
-        self.gid = gid
-        self._mds_map = mds_map
-        self._client_spec = client_spec
-        self._volume_client = volume_client
-        self._ready_timeout = ready_timeout
-        self._ready_waited = 0
-
-        self.success = False
-        self.exception = None
-
-        super(RankEvicter, self).__init__()
-
-    def _ready_to_evict(self):
-        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
-            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
-                self._client_spec, self.rank, self.gid
-            ))
-            raise RankEvicter.GidGone()
-
-        info = self._mds_map['info']["gid_{0}".format(self.gid)]
-        log.debug("_ready_to_evict: state={0}".format(info['state']))
-        return info['state'] in ["up:active", "up:clientreplay"]
-
-    def _wait_for_ready(self):
-        """
-        Wait for that MDS rank to reach an active or clientreplay state, and
-        not be laggy.
-        """
-        while not self._ready_to_evict():
-            if self._ready_waited > self._ready_timeout:
-                raise ClusterTimeout()
-
-            time.sleep(self.POLL_PERIOD)
-            self._ready_waited += self.POLL_PERIOD
-
-            self._mds_map = self._volume_client._rados_command("mds dump", {})
-
-    def _evict(self):
-        """
-        Run the eviction procedure.  Return true on success, false on errors.
-        """
-
-        # Wait til the MDS is believed by the mon to be available for commands
-        try:
-            self._wait_for_ready()
-        except self.GidGone:
-            return True
-
-        # Then send it an evict
-        ret = errno.ETIMEDOUT
-        while ret == errno.ETIMEDOUT:
-            log.debug("mds_command: {0}, {1}".format(
-                "%s" % self.gid, ["session", "evict"] + self._client_spec
-            ))
-            ret, outb, outs = self._volume_client.fs.mds_command(
-                "%s" % self.gid,
-                [json.dumps({
-                                "prefix": "session evict",
-                                "filters": self._client_spec
-                })], "")
-            log.debug("mds_command: complete {0} {1}".format(ret, outs))
-
-            # If we get a clean response, great, it's gone from that rank.
-            if ret == 0:
-                return True
-            elif ret == errno.ETIMEDOUT:
-                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
-                self._mds_map = self._volume_client._rados_command("mds dump", {})
-                try:
-                    self._wait_for_ready()
-                except self.GidGone:
-                    return True
-            else:
-                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
-
-    def run(self):
-        try:
-            self._evict()
-        except Exception as e:
-            self.success = False
-            self.exception = e
-        else:
-            self.success = True
-
-
-class EvictionError(Exception):
-    pass
-
-
-class CephFSVolumeClientError(Exception):
-    """
-    Something went wrong talking to Ceph using CephFSVolumeClient.
-    """
-    pass
-
-
-CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
-
-    CephFSVolumeClient Version History:
-
-    * 1 - Initial version
-    * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
-
-"""
-
-
-class CephFSVolumeClient(object):
-    """
-    Combine libcephfs and librados interfaces to implement a
-    'Volume' concept implemented as a cephfs directory and
-    client capabilities which restrict mount access to this
-    directory.
-
-    Additionally, volumes may be in a 'Group'.  Conveniently,
-    volumes are a lot like manila shares, and groups are a lot
-    like manila consistency groups.
-
-    Refer to volumes with VolumePath, which specifies the
-    volume and group IDs (both strings).  The group ID may
-    be None.
-
-    In general, functions in this class are allowed raise rados.Error
-    or cephfs.Error exceptions in unexpected situations.
-    """
-
-    # Current version
-    version = 2
-
-    # Where shall we create our volumes?
-    POOL_PREFIX = "fsvolume_"
-    DEFAULT_VOL_PREFIX = "/volumes"
-    DEFAULT_NS_PREFIX = "fsvolumens_"
-
-    def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None):
-        self.fs = None
-        self.rados = None
-        self.connected = False
-        self.conf_path = conf_path
-        self.cluster_name = cluster_name
-        self.auth_id = auth_id
-        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
-        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
-        # For flock'ing in cephfs, I want a unique ID to distinguish me
-        # from any other manila-share services that are loading this module.
-        # We could use pid, but that's unnecessary weak: generate a
-        # UUID
-        self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]
-
-        # TODO: version the on-disk structures
-
-    def recover(self):
-        # Scan all auth keys to see if they're dirty: if they are, they have
-        # state that might not have propagated to Ceph or to the related
-        # volumes yet.
-
-        # Important: we *always* acquire locks in the order auth->volume
-        # That means a volume can never be dirty without the auth key
-        # we're updating it with being dirty at the same time.
-
-        # First list the auth IDs that have potentially dirty on-disk metadata
-        log.debug("Recovering from partial auth updates (if any)...")
-
-        try:
-            dir_handle = self.fs.opendir(self.volume_prefix)
-        except cephfs.ObjectNotFound:
-            log.debug("Nothing to recover. No auth meta files.")
-            return
-
-        d = self.fs.readdir(dir_handle)
-        auth_ids = []
-
-        if not d:
-            log.debug("Nothing to recover. No auth meta files.")
-
-        while d:
-            # Identify auth IDs from auth meta filenames. The auth meta files
-            # are named as, "$<auth_id><meta filename extension>"
-            regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT))
-            match = re.search(regex, d.d_name)
-            if match:
-                auth_ids.append(match.group(1))
-
-            d = self.fs.readdir(dir_handle)
-
-        self.fs.closedir(dir_handle)
-
-        # Key points based on ordering:
-        # * Anything added in VMeta is already added in AMeta
-        # * Anything added in Ceph is already added in VMeta
-        # * Anything removed in VMeta is already removed in Ceph
-        # * Anything removed in AMeta is already removed in VMeta
-
-        # Deauthorization: because I only update metadata AFTER the
-        # update of the next level down, I have the same ordering of
-        # -> things which exist in the AMeta should also exist
-        #    in the VMeta, should also exist in Ceph, and the same
-        #    recovery procedure that gets me consistent after crashes
-        #    during authorization will also work during deauthorization
-
-        # Now for each auth ID, check for dirty flag and apply updates
-        # if dirty flag is found
-        for auth_id in auth_ids:
-            with self._auth_lock(auth_id):
-                auth_meta = self._auth_metadata_get(auth_id)
-                if not auth_meta or not auth_meta['volumes']:
-                    # Clean up auth meta file
-                    self.fs.unlink(self._auth_metadata_path(auth_id))
-                    continue
-                if not auth_meta['dirty']:
-                    continue
-                self._recover_auth_meta(auth_id, auth_meta)
-
-        log.debug("Recovered from partial auth updates (if any).")
-
-    def _recover_auth_meta(self, auth_id, auth_meta):
-        """
-        Call me after locking the auth meta file.
-        """
-        remove_volumes = []
-
-        for volume, volume_data in auth_meta['volumes'].items():
-            if not volume_data['dirty']:
-                continue
-
-            (group_id, volume_id) = volume.split('/')
-            group_id = group_id if group_id is not 'None' else None
-            volume_path = VolumePath(group_id, volume_id)
-            access_level = volume_data['access_level']
-
-            with self._volume_lock(volume_path):
-                vol_meta = self._volume_metadata_get(volume_path)
-
-                # No VMeta update indicates that there was no auth update
-                # in Ceph either. So it's safe to remove corresponding
-                # partial update in AMeta.
-                if not vol_meta or auth_id not in vol_meta['auths']:
-                    remove_volumes.append(volume)
-                    continue
-
-                want_auth = {
-                    'access_level': access_level,
-                    'dirty': False,
-                }
-                # VMeta update looks clean. Ceph auth update must have been
-                # clean.
-                if vol_meta['auths'][auth_id] == want_auth:
-                    continue
-
-                readonly = True if access_level is 'r' else False
-                self._authorize_volume(volume_path, auth_id, readonly)
-
-            # Recovered from partial auth updates for the auth ID's access
-            # to a volume.
-            auth_meta['volumes'][volume]['dirty'] = False
-            self._auth_metadata_set(auth_id, auth_meta)
-
-        for volume in remove_volumes:
-            del auth_meta['volumes'][volume]
-
-        if not auth_meta['volumes']:
-            # Clean up auth meta file
-            self.fs.unlink(self._auth_metadata_path(auth_id))
-            return
-
-        # Recovered from all partial auth updates for the auth ID.
-        auth_meta['dirty'] = False
-        self._auth_metadata_set(auth_id, auth_meta)
-
-
-    def evict(self, auth_id, timeout=30, volume_path=None):
-        """
-        Evict all clients based on the authorization ID and optionally based on
-        the volume path mounted.  Assumes that the authorization key has been
-        revoked prior to calling this function.
-
-        This operation can throw an exception if the mon cluster is unresponsive, or
-        any individual MDS daemon is unresponsive for longer than the timeout passed in.
-        """
-
-        client_spec = ["auth_name={0}".format(auth_id), ]
-        if volume_path:
-            client_spec.append("client_metadata.root={0}".
-                               format(self._get_path(volume_path)))
-
-        log.info("evict clients with {0}".format(', '.join(client_spec)))
-
-        mds_map = self._rados_command("mds dump", {})
-
-        up = {}
-        for name, gid in mds_map['up'].items():
-            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
-            assert name.startswith("mds_")
-            up[int(name[4:])] = gid
-
-        # For all MDS ranks held by a daemon
-        # Do the parallelism in python instead of using "tell mds.*", because
-        # the latter doesn't give us per-mds output
-        threads = []
-        for rank, gid in up.items():
-            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
-                                 timeout)
-            thread.start()
-            threads.append(thread)
-
-        for t in threads:
-            t.join()
-
-        log.info("evict: joined all")
-
-        for t in threads:
-            if not t.success:
-                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
-                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
-                      )
-                log.error(msg)
-                raise EvictionError(msg)
-
-    def _get_path(self, volume_path):
-        """
-        Determine the path within CephFS where this volume will live
-        :return: absolute path (string)
-        """
-        return os.path.join(
-            self.volume_prefix,
-            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
-            volume_path.volume_id)
-
-    def _get_group_path(self, group_id):
-        if group_id is None:
-            raise ValueError("group_id may not be None")
-
-        return os.path.join(
-            self.volume_prefix,
-            group_id
-        )
-
-    def connect(self, premount_evict = None):
-        """
-
-        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
-                               may want to use this to specify their own auth ID if they expect
-                               to be a unique instance and don't want to wait for caps to time
-                               out after failure of another instance of themselves.
-        """
-        log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
-        self.rados = rados.Rados(
-            name="client.{0}".format(self.auth_id),
-            clustername=self.cluster_name,
-            conffile=self.conf_path,
-            conf={}
-        )
-        self.rados.connect()
-
-        log.debug("Connection to RADOS complete")
-
-        log.debug("Connecting to cephfs...")
-        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
-        log.debug("CephFS initializing...")
-        self.fs.init()
-        if premount_evict is not None:
-            log.debug("Premount eviction of {0} starting".format(premount_evict))
-            self.evict(premount_evict)
-            log.debug("Premount eviction of {0} completes".format(premount_evict))
-        log.debug("CephFS mounting...")
-        self.fs.mount()
-        log.debug("Connection to cephfs complete")
-
-        # Recover from partial auth updates due to a previous
-        # crash.
-        self.recover()
-
-    def get_mon_addrs(self):
-        log.info("get_mon_addrs")
-        result = []
-        mon_map = self._rados_command("mon dump")
-        for mon in mon_map['mons']:
-            ip_port = mon['addr'].split("/")[0]
-            result.append(ip_port)
-
-        return result
-
-    def disconnect(self):
-        log.info("disconnect")
-        if self.fs:
-            log.debug("Disconnecting cephfs...")
-            self.fs.shutdown()
-            self.fs = None
-            log.debug("Disconnecting cephfs complete")
-
-        if self.rados:
-            log.debug("Disconnecting rados...")
-            self.rados.shutdown()
-            self.rados = None
-            log.debug("Disconnecting rados complete")
-
-    def __del__(self):
-        self.disconnect()
-
-    def _get_pool_id(self, osd_map, pool_name):
-        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
-        # like this are needed.
-        for pool in osd_map['pools']:
-            if pool['pool_name'] == pool_name:
-                return pool['pool']
-
-        return None
-
-    def _create_volume_pool(self, pool_name):
-        """
-        Idempotently create a pool for use as a CephFS data pool, with the given name
-
-        :return The ID of the created pool
-        """
-        osd_map = self._rados_command('osd dump', {})
-
-        existing_id = self._get_pool_id(osd_map, pool_name)
-        if existing_id is not None:
-            log.info("Pool {0} already exists".format(pool_name))
-            return existing_id
-
-        osd_count = len(osd_map['osds'])
-
-        # We can't query the actual cluster config remotely, but since this is
-        # just a heuristic we'll assume that the ceph.conf we have locally reflects
-        # that in use in the rest of the cluster.
-        pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))
-
-        other_pgs = 0
-        for pool in osd_map['pools']:
-            if not pool['pool_name'].startswith(self.POOL_PREFIX):
-                other_pgs += pool['pg_num']
-
-        # A basic heuristic for picking pg_num: work out the max number of
-        # PGs we can have without tripping a warning, then subtract the number
-        # of PGs already created by non-manila pools, then divide by ten.  That'll
-        # give you a reasonable result on a system where you have "a few" manila
-        # shares.
-        pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10
-        # TODO Alternatively, respect an override set by the user.
-
-        self._rados_command(
-            'osd pool create',
-            {
-                'pool': pool_name,
-                'pg_num': pg_num
-            }
-        )
-
-        osd_map = self._rados_command('osd dump', {})
-        pool_id = self._get_pool_id(osd_map, pool_name)
-
-        if pool_id is None:
-            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
-            # removing it right after we created it.
-            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
-                pool_name, json.dumps(osd_map, indent=2)
-            ))
-            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
-        else:
-            return pool_id
-
-    def create_group(self, group_id):
-        # Prevent craftily-named volume groups from colliding with the meta
-        # files.
-        if group_id.endswith(META_FILE_EXT):
-            raise ValueError("group ID cannot end with '{0}'.".format(
-                META_FILE_EXT))
-        path = self._get_group_path(group_id)
-        self._mkdir_p(path)
-
-    def destroy_group(self, group_id):
-        path = self._get_group_path(group_id)
-        try:
-            self.fs.stat(self.volume_prefix)
-        except cephfs.ObjectNotFound:
-            pass
-        else:
-            self.fs.rmdir(path)
-
-    def _mkdir_p(self, path):
-        try:
-            self.fs.stat(path)
-        except cephfs.ObjectNotFound:
-            pass
-        else:
-            return
-
-        parts = path.split(os.path.sep)
-
-        for i in range(1, len(parts) + 1):
-            subpath = os.path.join(*parts[0:i])
-            try:
-                self.fs.stat(subpath)
-            except cephfs.ObjectNotFound:
-                self.fs.mkdir(subpath, 0o755)
-
-    def create_volume(self, volume_path, size=None, data_isolated=False):
-        """
-        Set up metadata, pools and auth for a volume.
-
-        This function is idempotent.  It is safe to call this again
-        for an already-created volume, even if it is in use.
-
-        :param volume_path: VolumePath instance
-        :param size: In bytes, or None for no size limit
-        :param data_isolated: If true, create a separate OSD pool for this volume
-        :return:
-        """
-        path = self._get_path(volume_path)
-        log.info("create_volume: {0}".format(path))
-
-        self._mkdir_p(path)
-
-        if size is not None:
-            self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
-
-        # data_isolated means create a separate pool for this volume
-        if data_isolated:
-            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
-            log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
-            pool_id = self._create_volume_pool(pool_name)
-            mds_map = self._rados_command("mds dump", {})
-            if pool_id not in mds_map['data_pools']:
-                self._rados_command("mds add_data_pool", {
-                    'pool': pool_name
-                })
-            self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)
-
-        # enforce security isolation, use seperate namespace for this volume
-        namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
-        log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
-        self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0)
-
-        # Create a volume meta file, if it does not already exist, to store
-        # data about auth ids having access to the volume
-        fd = self.fs.open(self._volume_metadata_path(volume_path),
-                          os.O_CREAT, 0o755)
-        self.fs.close(fd)
-
-        return {
-            'mount_path': path
-        }
-
-    def delete_volume(self, volume_path, data_isolated=False):
-        """
-        Make a volume inaccessible to guests.  This function is
-        idempotent.  This is the fast part of tearing down a volume: you must
-        also later call purge_volume, which is the slow part.
-
-        :param volume_path: Same identifier used in create_volume
-        :return:
-        """
-
-        path = self._get_path(volume_path)
-        log.info("delete_volume: {0}".format(path))
-
-        # Create the trash folder if it doesn't already exist
-        trash = os.path.join(self.volume_prefix, "_deleting")
-        self._mkdir_p(trash)
-
-        # We'll move it to here
-        trashed_volume = os.path.join(trash, volume_path.volume_id)
-
-        # Move the volume's data to the trash folder
-        try:
-            self.fs.stat(path)
-        except cephfs.ObjectNotFound:
-            log.warning("Trying to delete volume '{0}' but it's already gone".format(
-                path))
-        else:
-            self.fs.rename(path, trashed_volume)
-
-        # Delete the volume meta file, if it's not already deleted
-        vol_meta_path = self._volume_metadata_path(volume_path)
-        try:
-            self.fs.unlink(vol_meta_path)
-        except cephfs.ObjectNotFound:
-            pass
-
-    def purge_volume(self, volume_path, data_isolated=False):
-        """
-        Finish clearing up a volume that was previously passed to delete_volume.  This
-        function is idempotent.
-        """
-
-        trash = os.path.join(self.volume_prefix, "_deleting")
-        trashed_volume = os.path.join(trash, volume_path.volume_id)
-
-        try:
-            self.fs.stat(trashed_volume)
-        except cephfs.ObjectNotFound:
-            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
-                trashed_volume))
-            return
-
-        def rmtree(root_path):
-            log.debug("rmtree {0}".format(root_path))
-            dir_handle = self.fs.opendir(root_path)
-            d = self.fs.readdir(dir_handle)
-            while d:
-                if d.d_name not in [".", ".."]:
-                    # Do not use os.path.join because it is sensitive
-                    # to string encoding, we just pass through dnames
-                    # as byte arrays
-                    d_full = "{0}/{1}".format(root_path, d.d_name)
-                    if d.is_dir():
-                        rmtree(d_full)
-                    else:
-                        self.fs.unlink(d_full)
-
-                d = self.fs.readdir(dir_handle)
-            self.fs.closedir(dir_handle)
-
-            self.fs.rmdir(root_path)
-
-        rmtree(trashed_volume)
-
-        if data_isolated:
-            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
-            osd_map = self._rados_command("osd dump", {})
-            pool_id = self._get_pool_id(osd_map, pool_name)
-            mds_map = self._rados_command("mds dump", {})
-            if pool_id in mds_map['data_pools']:
-                self._rados_command("mds remove_data_pool", {
-                    'pool': pool_name
-                })
-            self._rados_command("osd pool delete",
-                                {
-                                    "pool": pool_name,
-                                    "pool2": pool_name,
-                                    "sure": "--yes-i-really-really-mean-it"
-                                })
-
-    def _get_ancestor_xattr(self, path, attr):
-        """
-        Helper for reading layout information: if this xattr is missing
-        on the requested path, keep checking parents until we find it.
-        """
-        try:
-            result = self.fs.getxattr(path, attr)
-            if result == "":
-                # Annoying!  cephfs gives us empty instead of an error when attr not found
-                raise cephfs.NoData()
-            else:
-                return result
-        except cephfs.NoData:
-            if path == "/":
-                raise
-            else:
-                return self._get_ancestor_xattr(os.path.split(path)[0], attr)
-
-    def _check_compat_version(self, compat_version):
-        if self.version < compat_version:
-            msg = ("The current version of CephFSVolumeClient, version {0} "
-                   "does not support the required feature. Need version {1} "
-                   "or greater".format(self.version, compat_version)
-                  )
-            log.error(msg)
-            raise CephFSVolumeClientError(msg)
-
-    def _metadata_get(self, path):
-        """
-        Return a deserialized JSON object, or None
-        """
-        fd = self.fs.open(path, "r")
-        # TODO iterate instead of assuming file < 4MB
-        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
-        self.fs.close(fd)
-        if read_bytes:
-            return json.loads(read_bytes)
-        else:
-            return None
-
-    def _metadata_set(self, path, data):
-        serialized = json.dumps(data)
-        fd = self.fs.open(path, "w")
-        try:
-            self.fs.write(fd, serialized, 0)
-            self.fs.fsync(fd, 0)
-        finally:
-            self.fs.close(fd)
-
-    def _lock(self, path):
-        @contextmanager
-        def fn():
-            while(1):
-                fd = self.fs.open(path, os.O_CREAT, 0o755)
-                self.fs.flock(fd, fcntl.LOCK_EX, self._id)
-
-                # The locked file will be cleaned up sometime. It could be
-                # unlinked e.g., by an another manila share instance, before
-                # lock was applied on it. Perform checks to ensure that this
-                # does not happen.
-                try:
-                    statbuf = self.fs.stat(path)
-                except cephfs.ObjectNotFound:
-                    self.fs.close(fd)
-                    continue
-
-                fstatbuf = self.fs.fstat(fd)
-                if statbuf.st_ino == fstatbuf.st_ino:
-                    break
-
-            try:
-                yield
-            finally:
-                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
-                self.fs.close(fd)
-
-        return fn()
-
-    def _auth_metadata_path(self, auth_id):
-        return os.path.join(self.volume_prefix, "${0}{1}".format(
-            auth_id, META_FILE_EXT))
-
-    def _auth_lock(self, auth_id):
-        return self._lock(self._auth_metadata_path(auth_id))
-
-    def _auth_metadata_get(self, auth_id):
-        """
-        Call me with the metadata locked!
-
-        Check whether a auth metadata structure can be decoded by the current
-        version of CephFSVolumeClient.
-
-        Return auth metadata that the current version of CephFSVolumeClient
-        can decode.
-        """
-        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
-
-        if auth_metadata:
-            self._check_compat_version(auth_metadata['compat_version'])
-
-        return auth_metadata
-
-    def _auth_metadata_set(self, auth_id, data):
-        """
-        Call me with the metadata locked!
-
-        Fsync the auth metadata.
-
-        Add two version attributes to the auth metadata,
-        'compat_version', the minimum CephFSVolumeClient version that can
-        decode the metadata, and 'version', the CephFSVolumeClient version
-        that encoded the metadata.
-        """
-        data['compat_version'] = 1
-        data['version'] = self.version
-        return self._metadata_set(self._auth_metadata_path(auth_id), data)
-
-    def _volume_metadata_path(self, volume_path):
-        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
-            volume_path.group_id if volume_path.group_id else "",
-            volume_path.volume_id,
-            META_FILE_EXT
-        ))
-
-    def _volume_lock(self, volume_path):
-        """
-        Return a ContextManager which locks the authorization metadata for
-        a particular volume, and persists a flag to the metadata indicating
-        that it is currently locked, so that we can detect dirty situations
-        during recovery.
-
-        This lock isn't just to make access to the metadata safe: it's also
-        designed to be used over the two-step process of checking the
-        metadata and then responding to an authorization request, to
-        ensure that at the point we respond the metadata hasn't changed
-        in the background.  It's key to how we avoid security holes
-        resulting from races during that problem ,
-        """
-        return self._lock(self._volume_metadata_path(volume_path))
-
-    def _volume_metadata_get(self, volume_path):
-        """
-        Call me with the metadata locked!
-
-        Check whether a volume metadata structure can be decoded by the current
-        version of CephFSVolumeClient.
-
-        Return a volume_metadata structure that the current version of
-        CephFSVolumeClient can decode.
-        """
-        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
-
-        if volume_metadata:
-            self._check_compat_version(volume_metadata['compat_version'])
-
-        return volume_metadata
-
-    def _volume_metadata_set(self, volume_path, data):
-        """
-        Call me with the metadata locked!
-
-        Add two version attributes to the volume metadata,
-        'compat_version', the minimum CephFSVolumeClient version that can
-        decode the metadata and 'version', the CephFSVolumeClient version
-        that encoded the metadata.
-        """
-        data['compat_version'] = 1
-        data['version'] = self.version
-        return self._metadata_set(self._volume_metadata_path(volume_path), data)
-
-    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
-        """
-        Get-or-create a Ceph auth identity for `auth_id` and grant them access
-        to
-        :param volume_path:
-        :param auth_id:
-        :param readonly:
-        :param tenant_id: Optionally provide a stringizable object to
-                          restrict any created cephx IDs to other callers
-                          passing the same tenant ID.
-        :return:
-        """
-
-        with self._auth_lock(auth_id):
-            # Existing meta, or None, to be updated
-            auth_meta = self._auth_metadata_get(auth_id)
-
-            # volume data to be inserted
-            volume_path_str = str(volume_path)
-            volume = {
-                volume_path_str : {
-                    # The access level at which the auth_id is authorized to
-                    # access the volume.
-                    'access_level': 'r' if readonly else 'rw',
-                    'dirty': True,
-                }
-            }
-            if auth_meta is None:
-                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
-                    auth_id, tenant_id
-                ))
-                log.debug("Authorize: no existing meta")
-                auth_meta = {
-                    'dirty': True,
-                    'tenant_id': tenant_id.__str__() if tenant_id else None,
-                    'volumes': volume
-                }
-
-                # Note: this is *not* guaranteeing that the key doesn't already
-                # exist in Ceph: we are allowing VolumeClient tenants to
-                # 'claim' existing Ceph keys.  In order to prevent VolumeClient
-                # tenants from reading e.g. client.admin keys, you need to
-                # have configured your VolumeClient user (e.g. Manila) to
-                # have mon auth caps that prevent it from accessing those keys
-                # (e.g. limit it to only access keys with a manila.* prefix)
-            else:
-                # Disallow tenants to share auth IDs
-                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
-                    msg = "auth ID: {0} is already in use".format(auth_id)
-                    log.error(msg)
-                    raise CephFSVolumeClientError(msg)
-
-                if auth_meta['dirty']:
-                    self._recover_auth_meta(auth_id, auth_meta)
-
-                log.debug("Authorize: existing tenant {tenant}".format(
-                    tenant=auth_meta['tenant_id']
-                ))
-                auth_meta['dirty'] = True
-                auth_meta['volumes'].update(volume)
-
-            self._auth_metadata_set(auth_id, auth_meta)
-
-            with self._volume_lock(volume_path):
-                key = self._authorize_volume(volume_path, auth_id, readonly)
-
-            auth_meta['dirty'] = False
-            auth_meta['volumes'][volume_path_str]['dirty'] = False
-            self._auth_metadata_set(auth_id, auth_meta)
-
-            if tenant_id:
-                return {
-                    'auth_key': key
-                }
-            else:
-                # Caller wasn't multi-tenant aware: be safe and don't give
-                # them a key
-                return {
-                    'auth_key': None
-                }
-
-    def _authorize_volume(self, volume_path, auth_id, readonly):
-        vol_meta = self._volume_metadata_get(volume_path)
-
-        access_level = 'r' if readonly else 'rw'
-        auth = {
-            auth_id: {
-                'access_level': access_level,
-                'dirty': True,
-            }
-        }
-
-        if vol_meta is None:
-            vol_meta = {
-                'auths': auth
-            }
-        else:
-            vol_meta['auths'].update(auth)
-            self._volume_metadata_set(volume_path, vol_meta)
-
-        key = self._authorize_ceph(volume_path, auth_id, readonly)
-
-        vol_meta['auths'][auth_id]['dirty'] = False
-        self._volume_metadata_set(volume_path, vol_meta)
-
-        return key
-
-    def _authorize_ceph(self, volume_path, auth_id, readonly):
-        path = self._get_path(volume_path)
-        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
-            auth_id, path
-        ))
-
-        # First I need to work out what the data pool is for this share:
-        # read the layout
-        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
-        namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
-
-        # Now construct auth capabilities that give the guest just enough
-        # permissions to access the share
-        client_entity = "client.{0}".format(auth_id)
-        want_access_level = 'r' if readonly else 'rw'
-        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
-        want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
-            want_access_level, pool_name, namespace)
-
-        try:
-            existing = self._rados_command(
-                'auth get',
-                {
-                    'entity': client_entity
-                }
-            )
-            # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
-        except rados.Error:
-            caps = self._rados_command(
-                'auth get-or-create',
-                {
-                    'entity': client_entity,
-                    'caps': [
-                        'mds', want_mds_cap,
-                        'osd', want_osd_cap,
-                        'mon', 'allow r']
-                })
-        else:
-            # entity exists, update it
-            cap = existing[0]
-
-            # Construct auth caps that if present might conflict with the desired
-            # auth caps.
-            unwanted_access_level = 'r' if want_access_level is 'rw' else 'rw'
-            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
-            unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
-                unwanted_access_level, pool_name, namespace)
-
-            def cap_update(orig, want, unwanted):
-                # Updates the existing auth caps such that there is a single
-                # occurrence of wanted auth caps and no occurrence of
-                # conflicting auth caps.
-
-                if not orig:
-                    return want
-
-                cap_tokens = set(orig.split(","))
-
-                cap_tokens.discard(unwanted)
-                cap_tokens.add(want)
-
-                return ",".join(cap_tokens)
-
-            osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap)
-            mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap)
-
-            caps = self._rados_command(
-                'auth caps',
-                {
-                    'entity': client_entity,
-                    'caps': [
-                        'mds', mds_cap_str,
-                        'osd', osd_cap_str,
-                        'mon', cap['caps'].get('mon', 'allow r')]
-                })
-            caps = self._rados_command(
-                'auth get',
-                {
-                    'entity': client_entity
-                }
-            )
-
-        # Result expected like this:
-        # [
-        #     {
-        #         "entity": "client.foobar",
-        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
-        #         "caps": {
-        #             "mds": "allow *",
-        #             "mon": "allow *"
-        #         }
-        #     }
-        # ]
-        assert len(caps) == 1
-        assert caps[0]['entity'] == client_entity
-        return caps[0]['key']
-
-    def deauthorize(self, volume_path, auth_id):
-        with self._auth_lock(auth_id):
-            # Existing meta, or None, to be updated
-            auth_meta = self._auth_metadata_get(auth_id)
-
-            volume_path_str = str(volume_path)
-            if (auth_meta is None) or (not auth_meta['volumes']):
-                log.warn("deauthorized called for already-removed auth"
-                         "ID '{auth_id}' for volume ID '{volume}'".format(
-                    auth_id=auth_id, volume=volume_path.volume_id
-                ))
-                # Clean up the auth meta file of an auth ID
-                self.fs.unlink(self._auth_metadata_path(auth_id))
-                return
-
-            if volume_path_str not in auth_meta['volumes']:
-                log.warn("deauthorized called for already-removed auth"
-                         "ID '{auth_id}' for volume ID '{volume}'".format(
-                    auth_id=auth_id, volume=volume_path.volume_id
-                ))
-                return
-
-            if auth_meta['dirty']:
-                self._recover_auth_meta(auth_id, auth_meta)
-
-            auth_meta['dirty'] = True
-            auth_meta['volumes'][volume_path_str]['dirty'] = True
-            self._auth_metadata_set(auth_id, auth_meta)
-
-            self._deauthorize_volume(volume_path, auth_id)
-
-            # Filter out the volume we're deauthorizing
-            del auth_meta['volumes'][volume_path_str]
-
-            # Clean up auth meta file
-            if not auth_meta['volumes']:
-                self.fs.unlink(self._auth_metadata_path(auth_id))
-                return
-
-            auth_meta['dirty'] = False
-            self._auth_metadata_set(auth_id, auth_meta)
-
-    def _deauthorize_volume(self, volume_path, auth_id):
-        with self._volume_lock(volume_path):
-            vol_meta = self._volume_metadata_get(volume_path)
-
-            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
-                log.warn("deauthorized called for already-removed auth"
-                         "ID '{auth_id}' for volume ID '{volume}'".format(
-                    auth_id=auth_id, volume=volume_path.volume_id
-                ))
-                return
-
-            vol_meta['auths'][auth_id]['dirty'] = True
-            self._volume_metadata_set(volume_path, vol_meta)
-
-            self._deauthorize(volume_path, auth_id)
-
-            # Remove the auth_id from the metadata *after* removing it
-            # from ceph, so that if we crashed here, we would actually
-            # recreate the auth ID during recovery (i.e. end up with
-            # a consistent state).
-
-            # Filter out the auth we're removing
-            del vol_meta['auths'][auth_id]
-            self._volume_metadata_set(volume_path, vol_meta)
-
-    def _deauthorize(self, volume_path, auth_id):
-        """
-        The volume must still exist.
-        """
-        client_entity = "client.{0}".format(auth_id)
-        path = self._get_path(volume_path)
-        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
-        namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
-
-        # The auth_id might have read-only or read-write mount access for the
-        # volume path.
-        access_levels = ('r', 'rw')
-        want_mds_caps = {'allow {0} path={1}'.format(access_level, path)
-                         for access_level in access_levels}
-        want_osd_caps = {'allow {0} pool={1} namespace={2}'.format(
-                         access_level, pool_name, namespace)
-                         for access_level in access_levels}
-
-        try:
-            existing = self._rados_command(
-                'auth get',
-                {
-                    'entity': client_entity
-                }
-            )
-
-            def cap_remove(orig, want):
-                cap_tokens = set(orig.split(","))
-                return ",".join(cap_tokens.difference(want))
-
-            cap = existing[0]
-            osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps)
-            mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps)
-            if (not osd_cap_str) and (not mds_cap_str):
-                self._rados_command('auth del', {'entity': client_entity}, decode=False)
-            else:
-                self._rados_command(
-                    'auth caps',
-                    {
-                        'entity': client_entity,
-                        'caps': [
-                            'mds', mds_cap_str,
-                            'osd', osd_cap_str,
-                            'mon', cap['caps'].get('mon', 'allow r')]
-                    })
-
-        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
-        except rados.Error:
-            # Already gone, great.
-            return
-
-    def get_authorized_ids(self, volume_path):
-        """
-        Expose a list of auth IDs that have access to a volume.
-
-        return: a list of (auth_id, access_level) tuples, where
-                the access_level can be 'r' , or 'rw'.
-                None if no auth ID is given access to the volume.
-        """
-        with self._volume_lock(volume_path):
-            meta = self._volume_metadata_get(volume_path)
-            auths = []
-            if not meta or not meta['auths']:
-                return None
-
-            for auth, auth_data in meta['auths'].items():
-                # Skip partial auth updates.
-                if not auth_data['dirty']:
-                    auths.append((auth, auth_data['access_level']))
-
-            return auths
-
-    def _rados_command(self, prefix, args=None, decode=True):
-        """
-        Safer wrapper for ceph_argparse.json_command, which raises
-        Error exception instead of relying on caller to check return
-        codes.
-
-        Error exception can result from:
-        * Timeout
-        * Actual legitimate errors
-        * Malformed JSON output
-
-        return: Decoded object from ceph, or None if empty string returned.
-                If decode is False, return a string (the data returned by
-                ceph command)
-        """
-        if args is None:
-            args = {}
-
-        argdict = args.copy()
-        argdict['format'] = 'json'
-
-        ret, outbuf, outs = json_command(self.rados,
-                                         prefix=prefix,
-                                         argdict=argdict,
-                                         timeout=RADOS_TIMEOUT)
-        if ret != 0:
-            raise rados.Error(outs)
-        else:
-            if decode:
-                if outbuf:
-                    try:
-                        return json.loads(outbuf)
-                    except (ValueError, TypeError):
-                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
-                else:
-                    return None
-            else:
-                return outbuf
-
-    def get_used_bytes(self, volume_path):
-        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))
-
-    def set_max_bytes(self, volume_path, max_bytes):
-        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
-                         max_bytes.__str__() if max_bytes is not None else "0",
-                         0)
-
-    def _snapshot_path(self, dir_path, snapshot_name):
-        return os.path.join(
-            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
-        )
-
-    def _snapshot_create(self, dir_path, snapshot_name):
-        # TODO: raise intelligible exception for clusters where snaps are disabled
-        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755)
-
-    def _snapshot_destroy(self, dir_path, snapshot_name):
-        """
-        Remove a snapshot, or do nothing if it already doesn't exist.
-        """
-        try:
-            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
-        except cephfs.ObjectNotFound:
-            log.warn("Snapshot was already gone: {0}".format(snapshot_name))
-
-    def create_snapshot_volume(self, volume_path, snapshot_name):
-        self._snapshot_create(self._get_path(volume_path), snapshot_name)
-
-    def destroy_snapshot_volume(self, volume_path, snapshot_name):
-        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
-
-    def create_snapshot_group(self, group_id, snapshot_name):
-        if group_id is None:
-            raise RuntimeError("Group ID may not be None")
-
-        return self._snapshot_create(self._get_group_path(group_id), snapshot_name)
-
-    def destroy_snapshot_group(self, group_id, snapshot_name):
-        if group_id is None:
-            raise RuntimeError("Group ID may not be None")
-        if snapshot_name is None:
-            raise RuntimeError("Snapshot name may not be None")
-
-        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
-
-    def _cp_r(self, src, dst):
-        # TODO
-        raise NotImplementedError()
-
-    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
-        dest_fs_path = self._get_path(dest_volume_path)
-        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
-
-        self._cp_r(src_snapshot_path, dest_fs_path)
-
-    def put_object(self, pool_name, object_name, data):
-        """
-        Synchronously write data to an object.
-
-        :param pool_name: name of the pool
-        :type pool_name: str
-        :param object_name: name of the object
-        :type object_name: str
-        :param data: data to write
-        :type data: bytes
-        """
-        ioctx = self.rados.open_ioctx(pool_name)
-        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
-        if len(data) > max_size:
-            msg = ("Data to be written to object '{0}' exceeds "
-                   "{1} bytes".format(object_name, max_size))
-            log.error(msg)
-            raise CephFSVolumeClientError(msg)
-        try:
-            ioctx.write_full(object_name, data)
-        finally:
-            ioctx.close()
-
-    def get_object(self, pool_name, object_name):
-        """
-        Synchronously read data from object.
-
-        :param pool_name: name of the pool
-        :type pool_name: str
-        :param object_name: name of the object
-        :type object_name: str
-
-        :returns: bytes - data read from object
-        """
-        ioctx = self.rados.open_ioctx(pool_name)
-        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
-        try:
-            bytes_read = ioctx.read(object_name, max_size)
-            if ((len(bytes_read) == max_size) and
-                    (ioctx.read(object_name, 1, offset=max_size))):
-                log.warning("Size of object {0} exceeds '{1}' bytes "
-                            "read".format(object_name, max_size))
-        finally:
-            ioctx.close()
-        return bytes_read
-
-    def delete_object(self, pool_name, object_name):
-        ioctx = self.rados.open_ioctx(pool_name)
-        try:
-            ioctx.remove_object(object_name)
-        except rados.ObjectNotFound:
-            log.warn("Object '{0}' was already removed".format(object_name))
-        finally:
-            ioctx.close()