2 from ceph_volume.devices.lvm import activate
3 from ceph_volume.api import lvm as api
class Args(object):
    """Minimal stand-in for the parsed command-line namespace used by the tests.

    Every objectstore flag starts at its "unset" value; any keyword argument
    given to the constructor is stored as an attribute of the same name,
    overriding a default when the names collide.
    """

    def __init__(self, **kw):
        # Default flags, so a test only has to spell out what it cares about.
        self.bluestore = False
        self.filestore = False
        self.auto_detect_objectstore = None
        # Apply caller overrides/extras (e.g. osd_id, osd_fsid, filestore=True).
        for name, value in kw.items():
            setattr(self, name, value)
class TestActivate(object):
    """Exercise ``Activate.activate`` when no OSD id is given, only an fsid."""

    # These tests are deliberately functional and therefore heavily patched:
    # the negative side effects are hard to observe in a real run, so each
    # test builds an ideal scenario and checks which activation helper is
    # reached and with what volumes.

    def test_no_osd_id_matches_fsid(self, is_root, volumes, monkeypatch, capture):
        """A volume tagged with the requested fsid reaches activate_filestore."""
        matching_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.osd_fsid=1234",
        )
        volumes.append(matching_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        monkeypatch.setattr(activate, 'activate_filestore', capture)
        cli_args = Args(osd_id=None, osd_fsid='1234', filestore=True)
        activate.Activate([]).activate(cli_args)
        first_call = capture.calls[0]
        assert first_call['args'][0] == [matching_lv]

    def test_no_osd_id_matches_fsid_bluestore(self, is_root, volumes, monkeypatch, capture):
        """A volume tagged with the requested fsid reaches activate_bluestore."""
        matching_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.osd_fsid=1234",
        )
        volumes.append(matching_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        monkeypatch.setattr(activate, 'activate_bluestore', capture)
        cli_args = Args(osd_id=None, osd_fsid='1234', bluestore=True)
        activate.Activate([]).activate(cli_args)
        first_call = capture.calls[0]
        assert first_call['args'][0] == [matching_lv]

    def test_no_osd_id_no_matching_fsid(self, is_root, volumes, monkeypatch, capture):
        """No volume carries the requested fsid, so activation raises."""
        other_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.osd_fsid=11234",
        )
        volumes.append(other_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        monkeypatch.setattr(activate, 'activate_filestore', capture)
        cli_args = Args(osd_id=None, osd_fsid='1234')
        with pytest.raises(RuntimeError):
            activate.Activate([]).activate(cli_args)
class TestActivateFlags(object):
    """Check how the objectstore command-line flags end up on the parsed args."""

    @staticmethod
    def _parsed_args(argv, capture):
        """Run the activation entry point and return what ``activate`` received.

        ``activate`` is swapped for the ``capture`` fixture so nothing is
        really activated; the first positional argument of the first recorded
        call is returned.
        """
        activation = activate.Activate(argv)
        activation.activate = capture
        # Without running the entry point the patched ``activate`` is never
        # called and ``capture.calls`` stays empty.
        activation.main()
        return capture.calls[0]['args'][0]

    def test_default_objectstore(self, capture):
        """With no objectstore flag given, bluestore is selected."""
        parsed_args = self._parsed_args(['0', 'asdf-ljh-asdf'], capture)
        assert parsed_args.filestore is False
        assert parsed_args.bluestore is True

    def test_uses_filestore(self, capture):
        """``--filestore`` selects filestore and leaves bluestore off."""
        parsed_args = self._parsed_args(['--filestore', '0', 'asdf-ljh-asdf'], capture)
        assert parsed_args.filestore is True
        assert parsed_args.bluestore is False

    def test_uses_bluestore(self, capture):
        """``--bluestore`` selects bluestore and leaves filestore off."""
        parsed_args = self._parsed_args(['--bluestore', '0', 'asdf-ljh-asdf'], capture)
        assert parsed_args.filestore is False
        assert parsed_args.bluestore is True