--- /dev/null
+"""
+API for CRUD lvm tag operations. Follows the Ceph LVM tag naming convention
+that prefixes tags with ``ceph.`` and uses ``=`` for assignment, and provides
+set of utilities for interacting with LVM.
+"""
+from ceph_volume import process
+from ceph_volume.exceptions import MultipleLVsError, MultipleVGsError, MultiplePVsError
+
+
+def _output_parser(output, fields):
+ """
+ Newer versions of LVM allow ``--reportformat=json``, but older versions,
+ like the one included in Xenial do not. LVM has the ability to filter and
+ format its output so we assume the output will be in a format this parser
+ can handle (using ',' as a delimiter)
+
+ :param fields: A string, possibly using ',' to group many items, as it
+ would be used on the CLI
+ :param output: The CLI output from the LVM call
+ """
+ field_items = fields.split(',')
+ report = []
+ for line in output:
+ # clear the leading/trailing whitespace
+ line = line.strip()
+
+ # remove the extra '"' in each field
+ line = line.replace('"', '')
+
+ # prevent moving forward with empty contents
+ if not line:
+ continue
+
+ # spliting on ';' because that is what the lvm call uses as
+ # '--separator'
+ output_items = [i.strip() for i in line.split(';')]
+ # map the output to the fiels
+ report.append(
+ dict(zip(field_items, output_items))
+ )
+
+ return report
+
+
+def parse_tags(lv_tags):
+ """
+ Return a dictionary mapping of all the tags associated with
+ a Volume from the comma-separated tags coming from the LVM API
+
+ Input look like::
+
+ "ceph.osd_fsid=aaa-fff-bbbb,ceph.osd_id=0"
+
+ For the above example, the expected return value would be::
+
+ {
+ "ceph.osd_fsid": "aaa-fff-bbbb",
+ "ceph.osd_id": "0"
+ }
+ """
+ if not lv_tags:
+ return {}
+ tag_mapping = {}
+ tags = lv_tags.split(',')
+ for tag_assignment in tags:
+ if not tag_assignment.startswith('ceph.'):
+ continue
+ key, value = tag_assignment.split('=', 1)
+ tag_mapping[key] = value
+
+ return tag_mapping
+
+
+def get_api_vgs():
+ """
+ Return the list of group volumes available in the system using flags to
+ include common metadata associated with them
+
+ Command and sample delimeted output, should look like::
+
+ $ vgs --noheadings --separator=';' \
+ -o vg_name,pv_count,lv_count,snap_count,vg_attr,vg_size,vg_free
+ ubuntubox-vg;1;2;0;wz--n-;299.52g;12.00m
+ osd_vg;3;1;0;wz--n-;29.21g;9.21g
+
+ """
+ fields = 'vg_name,pv_count,lv_count,snap_count,vg_attr,vg_size,vg_free'
+ stdout, stderr, returncode = process.call(
+ ['vgs', '--noheadings', '--separator=";"', '-o', fields]
+ )
+ return _output_parser(stdout, fields)
+
+
+def get_api_lvs():
+ """
+ Return the list of logical volumes available in the system using flags to include common
+ metadata associated with them
+
+ Command and delimeted output, should look like::
+
+ $ lvs --noheadings --separator=';' -o lv_tags,lv_path,lv_name,vg_name
+ ;/dev/ubuntubox-vg/root;root;ubuntubox-vg
+ ;/dev/ubuntubox-vg/swap_1;swap_1;ubuntubox-vg
+
+ """
+ fields = 'lv_tags,lv_path,lv_name,vg_name,lv_uuid'
+ stdout, stderr, returncode = process.call(
+ ['lvs', '--noheadings', '--separator=";"', '-o', fields]
+ )
+ return _output_parser(stdout, fields)
+
+
+def get_api_pvs():
+ """
+ Return the list of physical volumes configured for lvm and available in the
+ system using flags to include common metadata associated with them like the uuid
+
+ Command and delimeted output, should look like::
+
+ $ pvs --noheadings --separator=';' -o pv_name,pv_tags,pv_uuid
+ /dev/sda1;;
+ /dev/sdv;;07A4F654-4162-4600-8EB3-88D1E42F368D
+
+ """
+ fields = 'pv_name,pv_tags,pv_uuid'
+
+ # note the use of `pvs -a` which will return every physical volume including
+ # ones that have not been initialized as "pv" by LVM
+ stdout, stderr, returncode = process.call(
+ ['pvs', '-a', '--no-heading', '--separator=";"', '-o', fields]
+ )
+
+ return _output_parser(stdout, fields)
+
+
+def get_lv_from_argument(argument):
+ """
+ Helper proxy function that consumes a possible logical volume passed in from the CLI
+ in the form of `vg/lv`, but with some validation so that an argument that is a full
+ path to a device can be ignored
+ """
+ if argument.startswith('/'):
+ lv = get_lv(lv_path=argument)
+ return lv
+ try:
+ vg_name, lv_name = argument.split('/')
+ except (ValueError, AttributeError):
+ return None
+ return get_lv(lv_name=lv_name, vg_name=vg_name)
+
+
+def get_lv(lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
+ """
+ Return a matching lv for the current system, requiring ``lv_name``,
+ ``vg_name``, ``lv_path`` or ``tags``. Raises an error if more than one lv
+ is found.
+
+ It is useful to use ``tags`` when trying to find a specific logical volume,
+ but it can also lead to multiple lvs being found, since a lot of metadata
+ is shared between lvs of a distinct OSD.
+ """
+ if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
+ return None
+ lvs = Volumes()
+ return lvs.get(
+ lv_name=lv_name, vg_name=vg_name, lv_path=lv_path, lv_uuid=lv_uuid,
+ lv_tags=lv_tags
+ )
+
+
+def get_pv(pv_name=None, pv_uuid=None, pv_tags=None):
+ """
+ Return a matching pv (physical volume) for the current system, requiring
+ ``pv_name``, ``pv_uuid``, or ``pv_tags``. Raises an error if more than one
+ pv is found.
+ """
+ if not any([pv_name, pv_uuid, pv_tags]):
+ return None
+ pvs = PVolumes()
+ return pvs.get(pv_name=pv_name, pv_uuid=pv_uuid, pv_tags=pv_tags)
+
+
+def create_pv(device):
+ """
+ Create a physical volume from a device, useful when devices need to be later mapped
+ to journals.
+ """
+ process.run([
+ 'pvcreate',
+ '-v', # verbose
+ '-f', # force it
+ '--yes', # answer yes to any prompts
+ device
+ ])
+
+
+def create_vg(name, *devices):
+ """
+ Create a Volume Group. Command looks like::
+
+ vgcreate --force --yes group_name device
+
+ Once created the volume group is returned as a ``VolumeGroup`` object
+ """
+ process.run([
+ 'vgcreate',
+ '--force',
+ '--yes',
+ name] + list(devices)
+ )
+
+ vg = get_vg(vg_name=name)
+ return vg
+
+
+def remove_lv(path):
+ """
+ Removes a logical volume given it's absolute path.
+
+ Will return True if the lv is successfully removed or
+ raises a RuntimeError if the removal fails.
+ """
+ stdout, stderr, returncode = process.call(
+ [
+ 'lvremove',
+ '-v', # verbose
+ '-f', # force it
+ path
+ ],
+ show_command=True,
+ terminal_verbose=True,
+ )
+ if returncode != 0:
+ raise RuntimeError("Unable to remove %s".format(path))
+ return True
+
+
+def create_lv(name, group, size=None, tags=None):
+ """
+ Create a Logical Volume in a Volume Group. Command looks like::
+
+ lvcreate -L 50G -n gfslv vg0
+
+ ``name``, ``group``, are required. If ``size`` is provided it must follow
+ lvm's size notation (like 1G, or 20M). Tags are an optional dictionary and is expected to
+ conform to the convention of prefixing them with "ceph." like::
+
+ {"ceph.block_device": "/dev/ceph/osd-1"}
+ """
+ # XXX add CEPH_VOLUME_LVM_DEBUG to enable -vvvv on lv operations
+ type_path_tag = {
+ 'journal': 'ceph.journal_device',
+ 'data': 'ceph.data_device',
+ 'block': 'ceph.block_device',
+ 'wal': 'ceph.wal_device',
+ 'db': 'ceph.db_device',
+ 'lockbox': 'ceph.lockbox_device', # XXX might not ever need this lockbox sorcery
+ }
+ if size:
+ process.run([
+ 'lvcreate',
+ '--yes',
+ '-L',
+ '%s' % size,
+ '-n', name, group
+ ])
+ # create the lv with all the space available, this is needed because the
+ # system call is different for LVM
+ else:
+ process.run([
+ 'lvcreate',
+ '--yes',
+ '-l',
+ '100%FREE',
+ '-n', name, group
+ ])
+
+ lv = get_lv(lv_name=name, vg_name=group)
+ lv.set_tags(tags)
+
+ # when creating a distinct type, the caller doesn't know what the path will
+ # be so this function will set it after creation using the mapping
+ path_tag = type_path_tag.get(tags.get('ceph.type'))
+ if path_tag:
+ lv.set_tags(
+ {path_tag: lv.lv_path}
+ )
+ return lv
+
+
+def get_vg(vg_name=None, vg_tags=None):
+ """
+ Return a matching vg for the current system, requires ``vg_name`` or
+ ``tags``. Raises an error if more than one vg is found.
+
+ It is useful to use ``tags`` when trying to find a specific volume group,
+ but it can also lead to multiple vgs being found.
+ """
+ if not any([vg_name, vg_tags]):
+ return None
+ vgs = VolumeGroups()
+ return vgs.get(vg_name=vg_name, vg_tags=vg_tags)
+
+
+class VolumeGroups(list):
+ """
+ A list of all known volume groups for the current system, with the ability
+ to filter them via keyword arguments.
+ """
+
+ def __init__(self):
+ self._populate()
+
+ def _populate(self):
+ # get all the vgs in the current system
+ for vg_item in get_api_vgs():
+ self.append(VolumeGroup(**vg_item))
+
+ def _purge(self):
+ """
+ Deplete all the items in the list, used internally only so that we can
+ dynamically allocate the items when filtering without the concern of
+ messing up the contents
+ """
+ self[:] = []
+
+ def _filter(self, vg_name=None, vg_tags=None):
+ """
+ The actual method that filters using a new list. Useful so that other
+ methods that do not want to alter the contents of the list (e.g.
+ ``self.find``) can operate safely.
+
+ .. note:: ``vg_tags`` is not yet implemented
+ """
+ filtered = [i for i in self]
+ if vg_name:
+ filtered = [i for i in filtered if i.vg_name == vg_name]
+
+ # at this point, `filtered` has either all the volumes in self or is an
+ # actual filtered list if any filters were applied
+ if vg_tags:
+ tag_filtered = []
+ for volume in filtered:
+ matches = all(volume.tags.get(k) == str(v) for k, v in vg_tags.items())
+ if matches:
+ tag_filtered.append(volume)
+ return tag_filtered
+
+ return filtered
+
+ def filter(self, vg_name=None, vg_tags=None):
+ """
+ Filter out groups on top level attributes like ``vg_name`` or by
+ ``vg_tags`` where a dict is required. For example, to find a Ceph group
+ with dmcache as the type, the filter would look like::
+
+ vg_tags={'ceph.type': 'dmcache'}
+
+ .. warning:: These tags are not documented because they are currently
+ unused, but are here to maintain API consistency
+ """
+ if not any([vg_name, vg_tags]):
+ raise TypeError('.filter() requires vg_name or vg_tags (none given)')
+ # first find the filtered volumes with the values in self
+ filtered_groups = self._filter(
+ vg_name=vg_name,
+ vg_tags=vg_tags
+ )
+ # then purge everything
+ self._purge()
+ # and add the filtered items
+ self.extend(filtered_groups)
+
+ def get(self, vg_name=None, vg_tags=None):
+ """
+ This is a bit expensive, since it will try to filter out all the
+ matching items in the list, filter them out applying anything that was
+ added and return the matching item.
+
+ This method does *not* alter the list, and it will raise an error if
+ multiple VGs are matched
+
+ It is useful to use ``tags`` when trying to find a specific volume group,
+ but it can also lead to multiple vgs being found (although unlikely)
+ """
+ if not any([vg_name, vg_tags]):
+ return None
+ vgs = self._filter(
+ vg_name=vg_name,
+ vg_tags=vg_tags
+ )
+ if not vgs:
+ return None
+ if len(vgs) > 1:
+ # this is probably never going to happen, but it is here to keep
+ # the API code consistent
+ raise MultipleVGsError(vg_name)
+ return vgs[0]
+
+
+class Volumes(list):
+ """
+ A list of all known (logical) volumes for the current system, with the ability
+ to filter them via keyword arguments.
+ """
+
+ def __init__(self):
+ self._populate()
+
+ def _populate(self):
+ # get all the lvs in the current system
+ for lv_item in get_api_lvs():
+ self.append(Volume(**lv_item))
+
+ def _purge(self):
+ """
+ Deplete all the items in the list, used internally only so that we can
+ dynamically allocate the items when filtering without the concern of
+ messing up the contents
+ """
+ self[:] = []
+
+ def _filter(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
+ """
+ The actual method that filters using a new list. Useful so that other
+ methods that do not want to alter the contents of the list (e.g.
+ ``self.find``) can operate safely.
+ """
+ filtered = [i for i in self]
+ if lv_name:
+ filtered = [i for i in filtered if i.lv_name == lv_name]
+
+ if vg_name:
+ filtered = [i for i in filtered if i.vg_name == vg_name]
+
+ if lv_uuid:
+ filtered = [i for i in filtered if i.lv_uuid == lv_uuid]
+
+ if lv_path:
+ filtered = [i for i in filtered if i.lv_path == lv_path]
+
+ # at this point, `filtered` has either all the volumes in self or is an
+ # actual filtered list if any filters were applied
+ if lv_tags:
+ tag_filtered = []
+ for volume in filtered:
+ # all the tags we got need to match on the volume
+ matches = all(volume.tags.get(k) == str(v) for k, v in lv_tags.items())
+ if matches:
+ tag_filtered.append(volume)
+ return tag_filtered
+
+ return filtered
+
+ def filter(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
+ """
+ Filter out volumes on top level attributes like ``lv_name`` or by
+ ``lv_tags`` where a dict is required. For example, to find a volume
+ that has an OSD ID of 0, the filter would look like::
+
+ lv_tags={'ceph.osd_id': '0'}
+
+ """
+ if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
+ raise TypeError('.filter() requires lv_name, vg_name, lv_path, lv_uuid, or tags (none given)')
+ # first find the filtered volumes with the values in self
+ filtered_volumes = self._filter(
+ lv_name=lv_name,
+ vg_name=vg_name,
+ lv_path=lv_path,
+ lv_uuid=lv_uuid,
+ lv_tags=lv_tags
+ )
+ # then purge everything
+ self._purge()
+ # and add the filtered items
+ self.extend(filtered_volumes)
+
+ def get(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
+ """
+ This is a bit expensive, since it will try to filter out all the
+ matching items in the list, filter them out applying anything that was
+ added and return the matching item.
+
+ This method does *not* alter the list, and it will raise an error if
+ multiple LVs are matched
+
+ It is useful to use ``tags`` when trying to find a specific logical volume,
+ but it can also lead to multiple lvs being found, since a lot of metadata
+ is shared between lvs of a distinct OSD.
+ """
+ if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
+ return None
+ lvs = self._filter(
+ lv_name=lv_name,
+ vg_name=vg_name,
+ lv_path=lv_path,
+ lv_uuid=lv_uuid,
+ lv_tags=lv_tags
+ )
+ if not lvs:
+ return None
+ if len(lvs) > 1:
+ raise MultipleLVsError(lv_name, lv_path)
+ return lvs[0]
+
+
+class PVolumes(list):
+ """
+ A list of all known (physical) volumes for the current system, with the ability
+ to filter them via keyword arguments.
+ """
+
+ def __init__(self):
+ self._populate()
+
+ def _populate(self):
+ # get all the pvs in the current system
+ for pv_item in get_api_pvs():
+ self.append(PVolume(**pv_item))
+
+ def _purge(self):
+ """
+ Deplete all the items in the list, used internally only so that we can
+ dynamically allocate the items when filtering without the concern of
+ messing up the contents
+ """
+ self[:] = []
+
+ def _filter(self, pv_name=None, pv_uuid=None, pv_tags=None):
+ """
+ The actual method that filters using a new list. Useful so that other
+ methods that do not want to alter the contents of the list (e.g.
+ ``self.find``) can operate safely.
+ """
+ filtered = [i for i in self]
+ if pv_name:
+ filtered = [i for i in filtered if i.pv_name == pv_name]
+
+ if pv_uuid:
+ filtered = [i for i in filtered if i.pv_uuid == pv_uuid]
+
+ # at this point, `filtered` has either all the physical volumes in self
+ # or is an actual filtered list if any filters were applied
+ if pv_tags:
+ tag_filtered = []
+ for pvolume in filtered:
+ matches = all(pvolume.tags.get(k) == str(v) for k, v in pv_tags.items())
+ if matches:
+ tag_filtered.append(pvolume)
+ # return the tag_filtered pvolumes here, the `filtered` list is no
+ # longer useable
+ return tag_filtered
+
+ return filtered
+
+ def filter(self, pv_name=None, pv_uuid=None, pv_tags=None):
+ """
+ Filter out volumes on top level attributes like ``pv_name`` or by
+ ``pv_tags`` where a dict is required. For example, to find a physical volume
+ that has an OSD ID of 0, the filter would look like::
+
+ pv_tags={'ceph.osd_id': '0'}
+
+ """
+ if not any([pv_name, pv_uuid, pv_tags]):
+ raise TypeError('.filter() requires pv_name, pv_uuid, or pv_tags (none given)')
+ # first find the filtered volumes with the values in self
+ filtered_volumes = self._filter(
+ pv_name=pv_name,
+ pv_uuid=pv_uuid,
+ pv_tags=pv_tags
+ )
+ # then purge everything
+ self._purge()
+ # and add the filtered items
+ self.extend(filtered_volumes)
+
+ def get(self, pv_name=None, pv_uuid=None, pv_tags=None):
+ """
+ This is a bit expensive, since it will try to filter out all the
+ matching items in the list, filter them out applying anything that was
+ added and return the matching item.
+
+ This method does *not* alter the list, and it will raise an error if
+ multiple pvs are matched
+
+ It is useful to use ``tags`` when trying to find a specific logical volume,
+ but it can also lead to multiple pvs being found, since a lot of metadata
+ is shared between pvs of a distinct OSD.
+ """
+ if not any([pv_name, pv_uuid, pv_tags]):
+ return None
+ pvs = self._filter(
+ pv_name=pv_name,
+ pv_uuid=pv_uuid,
+ pv_tags=pv_tags
+ )
+ if not pvs:
+ return None
+ if len(pvs) > 1:
+ raise MultiplePVsError(pv_name)
+ return pvs[0]
+
+
+class VolumeGroup(object):
+ """
+ Represents an LVM group, with some top-level attributes like ``vg_name``
+ """
+
+ def __init__(self, **kw):
+ for k, v in kw.items():
+ setattr(self, k, v)
+ self.name = kw['vg_name']
+ self.tags = parse_tags(kw.get('vg_tags', ''))
+
+ def __str__(self):
+ return '<%s>' % self.name
+
+ def __repr__(self):
+ return self.__str__()
+
+
+class Volume(object):
+ """
+ Represents a Logical Volume from LVM, with some top-level attributes like
+ ``lv_name`` and parsed tags as a dictionary of key/value pairs.
+ """
+
+ def __init__(self, **kw):
+ for k, v in kw.items():
+ setattr(self, k, v)
+ self.lv_api = kw
+ self.name = kw['lv_name']
+ self.tags = parse_tags(kw['lv_tags'])
+
+ def __str__(self):
+ return '<%s>' % self.lv_api['lv_path']
+
+ def __repr__(self):
+ return self.__str__()
+
+ def as_dict(self):
+ obj = {}
+ obj.update(self.lv_api)
+ obj['tags'] = self.tags
+ obj['name'] = self.name
+ obj['type'] = self.tags['ceph.type']
+ obj['path'] = self.lv_path
+ return obj
+
+ def clear_tags(self):
+ """
+ Removes all tags from the Logical Volume.
+ """
+ for k, v in self.tags.items():
+ tag = "%s=%s" % (k, v)
+ process.run(['lvchange', '--deltag', tag, self.lv_path])
+
+ def set_tags(self, tags):
+ """
+ :param tags: A dictionary of tag names and values, like::
+
+ {
+ "ceph.osd_fsid": "aaa-fff-bbbb",
+ "ceph.osd_id": "0"
+ }
+
+ At the end of all modifications, the tags are refreshed to reflect
+ LVM's most current view.
+ """
+ for k, v in tags.items():
+ self.set_tag(k, v)
+ # after setting all the tags, refresh them for the current object, use the
+ # lv_* identifiers to filter because those shouldn't change
+ lv_object = get_lv(lv_name=self.lv_name, lv_path=self.lv_path)
+ self.tags = lv_object.tags
+
+ def set_tag(self, key, value):
+ """
+ Set the key/value pair as an LVM tag. Does not "refresh" the values of
+ the current object for its tags. Meant to be a "fire and forget" type
+ of modification.
+ """
+ # remove it first if it exists
+ if self.tags.get(key):
+ current_value = self.tags[key]
+ tag = "%s=%s" % (key, current_value)
+ process.call(['lvchange', '--deltag', tag, self.lv_api['lv_path']])
+
+ process.call(
+ [
+ 'lvchange',
+ '--addtag', '%s=%s' % (key, value), self.lv_path
+ ]
+ )
+
+
+class PVolume(object):
+ """
+ Represents a Physical Volume from LVM, with some top-level attributes like
+ ``pv_name`` and parsed tags as a dictionary of key/value pairs.
+ """
+
+ def __init__(self, **kw):
+ for k, v in kw.items():
+ setattr(self, k, v)
+ self.pv_api = kw
+ self.name = kw['pv_name']
+ self.tags = parse_tags(kw['pv_tags'])
+
+ def __str__(self):
+ return '<%s>' % self.pv_api['pv_name']
+
+ def __repr__(self):
+ return self.__str__()
+
+ def set_tags(self, tags):
+ """
+ :param tags: A dictionary of tag names and values, like::
+
+ {
+ "ceph.osd_fsid": "aaa-fff-bbbb",
+ "ceph.osd_id": "0"
+ }
+
+ At the end of all modifications, the tags are refreshed to reflect
+ LVM's most current view.
+ """
+ for k, v in tags.items():
+ self.set_tag(k, v)
+ # after setting all the tags, refresh them for the current object, use the
+ # pv_* identifiers to filter because those shouldn't change
+ pv_object = get_pv(pv_name=self.pv_name, pv_uuid=self.pv_uuid)
+ self.tags = pv_object.tags
+
+ def set_tag(self, key, value):
+ """
+ Set the key/value pair as an LVM tag. Does not "refresh" the values of
+ the current object for its tags. Meant to be a "fire and forget" type
+ of modification.
+
+ **warning**: Altering tags on a PV has to be done ensuring that the
+ device is actually the one intended. ``pv_name`` is *not* a persistent
+ value, only ``pv_uuid`` is. Using ``pv_uuid`` is the best way to make
+ sure the device getting changed is the one needed.
+ """
+ # remove it first if it exists
+ if self.tags.get(key):
+ current_value = self.tags[key]
+ tag = "%s=%s" % (key, current_value)
+ process.call(['pvchange', '--deltag', tag, self.pv_name])
+
+ process.call(
+ [
+ 'pvchange',
+ '--addtag', '%s=%s' % (key, value), self.pv_name
+ ]
+ )