3 from cStringIO import StringIO
4 except ImportError: # pragma: no cover
5 from io import StringIO # pragma: no cover
6 from textwrap import dedent
8 from ceph_volume import configuration, exceptions
20 class TestConf(object):
23 self.conf_file = StringIO(dedent("""
28 def test_get_non_existing_list(self):
29 cfg = configuration.Conf()
30 cfg.is_valid = lambda: True
31 cfg.readfp(self.conf_file)
32 assert cfg.get_list('global', 'key') == []
34 def test_get_non_existing_list_get_default(self):
35 cfg = configuration.Conf()
36 cfg.is_valid = lambda: True
37 cfg.readfp(self.conf_file)
38 assert cfg.get_list('global', 'key', ['a']) == ['a']
40 def test_get_rid_of_comments(self):
41 cfg = configuration.Conf()
42 cfg.is_valid = lambda: True
43 conf_file = StringIO(dedent("""
45 default = 0 # this is a comment
49 assert cfg.get_list('foo', 'default') == ['0']
51 def test_gets_split_on_commas(self):
52 cfg = configuration.Conf()
53 cfg.is_valid = lambda: True
54 conf_file = StringIO(dedent("""
56 default = 0,1,2,3 # this is a comment
60 assert cfg.get_list('foo', 'default') == ['0', '1', '2', '3']
62 def test_spaces_and_tabs_are_ignored(self):
63 cfg = configuration.Conf()
64 cfg.is_valid = lambda: True
65 conf_file = StringIO(dedent("""
67 default = 0, 1, 2 ,3 # this is a comment
71 assert cfg.get_list('foo', 'default') == ['0', '1', '2', '3']
74 class TestLoad(object):
76 def test_load_from_path(self, tmpdir):
77 conf_path = os.path.join(str(tmpdir), 'ceph.conf')
78 with open(conf_path, 'w') as conf:
79 conf.write(tabbed_conf)
80 result = configuration.load(conf_path)
81 assert result.get('global', 'default') == '0'
83 def test_load_with_colon_comments(self, tmpdir):
84 conf_path = os.path.join(str(tmpdir), 'ceph.conf')
85 with open(conf_path, 'w') as conf:
86 conf.write(tabbed_conf)
87 result = configuration.load(conf_path)
88 assert result.get('global', 'other_c') == '1'
90 def test_load_with_hash_comments(self, tmpdir):
91 conf_path = os.path.join(str(tmpdir), 'ceph.conf')
92 with open(conf_path, 'w') as conf:
93 conf.write(tabbed_conf)
94 result = configuration.load(conf_path)
95 assert result.get('global', 'other_h') == '1'
97 def test_path_does_not_exist(self):
98 with pytest.raises(exceptions.ConfigurationError):
99 conf = configuration.load('/path/does/not/exist/ceph.con')
102 def test_unable_to_read_configuration(self, tmpdir, capsys):
103 ceph_conf = os.path.join(str(tmpdir), 'ceph.conf')
104 with open(ceph_conf, 'w') as config:
105 config.write(']broken] config\n[[')
106 with pytest.raises(RuntimeError):
107 configuration.load(ceph_conf)
108 stdout, stderr = capsys.readouterr()
109 assert 'File contains no section headers' in stdout
111 @pytest.mark.parametrize('commented', ['colon','hash'])
112 def test_coment_as_a_value(self, tmpdir, commented):
113 conf_path = os.path.join(str(tmpdir), 'ceph.conf')
114 with open(conf_path, 'w') as conf:
115 conf.write(tabbed_conf)
116 result = configuration.load(conf_path)
117 assert result.get('global', commented) == ''