src/ceph/qa/tasks/cephfs/test_strays.py
1 import json
2 import time
3 import logging
4 from textwrap import dedent
5 import datetime
6 import gevent
8
9 from teuthology.orchestra.run import CommandFailedError, Raw
10 from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
11
12 log = logging.getLogger(__name__)
13
14
15 class TestStrays(CephFSTestCase):
16     MDSS_REQUIRED = 2
17
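    # Enum-like markers selecting which purge throttle _test_throttling exercises
    # (RADOS operation count vs. file count).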
18     OPS_THROTTLE = 1
19     FILES_THROTTLE = 2
20
21     # Range of different file sizes used in throttle test's workload
22     throttle_workload_size_range = 16
23
24     @for_teuthology
25     def test_ops_throttle(self):
26         self._test_throttling(self.OPS_THROTTLE)
27
28     @for_teuthology
29     def test_files_throttle(self):
30         self._test_throttling(self.FILES_THROTTLE)
31
32     def test_dir_deletion(self):
33         """
34         That when deleting a bunch of dentries and the containing
35         directory, everything gets purged.
36         Catches cases where the client might e.g. fail to trim
37         the unlinked dir from its cache.
38         """
39         file_count = 1000
40         create_script = dedent("""
41             import os
42
43             mount_path = "{mount_path}"
44             subdir = "delete_me"
45             size = {size}
46             file_count = {file_count}
47             os.mkdir(os.path.join(mount_path, subdir))
48             for i in range(0, file_count):
49                 filename = "{{0}}_{{1}}.bin".format(i, size)
50                 f = open(os.path.join(mount_path, subdir, filename), 'w')
51                 f.write(size * 'x')
52                 f.close()
53         """.format(
54             mount_path=self.mount_a.mountpoint,
55             size=1024,
56             file_count=file_count
57         ))
58
59         self.mount_a.run_python(create_script)
60
61         # That the dirfrag object is created
62         self.fs.mds_asok(["flush", "journal"])
63         dir_ino = self.mount_a.path_to_ino("delete_me")
64         self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))
65
66         # Remove everything
67         self.mount_a.run_shell(["rm", "-rf", "delete_me"])
68         self.fs.mds_asok(["flush", "journal"])
69
70         # That all the removed files, plus the directory itself, get created as strays
71         strays = self.get_mdc_stat("strays_created")
72         self.assertEqual(strays, file_count + 1)
73
74         # That the strays all get enqueued for purge
75         self.wait_until_equal(
76             lambda: self.get_mdc_stat("strays_enqueued"),
77             strays,
78             timeout=600
80         )
81
82         # That all the purge operations execute
83         self.wait_until_equal(
84             lambda: self.get_stat("purge_queue", "pq_executed"),
85             strays,
86             timeout=600
87         )
88
89         # That finally, the directory metadata object is gone
90         self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))
91
92         # That finally, the data objects are all gone
93         self.await_data_pool_empty()
94
95     def _test_throttling(self, throttle_type):
96         self.data_log = []
97         try:
98             return self._do_test_throttling(throttle_type)
99         except Exception:
100             for l in self.data_log:
101                 log.info(",".join(str(l_) for l_ in l))
102             raise
103
104     def _do_test_throttling(self, throttle_type):
105         """
106         That the mds_max_purge_ops setting is respected
107         """
108
109         def set_throttles(files, ops):
110             """
111             Helper for updating ops/files limits, and calculating effective
112             ops_per_pg setting to give the same ops limit.
113             """
114             self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
115             self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)
116
117             pgs = self.fs.mon_manager.get_pool_property(
118                 self.fs.get_data_pool_name(),
119                 "pg_num"
120             )
121             ops_per_pg = float(ops) / pgs
122             self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
123
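        # Illustrative arithmetic for the helper above (hypothetical numbers, not
        # taken from this test): with ops=16 and a data pool of pg_num=8, the
        # helper sets mds_max_purge_ops_per_pg to 16 / 8 = 2.0, so the per-PG
        # throttle yields the same aggregate limit of 16 purge ops.
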
124         # Test conditions depend on what we're going to be exercising.
125         # * Lift the threshold on whatever throttle we are *not* testing, so
126         #   that the throttle of interest is the one that will be the bottleneck
127         # * Create either many small files (test file count throttling) or fewer
128         #   large files (test op throttling)
129         if throttle_type == self.OPS_THROTTLE:
130             set_throttles(files=100000000, ops=16)
131             size_unit = 1024 * 1024  # big files, generate lots of ops
132             file_multiplier = 100
133         elif throttle_type == self.FILES_THROTTLE:
134             # The default value of file limit is pretty permissive, so to avoid
135             # the test running too fast, create lots of files and set the limit
136             # pretty low.
137             set_throttles(ops=100000000, files=6)
138             size_unit = 1024  # small, numerous files
139             file_multiplier = 200
140         else:
141             raise NotImplementedError(throttle_type)
142
143         # Pick up config changes
144         self.fs.mds_fail_restart()
145         self.fs.wait_for_daemons()
146
147         create_script = dedent("""
148             import os
149
150             mount_path = "{mount_path}"
151             subdir = "delete_me"
152             size_unit = {size_unit}
153             file_multiplier = {file_multiplier}
154             os.mkdir(os.path.join(mount_path, subdir))
155             for i in range(0, file_multiplier):
156                 for size in range(0, {size_range}*size_unit, size_unit):
157                     filename = "{{0}}_{{1}}.bin".format(i, size // size_unit)
158                     f = open(os.path.join(mount_path, subdir, filename), 'w')
159                     f.write(size * 'x')
160                     f.close()
161         """.format(
162             mount_path=self.mount_a.mountpoint,
163             size_unit=size_unit,
164             file_multiplier=file_multiplier,
165             size_range=self.throttle_workload_size_range
166         ))
167
168         self.mount_a.run_python(create_script)
169
170         # We will run the deletion in the background, to reduce the risk of it completing before
171         # we have started monitoring the stray statistics.
172         def background():
173             self.mount_a.run_shell(["rm", "-rf", "delete_me"])
174             self.fs.mds_asok(["flush", "journal"])
175
176         background_thread = gevent.spawn(background)
177
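        # The create script writes throttle_workload_size_range files per outer
        # iteration, so file_multiplier * size_range files in total; the +1 below
        # is the "delete_me" directory itself, which also becomes a stray.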
178         total_inodes = file_multiplier * self.throttle_workload_size_range + 1
179         mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
180         mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))
181
182         # During this phase we look for the concurrent ops to exceed half
183         # the limit (a heuristic) and not exceed the limit (a correctness
184         # condition).
185         purge_timeout = 600
186         elapsed = 0
187         files_high_water = 0
188         ops_high_water = 0
189
190         while True:
191             stats = self.fs.mds_asok(['perf', 'dump'])
192             mdc_stats = stats['mds_cache']
193             pq_stats = stats['purge_queue']
194             if elapsed >= purge_timeout:
195                 raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))
196
197             num_strays = mdc_stats['num_strays']
198             num_strays_purging = pq_stats['pq_executing']
199             num_purge_ops = pq_stats['pq_executing_ops']
200
201             self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])
202
203             files_high_water = max(files_high_water, num_strays_purging)
204             ops_high_water = max(ops_high_water, num_purge_ops)
205
206             total_strays_created = mdc_stats['strays_created']
207             total_strays_purged = pq_stats['pq_executed']
208
209             if total_strays_purged == total_inodes:
210                 log.info("Complete purge in {0} seconds".format(elapsed))
211                 break
212             elif total_strays_purged > total_inodes:
213                 raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
214             else:
215                 if throttle_type == self.OPS_THROTTLE:
216                     # 11 is filer_max_purge_ops plus one for the backtrace:
217                     # limit is allowed to be overshot by this much.
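                    # (Our reading, stated as an assumption: a sample may catch the
                    # queue just after it dispatched one extra file's worth of ops.)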
218                     if num_purge_ops > mds_max_purge_ops + 11:
219                         raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
220                             num_purge_ops, mds_max_purge_ops
221                         ))
222                 elif throttle_type == self.FILES_THROTTLE:
223                     if num_strays_purging > mds_max_purge_files:
224                         raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
225                             num_strays_purging, mds_max_purge_files
226                         ))
227                 else:
228                     raise NotImplementedError(throttle_type)
229
230                 log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
231                     num_strays_purging, num_strays,
232                     total_strays_purged, total_strays_created
233                 ))
234                 time.sleep(1)
235                 elapsed += 1
236
237         background_thread.join()
238
239         # Check that we got up to a respectable rate during the purge.  This is
240         # inherently racy, but should be safe enough unless the cluster is
241         # pathologically slow, or so fast that the deletions all complete before
242         # we have polled the statistics.
243         if throttle_type == self.OPS_THROTTLE:
244             if ops_high_water < mds_max_purge_ops / 2:
245                 raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
246                     ops_high_water, mds_max_purge_ops
247                 ))
248         elif throttle_type == self.FILES_THROTTLE:
249             if files_high_water < mds_max_purge_files / 2:
250                 raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
251                     files_high_water, mds_max_purge_files
252                 ))
253
254         # Sanity check all MDC stray stats
255         stats = self.fs.mds_asok(['perf', 'dump'])
256         mdc_stats = stats['mds_cache']
257         pq_stats = stats['purge_queue']
258         self.assertEqual(mdc_stats['num_strays'], 0)
259         self.assertEqual(mdc_stats['num_strays_delayed'], 0)
260         self.assertEqual(pq_stats['pq_executing'], 0)
261         self.assertEqual(pq_stats['pq_executing_ops'], 0)
262         self.assertEqual(mdc_stats['strays_created'], total_inodes)
263         self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
264         self.assertEqual(pq_stats['pq_executed'], total_inodes)
265
266     def get_mdc_stat(self, name, mds_id=None):
267         return self.get_stat("mds_cache", name, mds_id)
268
269     def get_stat(self, subsys, name, mds_id=None):
270         return self.fs.mds_asok(['perf', 'dump', subsys, name],
271                                 mds_id=mds_id)[subsys][name]
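    # Example usage (illustrative): self.get_stat("purge_queue", "pq_executed")
    # runs `perf dump purge_queue pq_executed` over the MDS admin socket and pulls
    # that single counter out of the nested {subsys: {name: value}} result.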
272
273     def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
274                           mds_id=None):
275         self.wait_until_equal(
276             lambda: self.get_stat(subsys, counter, mds_id),
277             expect_val=expect_val, timeout=timeout,
278             reject_fn=lambda x: x > expect_val
279         )
280
281     def test_open_inode(self):
282         """
283         That the case of a dentry unlinked while a client holds an
284         inode open is handled correctly.
285
286         The inode should be moved into a stray dentry, while the original
287         dentry and directory should be purged.
288
289         The inode's data should be purged when the client eventually closes
290         it.
291         """
292         mount_a_client_id = self.mount_a.get_global_id()
293
294         # Write some bytes to a file
295         size_mb = 8
296
297         # Hold the file open
298         p = self.mount_a.open_background("open_file")
299         self.mount_a.write_n_mb("open_file", size_mb)
300         open_file_ino = self.mount_a.path_to_ino("open_file")
301
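        # The client should hold two caps at this point -- presumably one for the
        # mount root and one for open_file (an assumption; the test only relies on
        # the count).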
302         self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
303
304         # Unlink the dentry
305         self.mount_a.run_shell(["rm", "-f", "open_file"])
306
307         # Wait to see the stray count increment
308         self.wait_until_equal(
309             lambda: self.get_mdc_stat("num_strays"),
310             expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
311
312         # See that while the stray count has incremented, none have passed
313         # on to the purge queue
314         self.assertEqual(self.get_mdc_stat("strays_created"), 1)
315         self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
316
317         # See that the client still holds 2 caps
318         self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
319
320         # See that the data objects remain in the data pool
321         self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))
322
323         # Now close the file
324         self.mount_a.kill_background(p)
325
326         # Wait to see the client cap count decrement
327         self.wait_until_equal(
328             lambda: self.get_session(mount_a_client_id)['num_caps'],
329             expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
330         )
331         # Wait to see the purge counter increment, stray count go to zero
332         self._wait_for_counter("mds_cache", "strays_enqueued", 1)
333         self.wait_until_equal(
334             lambda: self.get_mdc_stat("num_strays"),
335             expect_val=0, timeout=6, reject_fn=lambda x: x > 1
336         )
337         self._wait_for_counter("purge_queue", "pq_executed", 1)
338
339         # See that the data objects no longer exist
340         self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))
341
342         self.await_data_pool_empty()
343
344     def test_hardlink_reintegration(self):
345         """
346         That removal of primary dentry of hardlinked inode results
347         in reintegration of inode into the previously-remote dentry,
348         rather than lingering as a stray indefinitely.
349         """
350         # Write some bytes to file_a
351         size_mb = 8
352         self.mount_a.run_shell(["mkdir", "dir_1"])
353         self.mount_a.write_n_mb("dir_1/file_a", size_mb)
354         ino = self.mount_a.path_to_ino("dir_1/file_a")
355
356         # Create a hardlink named file_b
357         self.mount_a.run_shell(["mkdir", "dir_2"])
358         self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
359         self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)
360
361         # Flush journal
362         self.fs.mds_asok(['flush', 'journal'])
363
364         # See that backtrace for the file points to the file_a path
365         pre_unlink_bt = self.fs.read_backtrace(ino)
366         self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")
367
368         # Empty the MDS cache; otherwise the MDS reintegrates the stray as soon as the unlink finishes
369         self.mount_a.umount_wait()
370         self.fs.mds_asok(['flush', 'journal'])
371         self.fs.mds_fail_restart()
372         self.fs.wait_for_daemons()
373         self.mount_a.mount()
374
375         # Unlink file_a
376         self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])
377
378         # See that a stray was created
379         self.assertEqual(self.get_mdc_stat("num_strays"), 1)
380         self.assertEqual(self.get_mdc_stat("strays_created"), 1)
381
382         # Wait, see that data objects are still present (i.e. that the
383         # stray did not advance to purging given time)
384         time.sleep(30)
385         self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
386         self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
387
388         # See that before reintegration, the inode's backtrace points to a stray dir
389         self.fs.mds_asok(['flush', 'journal'])
390         self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))
391
392         last_reintegrated = self.get_mdc_stat("strays_reintegrated")
393
394         # Do a metadata operation on the remaining link (mv is heavy-handed, but
395         # others like touch may be satisfied from caps without poking the MDS)
396         self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])
397
398         # Stray reintegration should happen as a result of the eval_remote call
399         # on responding to a client request.
400         self.wait_until_equal(
401             lambda: self.get_mdc_stat("num_strays"),
402             expect_val=0,
403             timeout=60
404         )
405
406         # See the reintegration counter increment
407         curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
408         self.assertGreater(curr_reintegrated, last_reintegrated)
409         last_reintegrated = curr_reintegrated
410
411         # Flush the journal
412         self.fs.mds_asok(['flush', 'journal'])
413
414         # See that the backtrace for the file points to the remaining link's path
415         post_reint_bt = self.fs.read_backtrace(ino)
416         self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")
417
418         # The MDS should reintegrate the stray when the unlink finishes
419         self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
420         self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])
421
422         # Stray reintegration should happen as a result of the notify_stray call
423         # on completion of unlink
424         self.wait_until_equal(
425             lambda: self.get_mdc_stat("num_strays"),
426             expect_val=0,
427             timeout=60
428         )
429
430         # See the reintegration counter increment
431         curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
432         self.assertGreater(curr_reintegrated, last_reintegrated)
433         last_reintegrated = curr_reintegrated
434
435         # Flush the journal
436         self.fs.mds_asok(['flush', 'journal'])
437
438         # See that the backtrace for the file points to the newest link's path
439         post_reint_bt = self.fs.read_backtrace(ino)
440         self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")
441
442         # Now really delete it
443         self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
444         self._wait_for_counter("mds_cache", "strays_enqueued", 1)
445         self._wait_for_counter("purge_queue", "pq_executed", 1)
446
447         self.assert_purge_idle()
448         self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))
449
450         # We caused the inode to go stray 3 times
451         self.assertEqual(self.get_mdc_stat("strays_created"), 3)
452         # We purged it at the last
453         self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
454
455     def test_mv_hardlink_cleanup(self):
456         """
457         That when doing a rename from A to B, and B has hardlinks,
458         then we make a stray for B which is then reintegrated
459         into one of its hardlinks.
460         """
461         # Create file_a, file_b, and a hardlink to file_b
462         size_mb = 8
463         self.mount_a.write_n_mb("file_a", size_mb)
464         file_a_ino = self.mount_a.path_to_ino("file_a")
465
466         self.mount_a.write_n_mb("file_b", size_mb)
467         file_b_ino = self.mount_a.path_to_ino("file_b")
468
469         self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
470         self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)
471
472         # mv file_a file_b
473         self.mount_a.run_shell(["mv", "file_a", "file_b"])
474
475         # Stray reintegration should happen as a result of the notify_stray call on
476         # completion of rename
477         self.wait_until_equal(
478             lambda: self.get_mdc_stat("num_strays"),
479             expect_val=0,
480             timeout=60
481         )
482
483         self.assertEqual(self.get_mdc_stat("strays_created"), 1)
484         self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)
485
486         # No data objects should have been deleted, as both files still have linkage.
487         self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
488         self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))
489
490         self.fs.mds_asok(['flush', 'journal'])
491
492         post_reint_bt = self.fs.read_backtrace(file_b_ino)
493         self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")
494
495     def _setup_two_ranks(self):
496         # Set up two MDSs
497         self.fs.set_max_mds(2)
498
499         # See that we have two active MDSs
500         self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
501                               reject_fn=lambda v: v > 2 or v < 1)
502
503         active_mds_names = self.fs.get_active_names()
504         rank_0_id = active_mds_names[0]
505         rank_1_id = active_mds_names[1]
506         log.info("Ranks 0 and 1 are {0} and {1}".format(
507             rank_0_id, rank_1_id))
508
509         # Get rid of other MDS daemons so that it's easier to know which
510         # daemons to expect in which ranks after restarts
511         for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
512             self.mds_cluster.mds_stop(unneeded_mds)
513             self.mds_cluster.mds_fail(unneeded_mds)
514
515         return rank_0_id, rank_1_id
516
517     def _force_migrate(self, to_id, path, watch_ino):
518         """
519         :param to_id: MDS id to move it to
520         :param path: Filesystem path (string) to move
521         :param watch_ino: Inode number to look for at destination to confirm move
522         :return: None
523         """
524         self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])
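        # Pinning the directory to rank 1 (note the hard-coded "1") makes the MDS
        # export the subtree; we then poll to_id's cache until it is auth for
        # watch_ino.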
525
526         # Poll the MDS cache dump to watch for the export completing
527         migrated = False
528         migrate_timeout = 60
529         migrate_elapsed = 0
530         while not migrated:
531             data = self.fs.mds_asok(["dump", "cache"], to_id)
532             for inode_data in data:
533                 if inode_data['ino'] == watch_ino:
534                     log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
535                     if inode_data['is_auth'] is True:
536                         migrated = True
537                     break
538
539             if not migrated:
540                 if migrate_elapsed > migrate_timeout:
541                     raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
542                 else:
543                     migrate_elapsed += 1
544                     time.sleep(1)
545
546     def _is_stopped(self, rank):
547         mds_map = self.fs.get_mds_map()
548         return rank not in [i['rank'] for i in mds_map['info'].values()]
549
550     def test_purge_on_shutdown(self):
551         """
552         That when an MDS rank is shut down, its purge queue is
553         drained in the process.
554         """
555         rank_0_id, rank_1_id = self._setup_two_ranks()
556
557         self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
558         self.mds_cluster.mds_fail_restart(rank_1_id)
559         self.fs.wait_for_daemons()
560
561         file_count = 5
562
563         self.mount_a.create_n_files("delete_me/file", file_count)
564
565         self._force_migrate(rank_1_id, "delete_me",
566                             self.mount_a.path_to_ino("delete_me/file_0"))
567
568         self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
569         self.mount_a.umount_wait()
570
571         # See all the strays go into purge queue
572         self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
573         self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
574         self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)
575
576         # See nothing get purged from the purge queue (yet)
577         time.sleep(10)
578         self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
579
580         # Shut down rank 1
581         self.fs.set_max_mds(1)
582         self.fs.deactivate(1)
583
584         # It shouldn't proceed past stopping because it's still not allowed
585         # to purge
586         time.sleep(10)
587         self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
588         self.assertFalse(self._is_stopped(1))
589
590         # Permit the daemon to start purging again
591         self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
592                                             'injectargs',
593                                             "--mds_max_purge_files 100")
594
595         # It should now proceed through shutdown
596         self.wait_until_true(
597             lambda: self._is_stopped(1),
598             timeout=60
599         )
600
601         # ...and in the process purge all that data
602         self.await_data_pool_empty()
603
604     def test_migration_on_shutdown(self):
605         """
606         That when an MDS rank is shut down, any non-purgeable strays
607         get migrated to another rank.
608         """
609
610         rank_0_id, rank_1_id = self._setup_two_ranks()
611
612         # Create a non-purgeable stray in a ~mds1 stray directory
613         # by doing a hard link and deleting the original file
614         self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
615         self.mount_a.run_shell(["touch", "dir_1/original"])
616         self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])
617
618         self._force_migrate(rank_1_id, "dir_1",
619                             self.mount_a.path_to_ino("dir_1/original"))
620
621         # Empty the MDS cache; otherwise the MDS reintegrates the stray as soon as the unlink finishes
622         self.mount_a.umount_wait()
623         self.fs.mds_asok(['flush', 'journal'], rank_0_id)
624         self.fs.mds_asok(['flush', 'journal'], rank_1_id)
625         self.fs.mds_fail_restart()
626         self.fs.wait_for_daemons()
627
628         active_mds_names = self.fs.get_active_names()
629         rank_0_id = active_mds_names[0]
630         rank_1_id = active_mds_names[1]
631
632         self.mount_a.mount()
633
634         self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
635         self.mount_a.umount_wait()
636
637         self._wait_for_counter("mds_cache", "strays_created", 1,
638                                mds_id=rank_1_id)
639
640         # Shut down rank 1
641         self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
642         self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")
643
644         # Wait until we get to a single active MDS mdsmap state
645         self.wait_until_true(lambda: self._is_stopped(1), timeout=120)
646
647         # See that the stray counter on rank 0 has incremented
648         self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
649
650     def assert_backtrace(self, ino, expected_path):
651         """
652         Assert that the backtrace in the data pool for an inode matches
653         an expected /foo/bar path.
654         """
655         expected_elements = expected_path.strip("/").split("/")
656         bt = self.fs.read_backtrace(ino)
657         actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
658         self.assertListEqual(expected_elements, actual_elements)
659
660     def get_backtrace_path(self, ino):
661         bt = self.fs.read_backtrace(ino)
662         elements = reversed([dn['dname'] for dn in bt['ancestors']])
663         return "/".join(elements)
664
665     def assert_purge_idle(self):
666         """
667         Assert that the MDS perf counters indicate no strays exist and
668         no ongoing purge activity.  Sanity check for when PurgeQueue should
669         be idle.
670         """
671         mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
672         pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
673         self.assertEqual(mdc_stats["num_strays"], 0)
674         self.assertEqual(mdc_stats["num_strays_delayed"], 0)
675         self.assertEqual(pq_stats["pq_executing"], 0)
676         self.assertEqual(pq_stats["pq_executing_ops"], 0)
677
678     def test_mv_cleanup(self):
679         """
680         That when doing a rename from A to B, and B has no hardlinks,
681         then we make a stray for B and purge it.
682         """
683         # Create file_a and file_b, write some to both
684         size_mb = 8
685         self.mount_a.write_n_mb("file_a", size_mb)
686         file_a_ino = self.mount_a.path_to_ino("file_a")
687         self.mount_a.write_n_mb("file_b", size_mb)
688         file_b_ino = self.mount_a.path_to_ino("file_b")
689
690         self.fs.mds_asok(['flush', 'journal'])
691         self.assert_backtrace(file_a_ino, "file_a")
692         self.assert_backtrace(file_b_ino, "file_b")
693
694         # mv file_a file_b
695         self.mount_a.run_shell(['mv', 'file_a', 'file_b'])
696
697         # See that stray counter increments
698         self.assertEqual(self.get_mdc_stat("strays_created"), 1)
699         # Wait for purge counter to increment
700         self._wait_for_counter("mds_cache", "strays_enqueued", 1)
701         self._wait_for_counter("purge_queue", "pq_executed", 1)
702
703         self.assert_purge_idle()
704
705         # file_b should have been purged
706         self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))
707
708         # Backtrace should have updated from file_a to file_b
709         self.fs.mds_asok(['flush', 'journal'])
710         self.assert_backtrace(file_a_ino, "file_b")
711
712         # file_a's data should still exist
713         self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
714
715     def _pool_df(self, pool_name):
716         """
717         Return a dict like
718             {
719                 "kb_used": 0,
720                 "bytes_used": 0,
721                 "max_avail": 19630292406,
722                 "objects": 0
723             }
724
725         :param pool_name: Which pool (must exist)
726         """
727         out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
728         for p in json.loads(out)['pools']:
729             if p['name'] == pool_name:
730                 return p['stats']
731
732         raise RuntimeError("Pool '{0}' not found".format(pool_name))
733
734     def await_data_pool_empty(self):
735         self.wait_until_true(
736             lambda: self._pool_df(
737                 self.fs.get_data_pool_name()
738             )['objects'] == 0,
739             timeout=60)
740
741     def test_snapshot_remove(self):
742         """
743         That removal of a snapshot that references a now-unlinked file results
744         in purging of the stray for the file.
745         """
746         # Enable snapshots
747         self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
748                                             "--yes-i-really-mean-it")
749
750         # Create a dir with a file in it
751         size_mb = 8
752         self.mount_a.run_shell(["mkdir", "snapdir"])
753         self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
754         self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
755         file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")
756
757         # Snapshot the dir
758         self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])
759
760         # Cause the head revision to deviate from the snapshot
761         self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)
762
763         # Flush the journal so that backtraces and dirfrag objects will actually be written
764         self.fs.mds_asok(["flush", "journal"])
765
766         # Unlink the file
767         self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
768         self.mount_a.run_shell(["rmdir", "snapdir/subdir"])
769
770         # Unmount the client so that when we later check that the data is still
771         # in the file, we are not just reading the client's page cache.
772         self.mount_a.umount_wait()
773
774         self.assertEqual(self.get_mdc_stat("strays_created"), 2)
775
776         # FIXME: at this stage we see a purge and the stray count drops to
777         # zero, but there's actually still a stray, so at the very
778         # least the StrayManager stats code is slightly off
779
780         self.mount_a.mount()
781
782         # See that the data from the snapshotted revision of the file is still present
783         # and correct
784         self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)
785
786         # Remove the snapshot
787         self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])
788
789         # Purging file_a doesn't happen until after we've flushed the journal, because
790         # it is referenced by the snapshotted subdir, and the snapshot isn't really
791         # gone until the journal references to it are gone
792         self.fs.mds_asok(["flush", "journal"])
793
794         # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
795         # See also: http://tracker.ceph.com/issues/20072
796         self.wait_until_true(
797             lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
798             timeout=60
799         )
800
801         # See that a purge happens now
802         self._wait_for_counter("mds_cache", "strays_enqueued", 2)
803         self._wait_for_counter("purge_queue", "pq_executed", 2)
804
805         self.await_data_pool_empty()
806
807     def test_fancy_layout(self):
808         """
809         purge stray file with fancy layout
810         """
811
812         file_name = "fancy_layout_file"
813         self.mount_a.run_shell(["touch", file_name])
814
815         file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
816         self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)
817
818         # 35MB requires 7 objects
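        # (With stripe_unit=1MB, stripe_count=4 and object_size=8MB, the first 32MB
        # fill one complete set of 4 objects and the remaining 3MB touch 3 objects
        # of the next set, giving 7 objects in total.)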
819         size_mb = 35
820         self.mount_a.write_n_mb(file_name, size_mb)
821
822         self.mount_a.run_shell(["rm", "-f", file_name])
823         self.fs.mds_asok(["flush", "journal"])
824
825         # can't use self.fs.data_objects_absent here, it does not support fancy layout
826         self.await_data_pool_empty()
827
828     def test_dirfrag_limit(self):
829         """
830         That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).
831
832         That fragmentation (forced) will allow more entries to be created.
833
834         That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
835         """
836
837         self.fs.set_allow_dirfrags(True)
838
839         LOW_LIMIT = 50
840         for mds in self.fs.get_daemon_names():
841             self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)
842
843         try:
844             self.mount_a.run_python(dedent("""
845                 import os
846                 path = os.path.join("{path}", "subdir")
847                 os.mkdir(path)
848                 for n in range(0, {file_count}):
849                     open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
850                 """.format(
851             path=self.mount_a.mountpoint,
852             file_count=LOW_LIMIT+1
853             )))
854         except CommandFailedError:
855             pass  # ENOSPC
856         else:
857             raise RuntimeError("fragment size limit was not enforced")
858
859         # Now test that we can go beyond the limit if we fragment the directory
860
861         self.mount_a.run_python(dedent("""
862             import os
863             path = os.path.join("{path}", "subdir2")
864             os.mkdir(path)
865             for n in range(0, {file_count}):
866                 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
867             dfd = os.open(path, os.O_DIRECTORY)
868             os.fsync(dfd)
869             """.format(
870         path=self.mount_a.mountpoint,
871         file_count=LOW_LIMIT
872         )))
873
874         # Ensure that subdir2 is fragmented
875         mds_id = self.fs.get_active_names()[0]
876         self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)
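        # ("0/0" names the root dirfrag and "1" is the number of split bits, so
        # subdir2 now has two fragments -- our reading of the asok command, stated
        # as an assumption.)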
877
878         # remount+flush (release client caps)
879         self.mount_a.umount_wait()
880         self.fs.mds_asok(["flush", "journal"], mds_id)
881         self.mount_a.mount()
882         self.mount_a.wait_until_mounted()
883
884         # Create 50% more files than the current fragment limit
885         self.mount_a.run_python(dedent("""
886             import os
887             path = os.path.join("{path}", "subdir2")
888             for n in range({file_count}, ({file_count}*3)//2):
889                 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
890             """.format(
891         path=self.mount_a.mountpoint,
892         file_count=LOW_LIMIT
893         )))
894
895         # Now test the stray directory size is limited and recovers
896         strays_before = self.get_mdc_stat("strays_created")
897         try:
898             self.mount_a.run_python(dedent("""
899                 import os
900                 path = os.path.join("{path}", "subdir3")
901                 os.mkdir(path)
902                 for n in range({file_count}):
903                     fpath = os.path.join(path, "%s" % n)
904                     f = open(fpath, 'w')
905                     f.write("%s" % n)
906                     f.close()
907                     os.unlink(fpath)
908                 """.format(
909             path=self.mount_a.mountpoint,
910             file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count
911             )))
912         except CommandFailedError:
913             pass  # ENOSPC
914         else:
915             raise RuntimeError("fragment size limit was not enforced")
916
917         strays_after = self.get_mdc_stat("strays_created")
918         self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)
919
920         self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
921         self._wait_for_counter("purge_queue", "pq_executed", strays_after)
922
923         self.mount_a.run_python(dedent("""
924             import os
925             path = os.path.join("{path}", "subdir4")
926             os.mkdir(path)
927             for n in range({file_count}):
928                 fpath = os.path.join(path, "%s" % n)
929                 f = open(fpath, 'w')
930                 f.write("%s" % n)
931                 f.close()
932                 os.unlink(fpath)
933             """.format(
934         path=self.mount_a.mountpoint,
935         file_count=LOW_LIMIT
936         )))
937
938     def test_purge_queue_upgrade(self):
939         """
940         That when starting on a system with no purge queue in the metadata
941         pool, we silently create one.
942         :return:
943         """
944
945         self.mds_cluster.mds_stop()
946         self.mds_cluster.mds_fail()
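        # 500.00000000 is the first object of the purge queue's journal (inode
        # 0x500) in the metadata pool; removing it simulates a filesystem created
        # before the purge queue existed.  (The object naming is our assumption.)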
947         self.fs.rados(["rm", "500.00000000"])
948         self.mds_cluster.mds_restart()
949         self.fs.wait_for_daemons()
950
951     def test_purge_queue_op_rate(self):
952         """
953         A busy purge queue is meant to aggregate operations sufficiently
954         that our RADOS ops to the metadata pool are not O(files).  Check
955         that that is so.
956         :return:
957         """
958
959         # For low rates of deletion, the rate of metadata ops actually
960         # will be O(files), so to see the desired behaviour we have to give
961         # the system a significant quantity, i.e. an order of magnitude
962         # more than the number of files it will purge at one time.
963
964         max_purge_files = 2
965
966         self.set_conf('mds', 'mds_bal_frag', 'false')
967         self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files)
968         self.fs.mds_fail_restart()
969         self.fs.wait_for_daemons()
970
971         phase_1_files = 256
972         phase_2_files = 512
973
974         self.mount_a.run_shell(["mkdir", "phase1"])
975         self.mount_a.create_n_files("phase1/file", phase_1_files)
976
977         self.mount_a.run_shell(["mkdir", "phase2"])
978         self.mount_a.create_n_files("phase2/file", phase_2_files)
979
980         def unlink_and_count_ops(path, expected_deletions):
981             initial_ops = self.get_stat("objecter", "op")
982             initial_pq_executed = self.get_stat("purge_queue", "pq_executed")
983
984             self.mount_a.run_shell(["rm", "-rf", path])
985
986             self._wait_for_counter(
987                 "purge_queue", "pq_executed", initial_pq_executed + expected_deletions
988             )
989
990             final_ops = self.get_stat("objecter", "op")
991
992             # Calculation of the *overhead* operations, i.e. do not include
993             # the operations where we actually delete files.
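            # (Illustrative: purging expected_deletions files costs at least one op
            # per file, so anything beyond that in final_ops - initial_ops counts
            # as overhead.)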
994             return final_ops - initial_ops - expected_deletions
995
996         self.fs.mds_asok(['flush', 'journal'])
997         phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1)
998
999         self.fs.mds_asok(['flush', 'journal'])
1000         phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1)
1001
1002         log.info("Phase 1: {0}".format(phase1_ops))
1003         log.info("Phase 2: {0}".format(phase2_ops))
1004
1005         # The success criterion is that deleting double the number
1006         # of files doesn't generate double the number of overhead ops
1007         # -- this comparison is a rough approximation of that rule.
1008         self.assertLess(phase2_ops, phase1_ops * 1.25)
1009
1010         # Finally, check that our activity did include properly quiescing
1011         # the queue (i.e. call to Journaler::write_head in the right place),
1012         # by restarting the MDS and checking that it doesn't try re-executing
1013         # any of the work we did.
1014         self.fs.mds_asok(['flush', 'journal'])  # flush to ensure no strays
1015                                                 # hanging around
1016         self.fs.mds_fail_restart()
1017         self.fs.wait_for_daemons()
1018         time.sleep(10)
1019         self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0)
1020
1021     def test_replicated_delete_speed(self):
1022         """
1023         That deletions of replicated metadata are not pathologically slow
1024         """
1025         rank_0_id, rank_1_id = self._setup_two_ranks()
1026
1027         self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
1028         self.mds_cluster.mds_fail_restart(rank_1_id)
1029         self.fs.wait_for_daemons()
1030
1031         file_count = 10
1032
1033         self.mount_a.create_n_files("delete_me/file", file_count)
1034
1035         self._force_migrate(rank_1_id, "delete_me",
1036                             self.mount_a.path_to_ino("delete_me/file_0"))
1037
1038         begin = datetime.datetime.now()
1039         self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
1040         end = datetime.datetime.now()
1041
1042         # What we're really checking here is that we are completing client
1043         # operations immediately rather than delaying until the next tick.
1044         tick_period = float(self.fs.get_config("mds_tick_interval",
1045                                                service_type="mds"))
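        # If every unlink had to wait for the next tick, removing file_count files
        # would take on the order of file_count * tick_period seconds; requiring
        # well under a quarter of that shows requests complete without waiting for
        # a tick.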
1046
1047         duration = (end - begin).total_seconds()
1048         self.assertLess(duration, (file_count * tick_period) * 0.25)
1049