7 from ceph_volume import process
8 from . import as_string
11 # TODO: get these out of here and into a common area for others to consume
12 if platform.system() == 'FreeBSD':
14 DEFAULT_FS_TYPE = 'zfs'
15 PROCDIR = '/compat/linux/proc'
16 # FreeBSD does not have blockdevices any more
21 DEFAULT_FS_TYPE = 'xfs'
23 BLOCKDIR = '/sys/block'
28 return str(uuid.uuid4())
31 def get_ceph_user_ids():
33 Return the id and gid of the ceph user
36 user = pwd.getpwnam('ceph')
38 # is this even possible?
39 raise RuntimeError('"ceph" user is not available in the current system')
40 return user[2], user[3]
43 def mkdir_p(path, chown=True):
45 A `mkdir -p` that defaults to chown the path to the ceph user
50 if e.errno == errno.EEXIST:
55 uid, gid = get_ceph_user_ids()
56 os.chown(path, uid, gid)
59 def chown(path, recursive=True):
61 ``chown`` a path to the ceph user (uid and guid fetched at runtime)
63 uid, gid = get_ceph_user_ids()
64 if os.path.islink(path):
65 path = os.path.realpath(path)
67 process.run(['chown', '-R', 'ceph:ceph', path])
69 os.chown(path, uid, gid)
74 Detect if a file path is a binary or not. Will falsely report as binary
75 when utf-16 encoded. In the ceph universe there is no such risk (yet)
77 with open(path, 'rb') as fp:
78 contents = fp.read(8192)
79 if b'\x00' in contents: # a null byte may signal binary
84 class tmp_mount(object):
86 Temporarily mount a device on a temporary directory,
87 and unmount it upon exit
90 def __init__(self, device):
95 self.path = tempfile.mkdtemp()
104 def __exit__(self, exc_type, exc_val, exc_tb):
112 def path_is_mounted(path, destination=None):
114 Check if the given path is mounted
116 mounts = get_mounts(paths=True)
117 realpath = os.path.realpath(path)
118 mounted_locations = mounts.get(realpath, [])
121 if destination.startswith('/'):
122 destination = os.path.realpath(destination)
123 return destination in mounted_locations
124 return mounted_locations != []
127 def device_is_mounted(dev, destination=None):
129 Check if the given device is mounted, optionally validating that a
132 mounts = get_mounts(devices=True)
133 realpath = os.path.realpath(dev) if dev.startswith('/') else dev
134 destination = os.path.realpath(destination) if destination else None
135 mounted_locations = mounts.get(realpath, [])
138 return destination in mounted_locations
139 return mounted_locations != []
142 def get_mounts(devices=False, paths=False):
144 Create a mapping of all available system mounts so that other helpers can
145 detect nicely what path or device is mounted
147 It ignores (most of) non existing devices, but since some setups might need
148 some extra device information, it will make an exception for:
153 If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
154 if ``paths`` is set to ``True`` then the mapping will be
159 do_not_skip = ['tmpfs', 'devtmpfs']
160 default_to_devices = devices is False and paths is False
162 with open(PROCDIR + '/mounts', 'rb') as mounts:
163 proc_mounts = mounts.readlines()
165 for line in proc_mounts:
166 fields = [as_string(f) for f in line.split()]
169 device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
170 path = os.path.realpath(fields[1])
171 # only care about actual existing devices
172 if not os.path.exists(device) or not device.startswith('/'):
173 if device not in do_not_skip:
175 if device in devices_mounted.keys():
176 devices_mounted[device].append(path)
178 devices_mounted[device] = [path]
179 if path in paths_mounted.keys():
180 paths_mounted[path].append(device)
182 paths_mounted[path] = [device]
184 # Default to returning information for devices if
185 if devices is True or default_to_devices:
186 return devices_mounted