--- /dev/null
+import errno
+import os
+import pwd
+import platform
+import tempfile
+import uuid
+from ceph_volume import process
+from . import as_string
+
+
+# TODO: get these out of here and into a common area for others to consume
+if platform.system() == 'FreeBSD':
+ FREEBSD = True
+ DEFAULT_FS_TYPE = 'zfs'
+ PROCDIR = '/compat/linux/proc'
+ # FreeBSD does not have blockdevices any more
+ BLOCKDIR = '/dev'
+ ROOTGROUP = 'wheel'
+else:
+ FREEBSD = False
+ DEFAULT_FS_TYPE = 'xfs'
+ PROCDIR = '/proc'
+ BLOCKDIR = '/sys/block'
+ ROOTGROUP = 'root'
+
+
+def generate_uuid():
+ return str(uuid.uuid4())
+
+
+def get_ceph_user_ids():
+ """
+ Return the id and gid of the ceph user
+ """
+ try:
+ user = pwd.getpwnam('ceph')
+ except KeyError:
+ # is this even possible?
+ raise RuntimeError('"ceph" user is not available in the current system')
+ return user[2], user[3]
+
+
+def mkdir_p(path, chown=True):
+ """
+ A `mkdir -p` that defaults to chown the path to the ceph user
+ """
+ try:
+ os.mkdir(path)
+ except OSError as e:
+ if e.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+ if chown:
+ uid, gid = get_ceph_user_ids()
+ os.chown(path, uid, gid)
+
+
+def chown(path, recursive=True):
+ """
+ ``chown`` a path to the ceph user (uid and guid fetched at runtime)
+ """
+ uid, gid = get_ceph_user_ids()
+ if os.path.islink(path):
+ path = os.path.realpath(path)
+ if recursive:
+ process.run(['chown', '-R', 'ceph:ceph', path])
+ else:
+ os.chown(path, uid, gid)
+
+
+def is_binary(path):
+ """
+ Detect if a file path is a binary or not. Will falsely report as binary
+ when utf-16 encoded. In the ceph universe there is no such risk (yet)
+ """
+ with open(path, 'rb') as fp:
+ contents = fp.read(8192)
+ if b'\x00' in contents: # a null byte may signal binary
+ return True
+ return False
+
+
+class tmp_mount(object):
+ """
+ Temporarily mount a device on a temporary directory,
+ and unmount it upon exit
+ """
+
+ def __init__(self, device):
+ self.device = device
+ self.path = None
+
+ def __enter__(self):
+ self.path = tempfile.mkdtemp()
+ process.run([
+ 'mount',
+ '-v',
+ self.device,
+ self.path
+ ])
+ return self.path
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ process.run([
+ 'umount',
+ '-v',
+ self.path
+ ])
+
+
+def path_is_mounted(path, destination=None):
+ """
+ Check if the given path is mounted
+ """
+ mounts = get_mounts(paths=True)
+ realpath = os.path.realpath(path)
+ mounted_locations = mounts.get(realpath, [])
+
+ if destination:
+ if destination.startswith('/'):
+ destination = os.path.realpath(destination)
+ return destination in mounted_locations
+ return mounted_locations != []
+
+
+def device_is_mounted(dev, destination=None):
+ """
+ Check if the given device is mounted, optionally validating that a
+ destination exists
+ """
+ mounts = get_mounts(devices=True)
+ realpath = os.path.realpath(dev) if dev.startswith('/') else dev
+ destination = os.path.realpath(destination) if destination else None
+ mounted_locations = mounts.get(realpath, [])
+
+ if destination:
+ return destination in mounted_locations
+ return mounted_locations != []
+
+
+def get_mounts(devices=False, paths=False):
+ """
+ Create a mapping of all available system mounts so that other helpers can
+ detect nicely what path or device is mounted
+
+ It ignores (most of) non existing devices, but since some setups might need
+ some extra device information, it will make an exception for:
+
+ - tmpfs
+ - devtmpfs
+
+ If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
+ if ``paths`` is set to ``True`` then the mapping will be
+ a path-to-device(s)
+ """
+ devices_mounted = {}
+ paths_mounted = {}
+ do_not_skip = ['tmpfs', 'devtmpfs']
+ default_to_devices = devices is False and paths is False
+
+ with open(PROCDIR + '/mounts', 'rb') as mounts:
+ proc_mounts = mounts.readlines()
+
+ for line in proc_mounts:
+ fields = [as_string(f) for f in line.split()]
+ if len(fields) < 3:
+ continue
+ device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
+ path = os.path.realpath(fields[1])
+ # only care about actual existing devices
+ if not os.path.exists(device) or not device.startswith('/'):
+ if device not in do_not_skip:
+ continue
+ if device in devices_mounted.keys():
+ devices_mounted[device].append(path)
+ else:
+ devices_mounted[device] = [path]
+ if path in paths_mounted.keys():
+ paths_mounted[path].append(device)
+ else:
+ paths_mounted[path] = [device]
+
+ # Default to returning information for devices if
+ if devices is True or default_to_devices:
+ return devices_mounted
+ else:
+ return paths_mounted