Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / ceph-volume / ceph_volume / tests / api / test_lvm.py
1 import pytest
2 from ceph_volume import process, exceptions
3 from ceph_volume.api import lvm as api
4
5
6 class TestParseTags(object):
7
8     def test_no_tags_means_empty_dict(self):
9         result = api.parse_tags('')
10         assert result == {}
11
12     def test_single_tag_gets_parsed(self):
13         result = api.parse_tags('ceph.osd_something=1')
14         assert result == {'ceph.osd_something': '1'}
15
16     def test_non_ceph_tags_are_skipped(self):
17         result = api.parse_tags('foo')
18         assert result == {}
19
20     def test_mixed_non_ceph_tags(self):
21         result = api.parse_tags('foo,ceph.bar=1')
22         assert result == {'ceph.bar': '1'}
23
24     def test_multiple_csv_expands_in_dict(self):
25         result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
26         # assert them piecemeal to avoid the un-ordered dict nature
27         assert result['ceph.osd_something'] == '1'
28         assert result['ceph.foo'] == '2'
29         assert result['ceph.fsid'] == '0000'
30
31
32 class TestGetAPIVgs(object):
33
34     def test_report_is_emtpy(self, monkeypatch):
35         monkeypatch.setattr(api.process, 'call', lambda x: ('\n\n', '', 0))
36         assert api.get_api_vgs() == []
37
38     def test_report_has_stuff(self, monkeypatch):
39         report = ['  VolGroup00']
40         monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
41         assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}]
42
43     def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
44         report = ['  VolGroup00 ;;;;;;9g']
45         monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
46         result = api.get_api_vgs()[0]
47         assert len(result.keys()) == 7
48         assert result['vg_name'] == 'VolGroup00'
49         assert result['vg_free'] == '9g'
50
51     def test_report_has_multiple_items(self, monkeypatch):
52         report = ['   VolGroup00;;;;;;;', '    ceph_vg;;;;;;;']
53         monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
54         result = api.get_api_vgs()
55         assert result[0]['vg_name'] == 'VolGroup00'
56         assert result[1]['vg_name'] == 'ceph_vg'
57
58
59 class TestGetAPILvs(object):
60
61     def test_report_is_emtpy(self, monkeypatch):
62         monkeypatch.setattr(api.process, 'call', lambda x: ('', '', 0))
63         assert api.get_api_lvs() == []
64
65     def test_report_has_stuff(self, monkeypatch):
66         report = ['  ;/path;VolGroup00;root']
67         monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
68         result = api.get_api_lvs()
69         assert result[0]['lv_name'] == 'VolGroup00'
70
71     def test_report_has_multiple_items(self, monkeypatch):
72         report = ['  ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
73         monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
74         result = api.get_api_lvs()
75         assert result[0]['lv_name'] == 'VolName'
76         assert result[1]['lv_name'] == 'ceph_lv'
77
78
79 @pytest.fixture
80 def volumes(monkeypatch):
81     monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
82     volumes = api.Volumes()
83     volumes._purge()
84     # also patch api.Volumes so that when it is called, it will use the newly
85     # created fixture, with whatever the test method wants to append to it
86     monkeypatch.setattr(api, 'Volumes', lambda: volumes)
87     return volumes
88
89
90 @pytest.fixture
91 def pvolumes(monkeypatch):
92     monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
93     pvolumes = api.PVolumes()
94     pvolumes._purge()
95     return pvolumes
96
97
98 @pytest.fixture
99 def volume_groups(monkeypatch):
100     monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
101     vgs = api.VolumeGroups()
102     vgs._purge()
103     return vgs
104
105
106 class TestGetLV(object):
107
108     def test_nothing_is_passed_in(self):
109         # so we return a None
110         assert api.get_lv() is None
111
112     def test_single_lv_is_matched(self, volumes, monkeypatch):
113         FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
114         volumes.append(FooVolume)
115         monkeypatch.setattr(api, 'Volumes', lambda: volumes)
116         assert api.get_lv(lv_name='foo') == FooVolume
117
118     def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
119         FooVolume = api.Volume(
120             lv_name='foo', lv_path='/dev/vg/foo',
121             lv_uuid='1111', lv_tags="ceph.type=data")
122         volumes.append(FooVolume)
123         monkeypatch.setattr(api, 'Volumes', lambda: volumes)
124         assert api.get_lv(lv_uuid='1111') == FooVolume
125
126
127 class TestGetPV(object):
128
129     def test_nothing_is_passed_in(self):
130         # so we return a None
131         assert api.get_pv() is None
132
133     def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
134         FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
135         pvolumes.append(FooPVolume)
136         monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
137         assert api.get_pv(pv_uuid='foo') is None
138
139     def test_single_pv_is_matched(self, pvolumes, monkeypatch):
140         FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
141         pvolumes.append(FooPVolume)
142         monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
143         assert api.get_pv(pv_uuid='0000') == FooPVolume
144
145     def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
146         FooPVolume = api.PVolume(
147             pv_name='/dev/vg/foo',
148             pv_uuid='1111', pv_tags="ceph.type=data")
149         pvolumes.append(FooPVolume)
150         monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
151         assert api.get_pv(pv_uuid='1111') == FooPVolume
152
153
154 class TestPVolumes(object):
155
156     def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
157         pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
158         FooPVolume = api.PVolume(
159             pv_name='/dev/vg/foo',
160             pv_uuid='1111', pv_tags=pv_tags)
161         pvolumes.append(FooPVolume)
162         pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'})
163         assert pvolumes == []
164
165     def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
166         pv_tags = "ceph.type=journal,ceph.osd_id=1"
167         FooPVolume = api.PVolume(
168             pv_name='/dev/vg/foo',
169             pv_uuid='1111', pv_tags=pv_tags)
170         pvolumes.append(FooPVolume)
171         pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'})
172         assert pvolumes == [FooPVolume]
173
174
175 class TestGetVG(object):
176
177     def test_nothing_is_passed_in(self):
178         # so we return a None
179         assert api.get_vg() is None
180
181     def test_single_vg_is_matched(self, volume_groups, monkeypatch):
182         FooVG = api.VolumeGroup(vg_name='foo')
183         volume_groups.append(FooVG)
184         monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
185         assert api.get_vg(vg_name='foo') == FooVG
186
187
188 class TestVolumes(object):
189
190     def test_volume_get_has_no_volumes(self, volumes):
191         assert volumes.get() is None
192
193     def test_volume_get_filtered_has_no_volumes(self, volumes):
194         assert volumes.get(lv_name='ceph') is None
195
196     def test_volume_has_multiple_matches(self, volumes):
197         volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
198         volumes.append(volume1)
199         volumes.append(volume2)
200         with pytest.raises(exceptions.MultipleLVsError):
201             volumes.get(lv_name='foo')
202
203     def test_as_dict_infers_type_from_tags(self, volumes):
204         lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
205         osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
206         volumes.append(osd)
207         result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
208         assert result['type'] == 'data'
209
210     def test_as_dict_populates_path_from_lv_api(self, volumes):
211         lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
212         osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
213         volumes.append(osd)
214         result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
215         assert result['path'] == '/dev/vg/lv'
216
217     def test_find_the_correct_one(self, volumes):
218         volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
219         volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
220         volumes.append(volume1)
221         volumes.append(volume2)
222         assert volumes.get(lv_name='volume1') == volume1
223
224     def test_filter_by_tag(self, volumes):
225         lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
226         osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
227         journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
228         volumes.append(osd)
229         volumes.append(journal)
230         volumes.filter(lv_tags={'ceph.type': 'data'})
231         assert len(volumes) == 1
232         assert volumes[0].lv_name == 'volume1'
233
234     def test_filter_by_tag_does_not_match_one(self, volumes):
235         lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
236         osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
237         journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal')
238         volumes.append(osd)
239         volumes.append(journal)
240         # note the different osd_id!
241         volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'})
242         assert volumes == []
243
244     def test_filter_by_vg_name(self, volumes):
245         lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
246         osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags)
247         journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
248         volumes.append(osd)
249         volumes.append(journal)
250         volumes.filter(vg_name='ceph_vg')
251         assert len(volumes) == 1
252         assert volumes[0].lv_name == 'volume1'
253
254     def test_filter_by_lv_path(self, volumes):
255         osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
256         journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
257         volumes.append(osd)
258         volumes.append(journal)
259         volumes.filter(lv_path='/dev/volume1')
260         assert len(volumes) == 1
261         assert volumes[0].lv_name == 'volume1'
262
263     def test_filter_by_lv_uuid(self, volumes):
264         osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
265         journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
266         volumes.append(osd)
267         volumes.append(journal)
268         volumes.filter(lv_uuid='1111')
269         assert len(volumes) == 1
270         assert volumes[0].lv_name == 'volume1'
271
272     def test_filter_by_lv_uuid_nothing_found(self, volumes):
273         osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
274         journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
275         volumes.append(osd)
276         volumes.append(journal)
277         volumes.filter(lv_uuid='22222')
278         assert volumes == []
279
280     def test_filter_requires_params(self, volumes):
281         with pytest.raises(TypeError):
282             volumes.filter()
283
284
285 class TestVolumeGroups(object):
286
287     def test_volume_get_has_no_volume_groups(self, volume_groups):
288         assert volume_groups.get() is None
289
290     def test_volume_get_filtered_has_no_volumes(self, volume_groups):
291         assert volume_groups.get(vg_name='ceph') is None
292
293     def test_volume_has_multiple_matches(self, volume_groups):
294         volume1 = volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
295         volume_groups.append(volume1)
296         volume_groups.append(volume2)
297         with pytest.raises(exceptions.MultipleVGsError):
298             volume_groups.get(vg_name='foo')
299
300     def test_find_the_correct_one(self, volume_groups):
301         volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='')
302         volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='')
303         volume_groups.append(volume1)
304         volume_groups.append(volume2)
305         assert volume_groups.get(vg_name='volume1') == volume1
306
307     def test_filter_by_tag(self, volume_groups):
308         vg_tags = "ceph.group=dmcache"
309         osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
310         journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
311         volume_groups.append(osd)
312         volume_groups.append(journal)
313         volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
314         assert len(volume_groups) == 1
315         assert volume_groups[0].vg_name == 'volume1'
316
317     def test_filter_by_tag_does_not_match_one(self, volume_groups):
318         vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
319         osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
320         volume_groups.append(osd)
321         volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
322         assert volume_groups == []
323
324     def test_filter_by_vg_name(self, volume_groups):
325         vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
326         osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
327         journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
328         volume_groups.append(osd)
329         volume_groups.append(journal)
330         volume_groups.filter(vg_name='ceph_vg')
331         assert len(volume_groups) == 1
332         assert volume_groups[0].vg_name == 'ceph_vg'
333
334     def test_filter_requires_params(self, volume_groups):
335         with pytest.raises(TypeError):
336             volume_groups.filter()
337
338
339 class TestGetLVFromArgument(object):
340
341     def setup(self):
342         self.foo_volume = api.Volume(
343             lv_name='foo', lv_path='/path/to/lv',
344             vg_name='foo_group', lv_tags=''
345         )
346
347     def test_non_absolute_path_is_not_valid(self, volumes):
348         volumes.append(self.foo_volume)
349         assert api.get_lv_from_argument('foo') is None
350
351     def test_too_many_slashes_is_invalid(self, volumes):
352         volumes.append(self.foo_volume)
353         assert api.get_lv_from_argument('path/to/lv') is None
354
355     def test_absolute_path_is_not_lv(self, volumes):
356         volumes.append(self.foo_volume)
357         assert api.get_lv_from_argument('/path') is None
358
359     def test_absolute_path_is_lv(self, volumes):
360         volumes.append(self.foo_volume)
361         assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume
362
363
364 class TestRemoveLV(object):
365
366     def test_removes_lv(self, monkeypatch):
367         def mock_call(cmd, **kw):
368             return ('', '', 0)
369         monkeypatch.setattr(process, 'call', mock_call)
370         assert api.remove_lv("vg/lv")
371
372     def test_fails_to_remove_lv(self, monkeypatch):
373         def mock_call(cmd, **kw):
374             return ('', '', 1)
375         monkeypatch.setattr(process, 'call', mock_call)
376         with pytest.raises(RuntimeError):
377             api.remove_lv("vg/lv")
378
379
380 class TestCreateLV(object):
381
382     def setup(self):
383         self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')
384
385     def test_uses_size(self, monkeypatch, capture):
386         monkeypatch.setattr(process, 'run', capture)
387         monkeypatch.setattr(process, 'call', capture)
388         monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
389         api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
390         expected = ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
391         assert capture.calls[0]['args'][0] == expected
392
393     def test_calls_to_set_type_tag(self, monkeypatch, capture):
394         monkeypatch.setattr(process, 'run', capture)
395         monkeypatch.setattr(process, 'call', capture)
396         monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
397         api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
398         ceph_tag = ['lvchange', '--addtag', 'ceph.type=data', '/path']
399         assert capture.calls[1]['args'][0] == ceph_tag
400
401     def test_calls_to_set_data_tag(self, monkeypatch, capture):
402         monkeypatch.setattr(process, 'run', capture)
403         monkeypatch.setattr(process, 'call', capture)
404         monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
405         api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
406         data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path']
407         assert capture.calls[2]['args'][0] == data_tag