X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fceph-volume%2Fceph_volume%2Ftests%2Fapi%2Ftest_lvm.py;fp=src%2Fceph%2Fsrc%2Fceph-volume%2Fceph_volume%2Ftests%2Fapi%2Ftest_lvm.py;h=1a2dff95989453a29fd104d8832e2d26315a5360;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py b/src/ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py new file mode 100644 index 0000000..1a2dff9 --- /dev/null +++ b/src/ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py @@ -0,0 +1,407 @@ +import pytest +from ceph_volume import process, exceptions +from ceph_volume.api import lvm as api + + +class TestParseTags(object): + + def test_no_tags_means_empty_dict(self): + result = api.parse_tags('') + assert result == {} + + def test_single_tag_gets_parsed(self): + result = api.parse_tags('ceph.osd_something=1') + assert result == {'ceph.osd_something': '1'} + + def test_non_ceph_tags_are_skipped(self): + result = api.parse_tags('foo') + assert result == {} + + def test_mixed_non_ceph_tags(self): + result = api.parse_tags('foo,ceph.bar=1') + assert result == {'ceph.bar': '1'} + + def test_multiple_csv_expands_in_dict(self): + result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000') + # assert them piecemeal to avoid the un-ordered dict nature + assert result['ceph.osd_something'] == '1' + assert result['ceph.foo'] == '2' + assert result['ceph.fsid'] == '0000' + + +class TestGetAPIVgs(object): + + def test_report_is_emtpy(self, monkeypatch): + monkeypatch.setattr(api.process, 'call', lambda x: ('\n\n', '', 0)) + assert api.get_api_vgs() == [] + + def test_report_has_stuff(self, monkeypatch): + report = [' VolGroup00'] + monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) + assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}] + + def test_report_has_stuff_with_empty_attrs(self, monkeypatch): + report = [' VolGroup00 ;;;;;;9g'] + monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) + result = api.get_api_vgs()[0] + assert len(result.keys()) == 7 + assert result['vg_name'] == 'VolGroup00' + assert result['vg_free'] == '9g' + + def test_report_has_multiple_items(self, monkeypatch): + report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;'] + monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) + result = api.get_api_vgs() + assert result[0]['vg_name'] == 'VolGroup00' + assert result[1]['vg_name'] == 'ceph_vg' + + +class TestGetAPILvs(object): + + def test_report_is_emtpy(self, monkeypatch): + monkeypatch.setattr(api.process, 'call', lambda x: ('', '', 0)) + assert api.get_api_lvs() == [] + + def test_report_has_stuff(self, monkeypatch): + report = [' ;/path;VolGroup00;root'] + monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) + result = api.get_api_lvs() + assert result[0]['lv_name'] == 'VolGroup00' + + def test_report_has_multiple_items(self, monkeypatch): + report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg'] + monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) + result = api.get_api_lvs() + assert result[0]['lv_name'] == 'VolName' + assert result[1]['lv_name'] == 'ceph_lv' + + +@pytest.fixture +def volumes(monkeypatch): + monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) + volumes = api.Volumes() + volumes._purge() + # also patch api.Volumes so that when it is called, it will use the newly + # created fixture, with whatever the test method wants to append to it + monkeypatch.setattr(api, 'Volumes', lambda: volumes) + return volumes + + +@pytest.fixture +def pvolumes(monkeypatch): + monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) + pvolumes = api.PVolumes() + pvolumes._purge() + return pvolumes + + +@pytest.fixture +def volume_groups(monkeypatch): + monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) + vgs = api.VolumeGroups() + vgs._purge() + return vgs + + +class TestGetLV(object): + + def test_nothing_is_passed_in(self): + # so we return a None + assert api.get_lv() is None + + def test_single_lv_is_matched(self, volumes, monkeypatch): + FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data") + volumes.append(FooVolume) + monkeypatch.setattr(api, 'Volumes', lambda: volumes) + assert api.get_lv(lv_name='foo') == FooVolume + + def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch): + FooVolume = api.Volume( + lv_name='foo', lv_path='/dev/vg/foo', + lv_uuid='1111', lv_tags="ceph.type=data") + volumes.append(FooVolume) + monkeypatch.setattr(api, 'Volumes', lambda: volumes) + assert api.get_lv(lv_uuid='1111') == FooVolume + + +class TestGetPV(object): + + def test_nothing_is_passed_in(self): + # so we return a None + assert api.get_pv() is None + + def test_single_pv_is_not_matched(self, pvolumes, monkeypatch): + FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}) + pvolumes.append(FooPVolume) + monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) + assert api.get_pv(pv_uuid='foo') is None + + def test_single_pv_is_matched(self, pvolumes, monkeypatch): + FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}) + pvolumes.append(FooPVolume) + monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) + assert api.get_pv(pv_uuid='0000') == FooPVolume + + def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch): + FooPVolume = api.PVolume( + pv_name='/dev/vg/foo', + pv_uuid='1111', pv_tags="ceph.type=data") + pvolumes.append(FooPVolume) + monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) + assert api.get_pv(pv_uuid='1111') == FooPVolume + + +class TestPVolumes(object): + + def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch): + pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa" + FooPVolume = api.PVolume( + pv_name='/dev/vg/foo', + pv_uuid='1111', pv_tags=pv_tags) + pvolumes.append(FooPVolume) + pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'}) + assert pvolumes == [] + + def test_filter_by_tags_matches(self, pvolumes, monkeypatch): + pv_tags = "ceph.type=journal,ceph.osd_id=1" + FooPVolume = api.PVolume( + pv_name='/dev/vg/foo', + pv_uuid='1111', pv_tags=pv_tags) + pvolumes.append(FooPVolume) + pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'}) + assert pvolumes == [FooPVolume] + + +class TestGetVG(object): + + def test_nothing_is_passed_in(self): + # so we return a None + assert api.get_vg() is None + + def test_single_vg_is_matched(self, volume_groups, monkeypatch): + FooVG = api.VolumeGroup(vg_name='foo') + volume_groups.append(FooVG) + monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups) + assert api.get_vg(vg_name='foo') == FooVG + + +class TestVolumes(object): + + def test_volume_get_has_no_volumes(self, volumes): + assert volumes.get() is None + + def test_volume_get_filtered_has_no_volumes(self, volumes): + assert volumes.get(lv_name='ceph') is None + + def test_volume_has_multiple_matches(self, volumes): + volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='') + volumes.append(volume1) + volumes.append(volume2) + with pytest.raises(exceptions.MultipleLVsError): + volumes.get(lv_name='foo') + + def test_as_dict_infers_type_from_tags(self, volumes): + lv_tags = "ceph.type=data,ceph.fsid=000-aaa" + osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) + volumes.append(osd) + result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict() + assert result['type'] == 'data' + + def test_as_dict_populates_path_from_lv_api(self, volumes): + lv_tags = "ceph.type=data,ceph.fsid=000-aaa" + osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) + volumes.append(osd) + result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict() + assert result['path'] == '/dev/vg/lv' + + def test_find_the_correct_one(self, volumes): + volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='') + volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='') + volumes.append(volume1) + volumes.append(volume2) + assert volumes.get(lv_name='volume1') == volume1 + + def test_filter_by_tag(self, volumes): + lv_tags = "ceph.type=data,ceph.fsid=000-aaa" + osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) + journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal') + volumes.append(osd) + volumes.append(journal) + volumes.filter(lv_tags={'ceph.type': 'data'}) + assert len(volumes) == 1 + assert volumes[0].lv_name == 'volume1' + + def test_filter_by_tag_does_not_match_one(self, volumes): + lv_tags = "ceph.type=data,ceph.fsid=000-aaa" + osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) + journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal') + volumes.append(osd) + volumes.append(journal) + # note the different osd_id! + volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'}) + assert volumes == [] + + def test_filter_by_vg_name(self, volumes): + lv_tags = "ceph.type=data,ceph.fsid=000-aaa" + osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags) + journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal') + volumes.append(osd) + volumes.append(journal) + volumes.filter(vg_name='ceph_vg') + assert len(volumes) == 1 + assert volumes[0].lv_name == 'volume1' + + def test_filter_by_lv_path(self, volumes): + osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='') + journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='') + volumes.append(osd) + volumes.append(journal) + volumes.filter(lv_path='/dev/volume1') + assert len(volumes) == 1 + assert volumes[0].lv_name == 'volume1' + + def test_filter_by_lv_uuid(self, volumes): + osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='') + journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='') + volumes.append(osd) + volumes.append(journal) + volumes.filter(lv_uuid='1111') + assert len(volumes) == 1 + assert volumes[0].lv_name == 'volume1' + + def test_filter_by_lv_uuid_nothing_found(self, volumes): + osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='') + journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='') + volumes.append(osd) + volumes.append(journal) + volumes.filter(lv_uuid='22222') + assert volumes == [] + + def test_filter_requires_params(self, volumes): + with pytest.raises(TypeError): + volumes.filter() + + +class TestVolumeGroups(object): + + def test_volume_get_has_no_volume_groups(self, volume_groups): + assert volume_groups.get() is None + + def test_volume_get_filtered_has_no_volumes(self, volume_groups): + assert volume_groups.get(vg_name='ceph') is None + + def test_volume_has_multiple_matches(self, volume_groups): + volume1 = volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='') + volume_groups.append(volume1) + volume_groups.append(volume2) + with pytest.raises(exceptions.MultipleVGsError): + volume_groups.get(vg_name='foo') + + def test_find_the_correct_one(self, volume_groups): + volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='') + volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='') + volume_groups.append(volume1) + volume_groups.append(volume2) + assert volume_groups.get(vg_name='volume1') == volume1 + + def test_filter_by_tag(self, volume_groups): + vg_tags = "ceph.group=dmcache" + osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags) + journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain') + volume_groups.append(osd) + volume_groups.append(journal) + volume_groups.filter(vg_tags={'ceph.group': 'dmcache'}) + assert len(volume_groups) == 1 + assert volume_groups[0].vg_name == 'volume1' + + def test_filter_by_tag_does_not_match_one(self, volume_groups): + vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd" + osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags) + volume_groups.append(osd) + volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'}) + assert volume_groups == [] + + def test_filter_by_vg_name(self, volume_groups): + vg_tags = "ceph.type=data,ceph.fsid=000-aaa" + osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags) + journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal') + volume_groups.append(osd) + volume_groups.append(journal) + volume_groups.filter(vg_name='ceph_vg') + assert len(volume_groups) == 1 + assert volume_groups[0].vg_name == 'ceph_vg' + + def test_filter_requires_params(self, volume_groups): + with pytest.raises(TypeError): + volume_groups.filter() + + +class TestGetLVFromArgument(object): + + def setup(self): + self.foo_volume = api.Volume( + lv_name='foo', lv_path='/path/to/lv', + vg_name='foo_group', lv_tags='' + ) + + def test_non_absolute_path_is_not_valid(self, volumes): + volumes.append(self.foo_volume) + assert api.get_lv_from_argument('foo') is None + + def test_too_many_slashes_is_invalid(self, volumes): + volumes.append(self.foo_volume) + assert api.get_lv_from_argument('path/to/lv') is None + + def test_absolute_path_is_not_lv(self, volumes): + volumes.append(self.foo_volume) + assert api.get_lv_from_argument('/path') is None + + def test_absolute_path_is_lv(self, volumes): + volumes.append(self.foo_volume) + assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume + + +class TestRemoveLV(object): + + def test_removes_lv(self, monkeypatch): + def mock_call(cmd, **kw): + return ('', '', 0) + monkeypatch.setattr(process, 'call', mock_call) + assert api.remove_lv("vg/lv") + + def test_fails_to_remove_lv(self, monkeypatch): + def mock_call(cmd, **kw): + return ('', '', 1) + monkeypatch.setattr(process, 'call', mock_call) + with pytest.raises(RuntimeError): + api.remove_lv("vg/lv") + + +class TestCreateLV(object): + + def setup(self): + self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='') + + def test_uses_size(self, monkeypatch, capture): + monkeypatch.setattr(process, 'run', capture) + monkeypatch.setattr(process, 'call', capture) + monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) + api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'}) + expected = ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group'] + assert capture.calls[0]['args'][0] == expected + + def test_calls_to_set_type_tag(self, monkeypatch, capture): + monkeypatch.setattr(process, 'run', capture) + monkeypatch.setattr(process, 'call', capture) + monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) + api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'}) + ceph_tag = ['lvchange', '--addtag', 'ceph.type=data', '/path'] + assert capture.calls[1]['args'][0] == ceph_tag + + def test_calls_to_set_data_tag(self, monkeypatch, capture): + monkeypatch.setattr(process, 'run', capture) + monkeypatch.setattr(process, 'call', capture) + monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) + api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'}) + data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path'] + assert capture.calls[2]['args'][0] == data_tag