Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / cephfs / test_fragment.py
1
2
3 from tasks.cephfs.cephfs_test_case import CephFSTestCase
4 from teuthology.orchestra import run
5
6 import logging
7 log = logging.getLogger(__name__)
8
9
10 class TestFragmentation(CephFSTestCase):
11     CLIENTS_REQUIRED = 1
12     MDSS_REQUIRED = 1
13
14     def get_splits(self):
15         return self.fs.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_split']
16
17     def get_merges(self):
18         return self.fs.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_merge']
19
20     def get_dir_ino(self, path):
21         dir_cache = self.fs.read_cache(path, 0)
22         dir_ino = None
23         dir_inono = self.mount_a.path_to_ino(path.strip("/"))
24         for ino in dir_cache:
25             if ino['ino'] == dir_inono:
26                 dir_ino = ino
27                 break
28         self.assertIsNotNone(dir_ino)
29         return dir_ino
30
31     def _configure(self, **kwargs):
32         """
33         Apply kwargs as MDS configuration settings, enable dirfrags
34         and restart the MDSs.
35         """
36         kwargs['mds_bal_frag'] = "true"
37
38         for k, v in kwargs.items():
39             self.ceph_cluster.set_ceph_conf("mds", k, v.__str__())
40
41         self.fs.set_allow_dirfrags(True)
42
43         self.mds_cluster.mds_fail_restart()
44         self.fs.wait_for_daemons()
45
46     def test_oversize(self):
47         """
48         That a directory is split when it becomes too large.
49         """
50
51         split_size = 20
52         merge_size = 5
53
54         self._configure(
55             mds_bal_split_size=split_size,
56             mds_bal_merge_size=merge_size,
57             mds_bal_split_bits=1
58         )
59
60         self.assertEqual(self.get_splits(), 0)
61
62         self.mount_a.create_n_files("splitdir/file", split_size + 1)
63
64         self.wait_until_true(
65             lambda: self.get_splits() == 1,
66             timeout=30
67         )
68
69         frags = self.get_dir_ino("/splitdir")['dirfrags']
70         self.assertEqual(len(frags), 2)
71         self.assertEqual(frags[0]['dirfrag'], "0x10000000000.0*")
72         self.assertEqual(frags[1]['dirfrag'], "0x10000000000.1*")
73         self.assertEqual(
74             sum([len(f['dentries']) for f in frags]),
75             split_size + 1
76         )
77
78         self.assertEqual(self.get_merges(), 0)
79
80         self.mount_a.run_shell(["rm", "-f", run.Raw("splitdir/file*")])
81
82         self.wait_until_true(
83             lambda: self.get_merges() == 1,
84             timeout=30
85         )
86
87         self.assertEqual(len(self.get_dir_ino("/splitdir")["dirfrags"]), 1)
88
89     def test_rapid_creation(self):
90         """
91         That the fast-splitting limit of 1.5x normal limit is
92         applied when creating dentries quickly.
93         """
94
95         split_size = 100
96         merge_size = 1
97
98         self._configure(
99             mds_bal_split_size=split_size,
100             mds_bal_merge_size=merge_size,
101             mds_bal_split_bits=3,
102             mds_bal_fragment_size_max=int(split_size * 1.5 + 2)
103         )
104
105         # We test this only at a single split level.  If a client was sending
106         # IO so fast that it hit a second split before the first split
107         # was complete, it could violate mds_bal_fragment_size_max -- there
108         # is a window where the child dirfrags of a split are unfrozen
109         # (so they can grow), but still have STATE_FRAGMENTING (so they
110         # can't be split).
111
112         # By writing 4x the split size when the split bits are set
113         # to 3 (i.e. 4-ways), I am reasonably sure to see precisely
114         # one split.  The test is to check whether that split
115         # happens soon enough that the client doesn't exceed
116         # 2x the split_size (the "immediate" split mode should
117         # kick in at 1.5x the split size).
118
119         self.assertEqual(self.get_splits(), 0)
120         self.mount_a.create_n_files("splitdir/file", split_size * 4)
121         self.wait_until_equal(
122             self.get_splits,
123             1,
124             reject_fn=lambda s: s > 1,
125             timeout=30
126         )
127
128     def test_deep_split(self):
129         """
130         That when the directory grows many times larger than split size,
131         the fragments get split again.
132         """
133
134         split_size = 100
135         merge_size = 1  # i.e. don't merge frag unless its empty
136         split_bits = 1
137
138         branch_factor = 2**split_bits
139
140         # Arbitrary: how many levels shall we try fragmenting before
141         # ending the test?
142         max_depth = 5
143
144         self._configure(
145             mds_bal_split_size=split_size,
146             mds_bal_merge_size=merge_size,
147             mds_bal_split_bits=split_bits
148         )
149
150         # Each iteration we will create another level of fragments.  The
151         # placement of dentries into fragments is by hashes (i.e. pseudo
152         # random), so we rely on statistics to get the behaviour that
153         # by writing about 1.5x as many dentries as the split_size times
154         # the number of frags, we will get them all to exceed their
155         # split size and trigger a split.
156         depth = 0
157         files_written = 0
158         splits_expected = 0
159         while depth < max_depth:
160             log.info("Writing files for depth {0}".format(depth))
161             target_files = branch_factor**depth * int(split_size * 1.5)
162             create_files = target_files - files_written
163
164             self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
165                 "{0} Writing {1} files (depth={2})".format(
166                     self.__class__.__name__, create_files, depth
167                 ))
168             self.mount_a.create_n_files("splitdir/file_{0}".format(depth),
169                                         create_files)
170             self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
171                 "{0} Done".format(self.__class__.__name__))
172
173             files_written += create_files
174             log.info("Now have {0} files".format(files_written))
175
176             splits_expected += branch_factor**depth
177             log.info("Waiting to see {0} splits".format(splits_expected))
178             try:
179                 self.wait_until_equal(
180                     self.get_splits,
181                     splits_expected,
182                     timeout=30,
183                     reject_fn=lambda x: x > splits_expected
184                 )
185
186                 frags = self.get_dir_ino("/splitdir")['dirfrags']
187                 self.assertEqual(len(frags), branch_factor**(depth+1))
188                 self.assertEqual(
189                     sum([len(f['dentries']) for f in frags]),
190                     target_files
191                 )
192             except:
193                 # On failures, log what fragmentation we actually ended
194                 # up with.  This block is just for logging, at the end
195                 # we raise the exception again.
196                 frags = self.get_dir_ino("/splitdir")['dirfrags']
197                 log.info("depth={0} splits_expected={1} files_written={2}".format(
198                     depth, splits_expected, files_written
199                 ))
200                 log.info("Dirfrags:")
201                 for f in frags:
202                     log.info("{0}: {1}".format(
203                         f['dirfrag'], len(f['dentries'])
204                     ))
205                 raise
206
207             depth += 1
208
209         # Remember the inode number because we will be checking for
210         # objects later.
211         dir_inode_no = self.mount_a.path_to_ino("splitdir")
212
213         self.mount_a.run_shell(["rm", "-rf", "splitdir/"])
214         self.mount_a.umount_wait()
215
216         self.fs.mds_asok(['flush', 'journal'])
217
218         # Wait for all strays to purge
219         self.wait_until_equal(
220             lambda: self.fs.mds_asok(['perf', 'dump', 'mds_cache']
221                                      )['mds_cache']['num_strays'],
222             0,
223             timeout=1200
224         )
225         # Check that the metadata pool objects for all the myriad
226         # child fragments are gone
227         metadata_objs = self.fs.rados(["ls"])
228         frag_objs = []
229         for o in metadata_objs:
230             if o.startswith("{0:x}.".format(dir_inode_no)):
231                 frag_objs.append(o)
232         self.assertListEqual(frag_objs, [])