+++ /dev/null
-import errno
-import os
-import pwd
-import platform
-import tempfile
-import uuid
-from ceph_volume import process
-from . import as_string
-
-
-# TODO: get these out of here and into a common area for others to consume
-if platform.system() == 'FreeBSD':
- FREEBSD = True
- DEFAULT_FS_TYPE = 'zfs'
- PROCDIR = '/compat/linux/proc'
- # FreeBSD does not have blockdevices any more
- BLOCKDIR = '/dev'
- ROOTGROUP = 'wheel'
-else:
- FREEBSD = False
- DEFAULT_FS_TYPE = 'xfs'
- PROCDIR = '/proc'
- BLOCKDIR = '/sys/block'
- ROOTGROUP = 'root'
-
-
-def generate_uuid():
- return str(uuid.uuid4())
-
-
-def get_ceph_user_ids():
- """
- Return the id and gid of the ceph user
- """
- try:
- user = pwd.getpwnam('ceph')
- except KeyError:
- # is this even possible?
- raise RuntimeError('"ceph" user is not available in the current system')
- return user[2], user[3]
-
-
-def mkdir_p(path, chown=True):
- """
- A `mkdir -p` that defaults to chown the path to the ceph user
- """
- try:
- os.mkdir(path)
- except OSError as e:
- if e.errno == errno.EEXIST:
- pass
- else:
- raise
- if chown:
- uid, gid = get_ceph_user_ids()
- os.chown(path, uid, gid)
-
-
-def chown(path, recursive=True):
- """
- ``chown`` a path to the ceph user (uid and guid fetched at runtime)
- """
- uid, gid = get_ceph_user_ids()
- if os.path.islink(path):
- path = os.path.realpath(path)
- if recursive:
- process.run(['chown', '-R', 'ceph:ceph', path])
- else:
- os.chown(path, uid, gid)
-
-
-def is_binary(path):
- """
- Detect if a file path is a binary or not. Will falsely report as binary
- when utf-16 encoded. In the ceph universe there is no such risk (yet)
- """
- with open(path, 'rb') as fp:
- contents = fp.read(8192)
- if b'\x00' in contents: # a null byte may signal binary
- return True
- return False
-
-
-class tmp_mount(object):
- """
- Temporarily mount a device on a temporary directory,
- and unmount it upon exit
- """
-
- def __init__(self, device):
- self.device = device
- self.path = None
-
- def __enter__(self):
- self.path = tempfile.mkdtemp()
- process.run([
- 'mount',
- '-v',
- self.device,
- self.path
- ])
- return self.path
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- process.run([
- 'umount',
- '-v',
- self.path
- ])
-
-
-def path_is_mounted(path, destination=None):
- """
- Check if the given path is mounted
- """
- mounts = get_mounts(paths=True)
- realpath = os.path.realpath(path)
- mounted_locations = mounts.get(realpath, [])
-
- if destination:
- if destination.startswith('/'):
- destination = os.path.realpath(destination)
- return destination in mounted_locations
- return mounted_locations != []
-
-
-def device_is_mounted(dev, destination=None):
- """
- Check if the given device is mounted, optionally validating that a
- destination exists
- """
- mounts = get_mounts(devices=True)
- realpath = os.path.realpath(dev) if dev.startswith('/') else dev
- destination = os.path.realpath(destination) if destination else None
- mounted_locations = mounts.get(realpath, [])
-
- if destination:
- return destination in mounted_locations
- return mounted_locations != []
-
-
-def get_mounts(devices=False, paths=False):
- """
- Create a mapping of all available system mounts so that other helpers can
- detect nicely what path or device is mounted
-
- It ignores (most of) non existing devices, but since some setups might need
- some extra device information, it will make an exception for:
-
- - tmpfs
- - devtmpfs
-
- If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
- if ``paths`` is set to ``True`` then the mapping will be
- a path-to-device(s)
- """
- devices_mounted = {}
- paths_mounted = {}
- do_not_skip = ['tmpfs', 'devtmpfs']
- default_to_devices = devices is False and paths is False
-
- with open(PROCDIR + '/mounts', 'rb') as mounts:
- proc_mounts = mounts.readlines()
-
- for line in proc_mounts:
- fields = [as_string(f) for f in line.split()]
- if len(fields) < 3:
- continue
- device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
- path = os.path.realpath(fields[1])
- # only care about actual existing devices
- if not os.path.exists(device) or not device.startswith('/'):
- if device not in do_not_skip:
- continue
- if device in devices_mounted.keys():
- devices_mounted[device].append(path)
- else:
- devices_mounted[device] = [path]
- if path in paths_mounted.keys():
- paths_mounted[path].append(device)
- else:
- paths_mounted[path] = [device]
-
- # Default to returning information for devices if
- if devices is True or default_to_devices:
- return devices_mounted
- else:
- return paths_mounted