X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fpybind%2Fceph_volume_client.py;fp=src%2Fceph%2Fsrc%2Fpybind%2Fceph_volume_client.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=ac01807b69362a3b4c927e1d021b165ecf6b529b;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/pybind/ceph_volume_client.py b/src/ceph/src/pybind/ceph_volume_client.py deleted file mode 100644 index ac01807..0000000 --- a/src/ceph/src/pybind/ceph_volume_client.py +++ /dev/null @@ -1,1399 +0,0 @@ -""" -Copyright (C) 2015 Red Hat, Inc. - -LGPL2. See file COPYING. -""" - -from contextlib import contextmanager -import errno -import fcntl -import json -import logging -import os -import re -import struct -import sys -import threading -import time -import uuid - -from ceph_argparse import json_command - -import cephfs -import rados - - -class RadosError(Exception): - """ - Something went wrong talking to Ceph with librados - """ - pass - - -RADOS_TIMEOUT = 10 - -log = logging.getLogger(__name__) - - -# Reserved volume group name which we use in paths for volumes -# that are not assigned to a group (i.e. created with group=None) -NO_GROUP_NAME = "_nogroup" - -# Filename extensions for meta files. -META_FILE_EXT = ".meta" - -class VolumePath(object): - """ - Identify a volume's path as group->volume - The Volume ID is a unique identifier, but this is a much more - helpful thing to pass around. - """ - def __init__(self, group_id, volume_id): - self.group_id = group_id - self.volume_id = volume_id - assert self.group_id != NO_GROUP_NAME - assert self.volume_id != "" and self.volume_id is not None - - def __str__(self): - return "{0}/{1}".format(self.group_id, self.volume_id) - - -class ClusterTimeout(Exception): - """ - Exception indicating that we timed out trying to talk to the Ceph cluster, - either to the mons, or to any individual daemon that the mons indicate ought - to be up but isn't responding to us. - """ - pass - - -class ClusterError(Exception): - """ - Exception indicating that the cluster returned an error to a command that - we thought should be successful based on our last knowledge of the cluster - state. - """ - def __init__(self, action, result_code, result_str): - self._action = action - self._result_code = result_code - self._result_str = result_str - - def __str__(self): - return "Error {0} (\"{1}\") while {2}".format( - self._result_code, self._result_str, self._action) - - -class RankEvicter(threading.Thread): - """ - Thread for evicting client(s) from a particular MDS daemon instance. - - This is more complex than simply sending a command, because we have to - handle cases where MDS daemons might not be fully up yet, and/or might - be transiently unresponsive to commands. - """ - class GidGone(Exception): - pass - - POLL_PERIOD = 5 - - def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout): - """ - :param client_spec: list of strings, used as filter arguments to "session evict" - pass ["id=123"] to evict a single client with session id 123. 
- """ - self.rank = rank - self.gid = gid - self._mds_map = mds_map - self._client_spec = client_spec - self._volume_client = volume_client - self._ready_timeout = ready_timeout - self._ready_waited = 0 - - self.success = False - self.exception = None - - super(RankEvicter, self).__init__() - - def _ready_to_evict(self): - if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid: - log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format( - self._client_spec, self.rank, self.gid - )) - raise RankEvicter.GidGone() - - info = self._mds_map['info']["gid_{0}".format(self.gid)] - log.debug("_ready_to_evict: state={0}".format(info['state'])) - return info['state'] in ["up:active", "up:clientreplay"] - - def _wait_for_ready(self): - """ - Wait for that MDS rank to reach an active or clientreplay state, and - not be laggy. - """ - while not self._ready_to_evict(): - if self._ready_waited > self._ready_timeout: - raise ClusterTimeout() - - time.sleep(self.POLL_PERIOD) - self._ready_waited += self.POLL_PERIOD - - self._mds_map = self._volume_client._rados_command("mds dump", {}) - - def _evict(self): - """ - Run the eviction procedure. Return true on success, false on errors. - """ - - # Wait til the MDS is believed by the mon to be available for commands - try: - self._wait_for_ready() - except self.GidGone: - return True - - # Then send it an evict - ret = errno.ETIMEDOUT - while ret == errno.ETIMEDOUT: - log.debug("mds_command: {0}, {1}".format( - "%s" % self.gid, ["session", "evict"] + self._client_spec - )) - ret, outb, outs = self._volume_client.fs.mds_command( - "%s" % self.gid, - [json.dumps({ - "prefix": "session evict", - "filters": self._client_spec - })], "") - log.debug("mds_command: complete {0} {1}".format(ret, outs)) - - # If we get a clean response, great, it's gone from that rank. - if ret == 0: - return True - elif ret == errno.ETIMEDOUT: - # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error) - self._mds_map = self._volume_client._rados_command("mds dump", {}) - try: - self._wait_for_ready() - except self.GidGone: - return True - else: - raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs) - - def run(self): - try: - self._evict() - except Exception as e: - self.success = False - self.exception = e - else: - self.success = True - - -class EvictionError(Exception): - pass - - -class CephFSVolumeClientError(Exception): - """ - Something went wrong talking to Ceph using CephFSVolumeClient. - """ - pass - - -CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ - - CephFSVolumeClient Version History: - - * 1 - Initial version - * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient - -""" - - -class CephFSVolumeClient(object): - """ - Combine libcephfs and librados interfaces to implement a - 'Volume' concept implemented as a cephfs directory and - client capabilities which restrict mount access to this - directory. - - Additionally, volumes may be in a 'Group'. Conveniently, - volumes are a lot like manila shares, and groups are a lot - like manila consistency groups. - - Refer to volumes with VolumePath, which specifies the - volume and group IDs (both strings). The group ID may - be None. - - In general, functions in this class are allowed raise rados.Error - or cephfs.Error exceptions in unexpected situations. - """ - - # Current version - version = 2 - - # Where shall we create our volumes? 
- POOL_PREFIX = "fsvolume_" - DEFAULT_VOL_PREFIX = "/volumes" - DEFAULT_NS_PREFIX = "fsvolumens_" - - def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None): - self.fs = None - self.rados = None - self.connected = False - self.conf_path = conf_path - self.cluster_name = cluster_name - self.auth_id = auth_id - self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX - self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX - # For flock'ing in cephfs, I want a unique ID to distinguish me - # from any other manila-share services that are loading this module. - # We could use pid, but that's unnecessary weak: generate a - # UUID - self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0] - - # TODO: version the on-disk structures - - def recover(self): - # Scan all auth keys to see if they're dirty: if they are, they have - # state that might not have propagated to Ceph or to the related - # volumes yet. - - # Important: we *always* acquire locks in the order auth->volume - # That means a volume can never be dirty without the auth key - # we're updating it with being dirty at the same time. - - # First list the auth IDs that have potentially dirty on-disk metadata - log.debug("Recovering from partial auth updates (if any)...") - - try: - dir_handle = self.fs.opendir(self.volume_prefix) - except cephfs.ObjectNotFound: - log.debug("Nothing to recover. No auth meta files.") - return - - d = self.fs.readdir(dir_handle) - auth_ids = [] - - if not d: - log.debug("Nothing to recover. No auth meta files.") - - while d: - # Identify auth IDs from auth meta filenames. The auth meta files - # are named as, "$" - regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT)) - match = re.search(regex, d.d_name) - if match: - auth_ids.append(match.group(1)) - - d = self.fs.readdir(dir_handle) - - self.fs.closedir(dir_handle) - - # Key points based on ordering: - # * Anything added in VMeta is already added in AMeta - # * Anything added in Ceph is already added in VMeta - # * Anything removed in VMeta is already removed in Ceph - # * Anything removed in AMeta is already removed in VMeta - - # Deauthorization: because I only update metadata AFTER the - # update of the next level down, I have the same ordering of - # -> things which exist in the AMeta should also exist - # in the VMeta, should also exist in Ceph, and the same - # recovery procedure that gets me consistent after crashes - # during authorization will also work during deauthorization - - # Now for each auth ID, check for dirty flag and apply updates - # if dirty flag is found - for auth_id in auth_ids: - with self._auth_lock(auth_id): - auth_meta = self._auth_metadata_get(auth_id) - if not auth_meta or not auth_meta['volumes']: - # Clean up auth meta file - self.fs.unlink(self._auth_metadata_path(auth_id)) - continue - if not auth_meta['dirty']: - continue - self._recover_auth_meta(auth_id, auth_meta) - - log.debug("Recovered from partial auth updates (if any).") - - def _recover_auth_meta(self, auth_id, auth_meta): - """ - Call me after locking the auth meta file. 
- """ - remove_volumes = [] - - for volume, volume_data in auth_meta['volumes'].items(): - if not volume_data['dirty']: - continue - - (group_id, volume_id) = volume.split('/') - group_id = group_id if group_id is not 'None' else None - volume_path = VolumePath(group_id, volume_id) - access_level = volume_data['access_level'] - - with self._volume_lock(volume_path): - vol_meta = self._volume_metadata_get(volume_path) - - # No VMeta update indicates that there was no auth update - # in Ceph either. So it's safe to remove corresponding - # partial update in AMeta. - if not vol_meta or auth_id not in vol_meta['auths']: - remove_volumes.append(volume) - continue - - want_auth = { - 'access_level': access_level, - 'dirty': False, - } - # VMeta update looks clean. Ceph auth update must have been - # clean. - if vol_meta['auths'][auth_id] == want_auth: - continue - - readonly = True if access_level is 'r' else False - self._authorize_volume(volume_path, auth_id, readonly) - - # Recovered from partial auth updates for the auth ID's access - # to a volume. - auth_meta['volumes'][volume]['dirty'] = False - self._auth_metadata_set(auth_id, auth_meta) - - for volume in remove_volumes: - del auth_meta['volumes'][volume] - - if not auth_meta['volumes']: - # Clean up auth meta file - self.fs.unlink(self._auth_metadata_path(auth_id)) - return - - # Recovered from all partial auth updates for the auth ID. - auth_meta['dirty'] = False - self._auth_metadata_set(auth_id, auth_meta) - - - def evict(self, auth_id, timeout=30, volume_path=None): - """ - Evict all clients based on the authorization ID and optionally based on - the volume path mounted. Assumes that the authorization key has been - revoked prior to calling this function. - - This operation can throw an exception if the mon cluster is unresponsive, or - any individual MDS daemon is unresponsive for longer than the timeout passed in. - """ - - client_spec = ["auth_name={0}".format(auth_id), ] - if volume_path: - client_spec.append("client_metadata.root={0}". - format(self._get_path(volume_path))) - - log.info("evict clients with {0}".format(', '.join(client_spec))) - - mds_map = self._rados_command("mds dump", {}) - - up = {} - for name, gid in mds_map['up'].items(): - # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0" - assert name.startswith("mds_") - up[int(name[4:])] = gid - - # For all MDS ranks held by a daemon - # Do the parallelism in python instead of using "tell mds.*", because - # the latter doesn't give us per-mds output - threads = [] - for rank, gid in up.items(): - thread = RankEvicter(self, client_spec, rank, gid, mds_map, - timeout) - thread.start() - threads.append(thread) - - for t in threads: - t.join() - - log.info("evict: joined all") - - for t in threads: - if not t.success: - msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". 
- format(', '.join(client_spec), t.rank, t.gid, t.exception) - ) - log.error(msg) - raise EvictionError(msg) - - def _get_path(self, volume_path): - """ - Determine the path within CephFS where this volume will live - :return: absolute path (string) - """ - return os.path.join( - self.volume_prefix, - volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME, - volume_path.volume_id) - - def _get_group_path(self, group_id): - if group_id is None: - raise ValueError("group_id may not be None") - - return os.path.join( - self.volume_prefix, - group_id - ) - - def connect(self, premount_evict = None): - """ - - :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers - may want to use this to specify their own auth ID if they expect - to be a unique instance and don't want to wait for caps to time - out after failure of another instance of themselves. - """ - log.debug("Connecting to RADOS with config {0}...".format(self.conf_path)) - self.rados = rados.Rados( - name="client.{0}".format(self.auth_id), - clustername=self.cluster_name, - conffile=self.conf_path, - conf={} - ) - self.rados.connect() - - log.debug("Connection to RADOS complete") - - log.debug("Connecting to cephfs...") - self.fs = cephfs.LibCephFS(rados_inst=self.rados) - log.debug("CephFS initializing...") - self.fs.init() - if premount_evict is not None: - log.debug("Premount eviction of {0} starting".format(premount_evict)) - self.evict(premount_evict) - log.debug("Premount eviction of {0} completes".format(premount_evict)) - log.debug("CephFS mounting...") - self.fs.mount() - log.debug("Connection to cephfs complete") - - # Recover from partial auth updates due to a previous - # crash. - self.recover() - - def get_mon_addrs(self): - log.info("get_mon_addrs") - result = [] - mon_map = self._rados_command("mon dump") - for mon in mon_map['mons']: - ip_port = mon['addr'].split("/")[0] - result.append(ip_port) - - return result - - def disconnect(self): - log.info("disconnect") - if self.fs: - log.debug("Disconnecting cephfs...") - self.fs.shutdown() - self.fs = None - log.debug("Disconnecting cephfs complete") - - if self.rados: - log.debug("Disconnecting rados...") - self.rados.shutdown() - self.rados = None - log.debug("Disconnecting rados complete") - - def __del__(self): - self.disconnect() - - def _get_pool_id(self, osd_map, pool_name): - # Maybe borrow the OSDMap wrapper class from calamari if more helpers - # like this are needed. - for pool in osd_map['pools']: - if pool['pool_name'] == pool_name: - return pool['pool'] - - return None - - def _create_volume_pool(self, pool_name): - """ - Idempotently create a pool for use as a CephFS data pool, with the given name - - :return The ID of the created pool - """ - osd_map = self._rados_command('osd dump', {}) - - existing_id = self._get_pool_id(osd_map, pool_name) - if existing_id is not None: - log.info("Pool {0} already exists".format(pool_name)) - return existing_id - - osd_count = len(osd_map['osds']) - - # We can't query the actual cluster config remotely, but since this is - # just a heuristic we'll assume that the ceph.conf we have locally reflects - # that in use in the rest of the cluster. 
- pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd')) - - other_pgs = 0 - for pool in osd_map['pools']: - if not pool['pool_name'].startswith(self.POOL_PREFIX): - other_pgs += pool['pg_num'] - - # A basic heuristic for picking pg_num: work out the max number of - # PGs we can have without tripping a warning, then subtract the number - # of PGs already created by non-manila pools, then divide by ten. That'll - # give you a reasonable result on a system where you have "a few" manila - # shares. - pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10 - # TODO Alternatively, respect an override set by the user. - - self._rados_command( - 'osd pool create', - { - 'pool': pool_name, - 'pg_num': pg_num - } - ) - - osd_map = self._rados_command('osd dump', {}) - pool_id = self._get_pool_id(osd_map, pool_name) - - if pool_id is None: - # If the pool isn't there, that's either a ceph bug, or it's some outside influence - # removing it right after we created it. - log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format( - pool_name, json.dumps(osd_map, indent=2) - )) - raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name)) - else: - return pool_id - - def create_group(self, group_id): - # Prevent craftily-named volume groups from colliding with the meta - # files. - if group_id.endswith(META_FILE_EXT): - raise ValueError("group ID cannot end with '{0}'.".format( - META_FILE_EXT)) - path = self._get_group_path(group_id) - self._mkdir_p(path) - - def destroy_group(self, group_id): - path = self._get_group_path(group_id) - try: - self.fs.stat(self.volume_prefix) - except cephfs.ObjectNotFound: - pass - else: - self.fs.rmdir(path) - - def _mkdir_p(self, path): - try: - self.fs.stat(path) - except cephfs.ObjectNotFound: - pass - else: - return - - parts = path.split(os.path.sep) - - for i in range(1, len(parts) + 1): - subpath = os.path.join(*parts[0:i]) - try: - self.fs.stat(subpath) - except cephfs.ObjectNotFound: - self.fs.mkdir(subpath, 0o755) - - def create_volume(self, volume_path, size=None, data_isolated=False): - """ - Set up metadata, pools and auth for a volume. - - This function is idempotent. It is safe to call this again - for an already-created volume, even if it is in use. 
- - :param volume_path: VolumePath instance - :param size: In bytes, or None for no size limit - :param data_isolated: If true, create a separate OSD pool for this volume - :return: - """ - path = self._get_path(volume_path) - log.info("create_volume: {0}".format(path)) - - self._mkdir_p(path) - - if size is not None: - self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0) - - # data_isolated means create a separate pool for this volume - if data_isolated: - pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) - log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name)) - pool_id = self._create_volume_pool(pool_name) - mds_map = self._rados_command("mds dump", {}) - if pool_id not in mds_map['data_pools']: - self._rados_command("mds add_data_pool", { - 'pool': pool_name - }) - self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0) - - # enforce security isolation, use seperate namespace for this volume - namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id) - log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace)) - self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0) - - # Create a volume meta file, if it does not already exist, to store - # data about auth ids having access to the volume - fd = self.fs.open(self._volume_metadata_path(volume_path), - os.O_CREAT, 0o755) - self.fs.close(fd) - - return { - 'mount_path': path - } - - def delete_volume(self, volume_path, data_isolated=False): - """ - Make a volume inaccessible to guests. This function is - idempotent. This is the fast part of tearing down a volume: you must - also later call purge_volume, which is the slow part. - - :param volume_path: Same identifier used in create_volume - :return: - """ - - path = self._get_path(volume_path) - log.info("delete_volume: {0}".format(path)) - - # Create the trash folder if it doesn't already exist - trash = os.path.join(self.volume_prefix, "_deleting") - self._mkdir_p(trash) - - # We'll move it to here - trashed_volume = os.path.join(trash, volume_path.volume_id) - - # Move the volume's data to the trash folder - try: - self.fs.stat(path) - except cephfs.ObjectNotFound: - log.warning("Trying to delete volume '{0}' but it's already gone".format( - path)) - else: - self.fs.rename(path, trashed_volume) - - # Delete the volume meta file, if it's not already deleted - vol_meta_path = self._volume_metadata_path(volume_path) - try: - self.fs.unlink(vol_meta_path) - except cephfs.ObjectNotFound: - pass - - def purge_volume(self, volume_path, data_isolated=False): - """ - Finish clearing up a volume that was previously passed to delete_volume. This - function is idempotent. 
- """ - - trash = os.path.join(self.volume_prefix, "_deleting") - trashed_volume = os.path.join(trash, volume_path.volume_id) - - try: - self.fs.stat(trashed_volume) - except cephfs.ObjectNotFound: - log.warning("Trying to purge volume '{0}' but it's already been purged".format( - trashed_volume)) - return - - def rmtree(root_path): - log.debug("rmtree {0}".format(root_path)) - dir_handle = self.fs.opendir(root_path) - d = self.fs.readdir(dir_handle) - while d: - if d.d_name not in [".", ".."]: - # Do not use os.path.join because it is sensitive - # to string encoding, we just pass through dnames - # as byte arrays - d_full = "{0}/{1}".format(root_path, d.d_name) - if d.is_dir(): - rmtree(d_full) - else: - self.fs.unlink(d_full) - - d = self.fs.readdir(dir_handle) - self.fs.closedir(dir_handle) - - self.fs.rmdir(root_path) - - rmtree(trashed_volume) - - if data_isolated: - pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) - osd_map = self._rados_command("osd dump", {}) - pool_id = self._get_pool_id(osd_map, pool_name) - mds_map = self._rados_command("mds dump", {}) - if pool_id in mds_map['data_pools']: - self._rados_command("mds remove_data_pool", { - 'pool': pool_name - }) - self._rados_command("osd pool delete", - { - "pool": pool_name, - "pool2": pool_name, - "sure": "--yes-i-really-really-mean-it" - }) - - def _get_ancestor_xattr(self, path, attr): - """ - Helper for reading layout information: if this xattr is missing - on the requested path, keep checking parents until we find it. - """ - try: - result = self.fs.getxattr(path, attr) - if result == "": - # Annoying! cephfs gives us empty instead of an error when attr not found - raise cephfs.NoData() - else: - return result - except cephfs.NoData: - if path == "/": - raise - else: - return self._get_ancestor_xattr(os.path.split(path)[0], attr) - - def _check_compat_version(self, compat_version): - if self.version < compat_version: - msg = ("The current version of CephFSVolumeClient, version {0} " - "does not support the required feature. Need version {1} " - "or greater".format(self.version, compat_version) - ) - log.error(msg) - raise CephFSVolumeClientError(msg) - - def _metadata_get(self, path): - """ - Return a deserialized JSON object, or None - """ - fd = self.fs.open(path, "r") - # TODO iterate instead of assuming file < 4MB - read_bytes = self.fs.read(fd, 0, 4096 * 1024) - self.fs.close(fd) - if read_bytes: - return json.loads(read_bytes) - else: - return None - - def _metadata_set(self, path, data): - serialized = json.dumps(data) - fd = self.fs.open(path, "w") - try: - self.fs.write(fd, serialized, 0) - self.fs.fsync(fd, 0) - finally: - self.fs.close(fd) - - def _lock(self, path): - @contextmanager - def fn(): - while(1): - fd = self.fs.open(path, os.O_CREAT, 0o755) - self.fs.flock(fd, fcntl.LOCK_EX, self._id) - - # The locked file will be cleaned up sometime. It could be - # unlinked e.g., by an another manila share instance, before - # lock was applied on it. Perform checks to ensure that this - # does not happen. 
- try: - statbuf = self.fs.stat(path) - except cephfs.ObjectNotFound: - self.fs.close(fd) - continue - - fstatbuf = self.fs.fstat(fd) - if statbuf.st_ino == fstatbuf.st_ino: - break - - try: - yield - finally: - self.fs.flock(fd, fcntl.LOCK_UN, self._id) - self.fs.close(fd) - - return fn() - - def _auth_metadata_path(self, auth_id): - return os.path.join(self.volume_prefix, "${0}{1}".format( - auth_id, META_FILE_EXT)) - - def _auth_lock(self, auth_id): - return self._lock(self._auth_metadata_path(auth_id)) - - def _auth_metadata_get(self, auth_id): - """ - Call me with the metadata locked! - - Check whether a auth metadata structure can be decoded by the current - version of CephFSVolumeClient. - - Return auth metadata that the current version of CephFSVolumeClient - can decode. - """ - auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id)) - - if auth_metadata: - self._check_compat_version(auth_metadata['compat_version']) - - return auth_metadata - - def _auth_metadata_set(self, auth_id, data): - """ - Call me with the metadata locked! - - Fsync the auth metadata. - - Add two version attributes to the auth metadata, - 'compat_version', the minimum CephFSVolumeClient version that can - decode the metadata, and 'version', the CephFSVolumeClient version - that encoded the metadata. - """ - data['compat_version'] = 1 - data['version'] = self.version - return self._metadata_set(self._auth_metadata_path(auth_id), data) - - def _volume_metadata_path(self, volume_path): - return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format( - volume_path.group_id if volume_path.group_id else "", - volume_path.volume_id, - META_FILE_EXT - )) - - def _volume_lock(self, volume_path): - """ - Return a ContextManager which locks the authorization metadata for - a particular volume, and persists a flag to the metadata indicating - that it is currently locked, so that we can detect dirty situations - during recovery. - - This lock isn't just to make access to the metadata safe: it's also - designed to be used over the two-step process of checking the - metadata and then responding to an authorization request, to - ensure that at the point we respond the metadata hasn't changed - in the background. It's key to how we avoid security holes - resulting from races during that problem , - """ - return self._lock(self._volume_metadata_path(volume_path)) - - def _volume_metadata_get(self, volume_path): - """ - Call me with the metadata locked! - - Check whether a volume metadata structure can be decoded by the current - version of CephFSVolumeClient. - - Return a volume_metadata structure that the current version of - CephFSVolumeClient can decode. - """ - volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path)) - - if volume_metadata: - self._check_compat_version(volume_metadata['compat_version']) - - return volume_metadata - - def _volume_metadata_set(self, volume_path, data): - """ - Call me with the metadata locked! - - Add two version attributes to the volume metadata, - 'compat_version', the minimum CephFSVolumeClient version that can - decode the metadata and 'version', the CephFSVolumeClient version - that encoded the metadata. 
- """ - data['compat_version'] = 1 - data['version'] = self.version - return self._metadata_set(self._volume_metadata_path(volume_path), data) - - def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None): - """ - Get-or-create a Ceph auth identity for `auth_id` and grant them access - to - :param volume_path: - :param auth_id: - :param readonly: - :param tenant_id: Optionally provide a stringizable object to - restrict any created cephx IDs to other callers - passing the same tenant ID. - :return: - """ - - with self._auth_lock(auth_id): - # Existing meta, or None, to be updated - auth_meta = self._auth_metadata_get(auth_id) - - # volume data to be inserted - volume_path_str = str(volume_path) - volume = { - volume_path_str : { - # The access level at which the auth_id is authorized to - # access the volume. - 'access_level': 'r' if readonly else 'rw', - 'dirty': True, - } - } - if auth_meta is None: - sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format( - auth_id, tenant_id - )) - log.debug("Authorize: no existing meta") - auth_meta = { - 'dirty': True, - 'tenant_id': tenant_id.__str__() if tenant_id else None, - 'volumes': volume - } - - # Note: this is *not* guaranteeing that the key doesn't already - # exist in Ceph: we are allowing VolumeClient tenants to - # 'claim' existing Ceph keys. In order to prevent VolumeClient - # tenants from reading e.g. client.admin keys, you need to - # have configured your VolumeClient user (e.g. Manila) to - # have mon auth caps that prevent it from accessing those keys - # (e.g. limit it to only access keys with a manila.* prefix) - else: - # Disallow tenants to share auth IDs - if auth_meta['tenant_id'].__str__() != tenant_id.__str__(): - msg = "auth ID: {0} is already in use".format(auth_id) - log.error(msg) - raise CephFSVolumeClientError(msg) - - if auth_meta['dirty']: - self._recover_auth_meta(auth_id, auth_meta) - - log.debug("Authorize: existing tenant {tenant}".format( - tenant=auth_meta['tenant_id'] - )) - auth_meta['dirty'] = True - auth_meta['volumes'].update(volume) - - self._auth_metadata_set(auth_id, auth_meta) - - with self._volume_lock(volume_path): - key = self._authorize_volume(volume_path, auth_id, readonly) - - auth_meta['dirty'] = False - auth_meta['volumes'][volume_path_str]['dirty'] = False - self._auth_metadata_set(auth_id, auth_meta) - - if tenant_id: - return { - 'auth_key': key - } - else: - # Caller wasn't multi-tenant aware: be safe and don't give - # them a key - return { - 'auth_key': None - } - - def _authorize_volume(self, volume_path, auth_id, readonly): - vol_meta = self._volume_metadata_get(volume_path) - - access_level = 'r' if readonly else 'rw' - auth = { - auth_id: { - 'access_level': access_level, - 'dirty': True, - } - } - - if vol_meta is None: - vol_meta = { - 'auths': auth - } - else: - vol_meta['auths'].update(auth) - self._volume_metadata_set(volume_path, vol_meta) - - key = self._authorize_ceph(volume_path, auth_id, readonly) - - vol_meta['auths'][auth_id]['dirty'] = False - self._volume_metadata_set(volume_path, vol_meta) - - return key - - def _authorize_ceph(self, volume_path, auth_id, readonly): - path = self._get_path(volume_path) - log.debug("Authorizing Ceph id '{0}' for path '{1}'".format( - auth_id, path - )) - - # First I need to work out what the data pool is for this share: - # read the layout - pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") - namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace") - - # Now construct auth 
capabilities that give the guest just enough - # permissions to access the share - client_entity = "client.{0}".format(auth_id) - want_access_level = 'r' if readonly else 'rw' - want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path) - want_osd_cap = 'allow {0} pool={1} namespace={2}'.format( - want_access_level, pool_name, namespace) - - try: - existing = self._rados_command( - 'auth get', - { - 'entity': client_entity - } - ) - # FIXME: rados raising Error instead of ObjectNotFound in auth get failure - except rados.Error: - caps = self._rados_command( - 'auth get-or-create', - { - 'entity': client_entity, - 'caps': [ - 'mds', want_mds_cap, - 'osd', want_osd_cap, - 'mon', 'allow r'] - }) - else: - # entity exists, update it - cap = existing[0] - - # Construct auth caps that if present might conflict with the desired - # auth caps. - unwanted_access_level = 'r' if want_access_level is 'rw' else 'rw' - unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path) - unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format( - unwanted_access_level, pool_name, namespace) - - def cap_update(orig, want, unwanted): - # Updates the existing auth caps such that there is a single - # occurrence of wanted auth caps and no occurrence of - # conflicting auth caps. - - if not orig: - return want - - cap_tokens = set(orig.split(",")) - - cap_tokens.discard(unwanted) - cap_tokens.add(want) - - return ",".join(cap_tokens) - - osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap) - mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap) - - caps = self._rados_command( - 'auth caps', - { - 'entity': client_entity, - 'caps': [ - 'mds', mds_cap_str, - 'osd', osd_cap_str, - 'mon', cap['caps'].get('mon', 'allow r')] - }) - caps = self._rados_command( - 'auth get', - { - 'entity': client_entity - } - ) - - # Result expected like this: - # [ - # { - # "entity": "client.foobar", - # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", - # "caps": { - # "mds": "allow *", - # "mon": "allow *" - # } - # } - # ] - assert len(caps) == 1 - assert caps[0]['entity'] == client_entity - return caps[0]['key'] - - def deauthorize(self, volume_path, auth_id): - with self._auth_lock(auth_id): - # Existing meta, or None, to be updated - auth_meta = self._auth_metadata_get(auth_id) - - volume_path_str = str(volume_path) - if (auth_meta is None) or (not auth_meta['volumes']): - log.warn("deauthorized called for already-removed auth" - "ID '{auth_id}' for volume ID '{volume}'".format( - auth_id=auth_id, volume=volume_path.volume_id - )) - # Clean up the auth meta file of an auth ID - self.fs.unlink(self._auth_metadata_path(auth_id)) - return - - if volume_path_str not in auth_meta['volumes']: - log.warn("deauthorized called for already-removed auth" - "ID '{auth_id}' for volume ID '{volume}'".format( - auth_id=auth_id, volume=volume_path.volume_id - )) - return - - if auth_meta['dirty']: - self._recover_auth_meta(auth_id, auth_meta) - - auth_meta['dirty'] = True - auth_meta['volumes'][volume_path_str]['dirty'] = True - self._auth_metadata_set(auth_id, auth_meta) - - self._deauthorize_volume(volume_path, auth_id) - - # Filter out the volume we're deauthorizing - del auth_meta['volumes'][volume_path_str] - - # Clean up auth meta file - if not auth_meta['volumes']: - self.fs.unlink(self._auth_metadata_path(auth_id)) - return - - auth_meta['dirty'] = False - self._auth_metadata_set(auth_id, auth_meta) - - def _deauthorize_volume(self, volume_path, 
auth_id): - with self._volume_lock(volume_path): - vol_meta = self._volume_metadata_get(volume_path) - - if (vol_meta is None) or (auth_id not in vol_meta['auths']): - log.warn("deauthorized called for already-removed auth" - "ID '{auth_id}' for volume ID '{volume}'".format( - auth_id=auth_id, volume=volume_path.volume_id - )) - return - - vol_meta['auths'][auth_id]['dirty'] = True - self._volume_metadata_set(volume_path, vol_meta) - - self._deauthorize(volume_path, auth_id) - - # Remove the auth_id from the metadata *after* removing it - # from ceph, so that if we crashed here, we would actually - # recreate the auth ID during recovery (i.e. end up with - # a consistent state). - - # Filter out the auth we're removing - del vol_meta['auths'][auth_id] - self._volume_metadata_set(volume_path, vol_meta) - - def _deauthorize(self, volume_path, auth_id): - """ - The volume must still exist. - """ - client_entity = "client.{0}".format(auth_id) - path = self._get_path(volume_path) - pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") - namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace") - - # The auth_id might have read-only or read-write mount access for the - # volume path. - access_levels = ('r', 'rw') - want_mds_caps = {'allow {0} path={1}'.format(access_level, path) - for access_level in access_levels} - want_osd_caps = {'allow {0} pool={1} namespace={2}'.format( - access_level, pool_name, namespace) - for access_level in access_levels} - - try: - existing = self._rados_command( - 'auth get', - { - 'entity': client_entity - } - ) - - def cap_remove(orig, want): - cap_tokens = set(orig.split(",")) - return ",".join(cap_tokens.difference(want)) - - cap = existing[0] - osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps) - mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps) - if (not osd_cap_str) and (not mds_cap_str): - self._rados_command('auth del', {'entity': client_entity}, decode=False) - else: - self._rados_command( - 'auth caps', - { - 'entity': client_entity, - 'caps': [ - 'mds', mds_cap_str, - 'osd', osd_cap_str, - 'mon', cap['caps'].get('mon', 'allow r')] - }) - - # FIXME: rados raising Error instead of ObjectNotFound in auth get failure - except rados.Error: - # Already gone, great. - return - - def get_authorized_ids(self, volume_path): - """ - Expose a list of auth IDs that have access to a volume. - - return: a list of (auth_id, access_level) tuples, where - the access_level can be 'r' , or 'rw'. - None if no auth ID is given access to the volume. - """ - with self._volume_lock(volume_path): - meta = self._volume_metadata_get(volume_path) - auths = [] - if not meta or not meta['auths']: - return None - - for auth, auth_data in meta['auths'].items(): - # Skip partial auth updates. - if not auth_data['dirty']: - auths.append((auth, auth_data['access_level'])) - - return auths - - def _rados_command(self, prefix, args=None, decode=True): - """ - Safer wrapper for ceph_argparse.json_command, which raises - Error exception instead of relying on caller to check return - codes. - - Error exception can result from: - * Timeout - * Actual legitimate errors - * Malformed JSON output - - return: Decoded object from ceph, or None if empty string returned. 
- If decode is False, return a string (the data returned by - ceph command) - """ - if args is None: - args = {} - - argdict = args.copy() - argdict['format'] = 'json' - - ret, outbuf, outs = json_command(self.rados, - prefix=prefix, - argdict=argdict, - timeout=RADOS_TIMEOUT) - if ret != 0: - raise rados.Error(outs) - else: - if decode: - if outbuf: - try: - return json.loads(outbuf) - except (ValueError, TypeError): - raise RadosError("Invalid JSON output for command {0}".format(argdict)) - else: - return None - else: - return outbuf - - def get_used_bytes(self, volume_path): - return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes")) - - def set_max_bytes(self, volume_path, max_bytes): - self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes', - max_bytes.__str__() if max_bytes is not None else "0", - 0) - - def _snapshot_path(self, dir_path, snapshot_name): - return os.path.join( - dir_path, self.rados.conf_get('client_snapdir'), snapshot_name - ) - - def _snapshot_create(self, dir_path, snapshot_name): - # TODO: raise intelligible exception for clusters where snaps are disabled - self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755) - - def _snapshot_destroy(self, dir_path, snapshot_name): - """ - Remove a snapshot, or do nothing if it already doesn't exist. - """ - try: - self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name)) - except cephfs.ObjectNotFound: - log.warn("Snapshot was already gone: {0}".format(snapshot_name)) - - def create_snapshot_volume(self, volume_path, snapshot_name): - self._snapshot_create(self._get_path(volume_path), snapshot_name) - - def destroy_snapshot_volume(self, volume_path, snapshot_name): - self._snapshot_destroy(self._get_path(volume_path), snapshot_name) - - def create_snapshot_group(self, group_id, snapshot_name): - if group_id is None: - raise RuntimeError("Group ID may not be None") - - return self._snapshot_create(self._get_group_path(group_id), snapshot_name) - - def destroy_snapshot_group(self, group_id, snapshot_name): - if group_id is None: - raise RuntimeError("Group ID may not be None") - if snapshot_name is None: - raise RuntimeError("Snapshot name may not be None") - - return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name) - - def _cp_r(self, src, dst): - # TODO - raise NotImplementedError() - - def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name): - dest_fs_path = self._get_path(dest_volume_path) - src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name) - - self._cp_r(src_snapshot_path, dest_fs_path) - - def put_object(self, pool_name, object_name, data): - """ - Synchronously write data to an object. - - :param pool_name: name of the pool - :type pool_name: str - :param object_name: name of the object - :type object_name: str - :param data: data to write - :type data: bytes - """ - ioctx = self.rados.open_ioctx(pool_name) - max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 - if len(data) > max_size: - msg = ("Data to be written to object '{0}' exceeds " - "{1} bytes".format(object_name, max_size)) - log.error(msg) - raise CephFSVolumeClientError(msg) - try: - ioctx.write_full(object_name, data) - finally: - ioctx.close() - - def get_object(self, pool_name, object_name): - """ - Synchronously read data from object. 
-
-        :param pool_name: name of the pool
-        :type pool_name: str
-        :param object_name: name of the object
-        :type object_name: str
-
-        :returns: bytes - data read from object
-        """
-        ioctx = self.rados.open_ioctx(pool_name)
-        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
-        try:
-            bytes_read = ioctx.read(object_name, max_size)
-            if ((len(bytes_read) == max_size) and
-                    (ioctx.read(object_name, 1, offset=max_size))):
-                log.warning("Size of object {0} exceeds '{1}' bytes "
-                            "read".format(object_name, max_size))
-        finally:
-            ioctx.close()
-        return bytes_read
-
-    def delete_object(self, pool_name, object_name):
-        ioctx = self.rados.open_ioctx(pool_name)
-        try:
-            ioctx.remove_object(object_name)
-        except rados.ObjectNotFound:
-            log.warn("Object '{0}' was already removed".format(object_name))
-        finally:
-            ioctx.close()
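
For reference, the module deleted above exposes a volume lifecycle that callers (for example an OpenStack Manila-style service, as the in-code comments suggest) drive through CephFSVolumeClient. The sketch below is a minimal walk-through using only methods defined in the deleted file; the cluster name, config path, auth IDs, tenant ID and quota are illustrative placeholders, not values taken from this repository.

# Illustrative walk-through of the deleted CephFSVolumeClient API: connect,
# create a volume, grant an auth ID access to it, then tear everything down.
# All identifiers, paths and the 1 GiB quota are example values only.
from ceph_volume_client import CephFSVolumeClient, VolumePath

vc = CephFSVolumeClient(auth_id="manila",                 # cephx ID this service runs as (assumed)
                        conf_path="/etc/ceph/ceph.conf",  # assumed local ceph.conf
                        cluster_name="ceph")
vc.connect()            # connects librados, mounts CephFS, then runs recover()

vp = VolumePath(group_id=None, volume_id="share-0001")    # no group -> lands under /volumes/_nogroup
vol = vc.create_volume(vp, size=1024 * 1024 * 1024)       # mkdir + ceph.quota.max_bytes; returns {'mount_path': ...}
grant = vc.authorize(vp, "guest-0001", readonly=False,
                     tenant_id="tenant-a")                # returns {'auth_key': <cephx key>} when tenant_id is given

# ... a guest mounts vol['mount_path'] using grant['auth_key'] ...

vc.deauthorize(vp, "guest-0001")          # revoke this auth ID's caps for the volume
vc.evict("guest-0001", volume_path=vp)    # evict any client sessions still holding the mount
vc.delete_volume(vp)                      # fast path: rename the directory into /volumes/_deleting
vc.purge_volume(vp)                       # slow path: recursively remove the trashed data
vc.disconnect()

The split between delete_volume and purge_volume mirrors the deleted implementation: delete_volume only moves the directory into the "_deleting" trash area, so the slower recursive purge_volume call must follow it.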