Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / StrayManager.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #include "common/perf_counters.h"
17
18 #include "mds/MDSRank.h"
19 #include "mds/MDCache.h"
20 #include "mds/MDLog.h"
21 #include "mds/CDir.h"
22 #include "mds/CDentry.h"
23 #include "events/EUpdate.h"
24 #include "messages/MClientRequest.h"
25
26 #include "StrayManager.h"
27
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mds
30 #undef dout_prefix
31 #define dout_prefix _prefix(_dout, mds)
32 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
33   return *_dout << "mds." << mds->get_nodeid() << ".cache.strays ";
34 }
35
36 class StrayManagerIOContext : public virtual MDSIOContextBase {
37 protected:
38   StrayManager *sm;
39   MDSRank *get_mds() override
40   {
41     return sm->mds;
42   }
43 public:
44   explicit StrayManagerIOContext(StrayManager *sm_) : sm(sm_) {}
45 };
46
47 class StrayManagerLogContext : public virtual MDSLogContextBase {
48 protected:
49   StrayManager *sm;
50   MDSRank *get_mds() override
51   {
52     return sm->mds;
53   }
54 public:
55   explicit StrayManagerLogContext(StrayManager *sm_) : sm(sm_) {}
56 };
57
58 class StrayManagerContext : public virtual MDSInternalContextBase {
59 protected:
60   StrayManager *sm;
61   MDSRank *get_mds() override
62   {
63     return sm->mds;
64   }
65 public:
66   explicit StrayManagerContext(StrayManager *sm_) : sm(sm_) {}
67 };
68
69
70 /**
71  * Context wrapper for _purge_stray_purged completion
72  */
73 class C_IO_PurgeStrayPurged : public StrayManagerIOContext {
74   CDentry *dn;
75   bool only_head;
76 public:
77   C_IO_PurgeStrayPurged(StrayManager *sm_, CDentry *d, bool oh) : 
78     StrayManagerIOContext(sm_), dn(d), only_head(oh) { }
79   void finish(int r) override {
80     assert(r == 0 || r == -ENOENT);
81     sm->_purge_stray_purged(dn, only_head);
82   }
83 };
84
85
86 void StrayManager::purge(CDentry *dn)
87 {
88   CDentry::linkage_t *dnl = dn->get_projected_linkage();
89   CInode *in = dnl->get_inode();
90   dout(10) << __func__ << " " << *dn << " " << *in << dendl;
91   assert(!dn->is_replicated());
92
93   // CHEAT.  there's no real need to journal our intent to purge, since
94   // that is implicit in the dentry's presence and non-use in the stray
95   // dir.  on recovery, we'll need to re-eval all strays anyway.
96   
97   SnapContext nullsnapc;
98
99   PurgeItem item;
100   item.ino = in->inode.ino;
101   if (in->is_dir()) {
102     item.action = PurgeItem::PURGE_DIR;
103     item.fragtree = in->dirfragtree;
104   } else {
105     item.action = PurgeItem::PURGE_FILE;
106
107     const SnapContext *snapc;
108     SnapRealm *realm = in->find_snaprealm();
109     if (realm) {
110       dout(10) << " realm " << *realm << dendl;
111       snapc = &realm->get_snap_context();
112     } else {
113       dout(10) << " NO realm, using null context" << dendl;
114       snapc = &nullsnapc;
115       assert(in->last == CEPH_NOSNAP);
116     }
117
118     uint64_t to = 0;
119     if (in->is_file()) {
120       to = in->inode.get_max_size();
121       to = MAX(in->inode.size, to);
122       // when truncating a file, the filer does not delete stripe objects that are
123       // truncated to zero. so we need to purge stripe objects up to the max size
124       // the file has ever been.
125       to = MAX(in->inode.max_size_ever, to);
126     }
127
128     inode_t *pi = in->get_projected_inode();
129
130     item.size = to;
131     item.layout = pi->layout;
132     item.old_pools = pi->old_pools;
133     item.snapc = *snapc;
134   }
135
136   purge_queue.push(item, new C_IO_PurgeStrayPurged(
137         this, dn, false));
138 }
139
140 class C_PurgeStrayLogged : public StrayManagerLogContext {
141   CDentry *dn;
142   version_t pdv;
143   LogSegment *ls;
144 public:
145   C_PurgeStrayLogged(StrayManager *sm_, CDentry *d, version_t v, LogSegment *s) : 
146     StrayManagerLogContext(sm_), dn(d), pdv(v), ls(s) { }
147   void finish(int r) override {
148     sm->_purge_stray_logged(dn, pdv, ls);
149   }
150 };
151
152 class C_TruncateStrayLogged : public StrayManagerLogContext {
153   CDentry *dn;
154   LogSegment *ls;
155 public:
156   C_TruncateStrayLogged(StrayManager *sm, CDentry *d, LogSegment *s) :
157     StrayManagerLogContext(sm), dn(d), ls(s) { }
158   void finish(int r) override {
159     sm->_truncate_stray_logged(dn, ls);
160   }
161 };
162
163 void StrayManager::_purge_stray_purged(
164     CDentry *dn, bool only_head)
165 {
166   CInode *in = dn->get_projected_linkage()->get_inode();
167   dout(10) << "_purge_stray_purged " << *dn << " " << *in << dendl;
168
169   logger->inc(l_mdc_strays_enqueued);
170   num_strays_enqueuing--;
171   logger->set(l_mdc_num_strays_enqueuing, num_strays_enqueuing);
172
173   if (only_head) {
174     /* This was a ::truncate */
175     EUpdate *le = new EUpdate(mds->mdlog, "purge_stray truncate");
176     mds->mdlog->start_entry(le);
177     
178     inode_t *pi = in->project_inode();
179     pi->size = 0;
180     pi->max_size_ever = 0;
181     pi->client_ranges.clear();
182     pi->truncate_size = 0;
183     pi->truncate_from = 0;
184     pi->version = in->pre_dirty();
185
186     le->metablob.add_dir_context(dn->dir);
187     le->metablob.add_primary_dentry(dn, in, true);
188
189     mds->mdlog->submit_entry(le,
190         new C_TruncateStrayLogged(
191           this, dn, mds->mdlog->get_current_segment()));
192   } else {
193     if (in->get_num_ref() != (int)in->is_dirty() ||
194         dn->get_num_ref() != (int)dn->is_dirty() + !!in->get_num_ref() + 1/*PIN_PURGING*/) {
195       // Nobody should be taking new references to an inode when it
196       // is being purged (aside from it were 
197
198       derr << "Rogue reference after purge to " << *dn << dendl;
199       assert(0 == "rogue reference to purging inode");
200     }
201
202     // kill dentry.
203     version_t pdv = dn->pre_dirty();
204     dn->push_projected_linkage(); // NULL
205
206     EUpdate *le = new EUpdate(mds->mdlog, "purge_stray");
207     mds->mdlog->start_entry(le);
208
209     // update dirfrag fragstat, rstat
210     CDir *dir = dn->get_dir();
211     fnode_t *pf = dir->project_fnode();
212     pf->version = dir->pre_dirty();
213     if (in->is_dir())
214       pf->fragstat.nsubdirs--;
215     else
216       pf->fragstat.nfiles--;
217     pf->rstat.sub(in->inode.accounted_rstat);
218
219     le->metablob.add_dir_context(dn->dir);
220     EMetaBlob::dirlump& dl = le->metablob.add_dir(dn->dir, true);
221     le->metablob.add_null_dentry(dl, dn, true);
222     le->metablob.add_destroyed_inode(in->ino());
223
224     mds->mdlog->submit_entry(le, new C_PurgeStrayLogged(this, dn, pdv,
225           mds->mdlog->get_current_segment()));
226
227     logger->set(l_mdc_num_strays, num_strays);
228   }
229 }
230
231 void StrayManager::_purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls)
232 {
233   CInode *in = dn->get_linkage()->get_inode();
234   dout(10) << "_purge_stray_logged " << *dn << " " << *in << dendl;
235
236   assert(!in->state_test(CInode::STATE_RECOVERING));
237
238   bool new_dn = dn->is_new();
239
240   // unlink
241   assert(dn->get_projected_linkage()->is_null());
242   dn->dir->unlink_inode(dn, !new_dn);
243   dn->pop_projected_linkage();
244   dn->mark_dirty(pdv, ls);
245
246   dn->dir->pop_and_dirty_projected_fnode(ls);
247
248   in->state_clear(CInode::STATE_ORPHAN);
249   dn->state_clear(CDentry::STATE_PURGING | CDentry::STATE_PURGINGPINNED);
250   dn->put(CDentry::PIN_PURGING);
251
252   // drop dentry?
253   if (new_dn) {
254     dout(20) << " dn is new, removing" << dendl;
255     dn->mark_clean();
256     dn->dir->remove_dentry(dn);
257   }
258
259   // drop inode
260   if (in->is_dirty())
261     in->mark_clean();
262   in->mdcache->remove_inode(in);
263 }
264
265 void StrayManager::enqueue(CDentry *dn, bool trunc)
266 {
267   CDentry::linkage_t *dnl = dn->get_projected_linkage();
268   assert(dnl);
269   CInode *in = dnl->get_inode();
270   assert(in);
271
272   /* We consider a stray to be purging as soon as it is enqueued, to avoid
273    * enqueing it twice */
274   dn->state_set(CDentry::STATE_PURGING);
275   in->state_set(CInode::STATE_PURGING);
276
277   /* We must clear this as soon as enqueuing it, to prevent the journal
278    * expiry code from seeing a dirty parent and trying to write a backtrace */
279   if (!trunc) {
280     if (in->is_dirty_parent()) {
281       in->clear_dirty_parent();
282     }
283   }
284
285   dout(20) << __func__ << ": purging dn: " << *dn << dendl;
286
287   if (!dn->state_test(CDentry::STATE_PURGINGPINNED)) {
288     dn->get(CDentry::PIN_PURGING);
289     dn->state_set(CDentry::STATE_PURGINGPINNED);
290   }
291
292   ++num_strays_enqueuing;
293   logger->set(l_mdc_num_strays_enqueuing, num_strays_enqueuing);
294
295   // Resources are available, acquire them and execute the purge
296   _enqueue(dn, trunc);
297
298   dout(10) << __func__ << ": purging this dentry immediately: "
299     << *dn << dendl;
300 }
301
302 class C_OpenSnapParents : public StrayManagerContext {
303   CDentry *dn;
304   bool trunc;
305   public:
306     C_OpenSnapParents(StrayManager *sm_, CDentry *dn_, bool t) :
307       StrayManagerContext(sm_), dn(dn_), trunc(t) { }
308     void finish(int r) override {
309       sm->_enqueue(dn, trunc);
310     }
311 };
312
313 void StrayManager::_enqueue(CDentry *dn, bool trunc)
314 {
315   assert(started);
316
317   CInode *in = dn->get_linkage()->get_inode();
318   if (in->snaprealm &&
319       !in->snaprealm->have_past_parents_open() &&
320       !in->snaprealm->open_parents(new C_OpenSnapParents(this, dn, trunc))) {
321     // this can happen if the dentry had been trimmed from cache.
322     return;
323   }
324
325   if (trunc) {
326     truncate(dn);
327   } else {
328     purge(dn);
329   }
330 }
331
332
333 void StrayManager::advance_delayed()
334 {
335   if (!started)
336     return;
337
338   for (elist<CDentry*>::iterator p = delayed_eval_stray.begin(); !p.end(); ) {
339     CDentry *dn = *p;
340     ++p;
341     dn->item_stray.remove_myself();
342     num_strays_delayed--;
343
344     if (dn->get_projected_linkage()->is_null()) {
345       /* A special case: a stray dentry can go null if its inode is being
346        * re-linked into another MDS's stray dir during a shutdown migration. */
347       dout(4) << __func__ << ": delayed dentry is now null: " << *dn << dendl;
348       continue;
349     }
350
351     const bool purging = eval_stray(dn);
352     if (!purging) {
353       derr << "Dentry " << *dn << " was purgeable but no longer is!" << dendl;
354       /*
355        * This can happen if a stray is purgeable, but has gained an extra
356        * reference by virtue of having its backtrace updated.
357        * FIXME perhaps we could simplify this further by
358        * avoiding writing the backtrace of purge-ready strays, so
359        * that this code could be more rigid?
360        */
361     }
362   }
363   logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
364 }
365
366 void StrayManager::set_num_strays(uint64_t num)
367 {
368   assert(!started);
369   num_strays = num;
370   logger->set(l_mdc_num_strays, num_strays);
371 }
372
373 void StrayManager::notify_stray_created()
374 {
375   num_strays++;
376   logger->set(l_mdc_num_strays, num_strays);
377   logger->inc(l_mdc_strays_created);
378 }
379
380 void StrayManager::notify_stray_removed()
381 {
382   num_strays--;
383   logger->set(l_mdc_num_strays, num_strays);
384 }
385
386 struct C_EvalStray : public StrayManagerContext {
387   CDentry *dn;
388   C_EvalStray(StrayManager *sm_, CDentry *d) : StrayManagerContext(sm_), dn(d) {}
389   void finish(int r) override {
390     sm->eval_stray(dn);
391   }
392 };
393
394 struct C_MDC_EvalStray : public StrayManagerContext {
395   CDentry *dn;
396   C_MDC_EvalStray(StrayManager *sm_, CDentry *d) : StrayManagerContext(sm_), dn(d) {}
397   void finish(int r) override {
398     sm->eval_stray(dn);
399   }
400 };
401
402 bool StrayManager::_eval_stray(CDentry *dn, bool delay)
403 {
404   dout(10) << "eval_stray " << *dn << dendl;
405   CDentry::linkage_t *dnl = dn->get_projected_linkage();
406   assert(dnl->is_primary());
407   dout(10) << " inode is " << *dnl->get_inode() << dendl;
408   CInode *in = dnl->get_inode();
409   assert(in);
410   assert(!in->state_test(CInode::STATE_REJOINUNDEF));
411
412   // The only dentries elegible for purging are those
413   // in the stray directories
414   assert(dn->get_dir()->get_inode()->is_stray());
415
416   // Inode may not pass through this function if it
417   // was already identified for purging (i.e. cannot
418   // call eval_stray() after purge()
419   assert(!dn->state_test(CDentry::STATE_PURGING));
420
421   if (!dn->is_auth()) {
422     return false;
423   }
424
425   if (!started)
426     delay = true;
427
428   if (dn->item_stray.is_on_list()) {
429     if (delay)
430       return false;
431
432     dn->item_stray.remove_myself();
433     num_strays_delayed--;
434     logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
435   }
436
437   // purge?
438   if (in->inode.nlink == 0) {
439     // past snaprealm parents imply snapped dentry remote links.
440     // only important for directories.  normal file data snaps are handled
441     // by the object store.
442     if (in->snaprealm) {
443       if (!in->snaprealm->have_past_parents_open() &&
444           !in->snaprealm->open_parents(new C_MDC_EvalStray(this, dn))) {
445         return false;
446       }
447       in->snaprealm->prune_past_parents();
448       in->purge_stale_snap_data(in->snaprealm->get_snaps());
449     }
450     if (in->is_dir()) {
451       if (in->snaprealm && in->snaprealm->has_past_parents()) {
452         dout(20) << "  directory has past parents "
453                  << in->snaprealm->srnode.past_parents << dendl;
454         if (in->state_test(CInode::STATE_MISSINGOBJS)) {
455           mds->clog->error() << "previous attempt at committing dirfrag of ino "
456                              << in->ino() << " has failed, missing object";
457           mds->handle_write_error(-ENOENT);
458         }
459         return false;  // not until some snaps are deleted.
460       }
461
462       in->mdcache->clear_dirty_bits_for_stray(in);
463
464       if (!in->remote_parents.empty()) {
465         // unlink any stale remote snap dentry.
466         for (compact_set<CDentry*>::iterator p = in->remote_parents.begin();
467              p != in->remote_parents.end(); ) {
468           CDentry *remote_dn = *p;
469           ++p;
470           assert(remote_dn->last != CEPH_NOSNAP);
471           remote_dn->unlink_remote(remote_dn->get_linkage());
472         }
473       }
474     }
475     if (dn->is_replicated()) {
476       dout(20) << " replicated" << dendl;
477       return false;
478     }
479     if (dn->is_any_leases() || in->is_any_caps()) {
480       dout(20) << " caps | leases" << dendl;
481       return false;  // wait
482     }
483     if (in->state_test(CInode::STATE_NEEDSRECOVER) ||
484         in->state_test(CInode::STATE_RECOVERING)) {
485       dout(20) << " pending recovery" << dendl;
486       return false;  // don't mess with file size probing
487     }
488     if (in->get_num_ref() > (int)in->is_dirty() + (int)in->is_dirty_parent()) {
489       dout(20) << " too many inode refs" << dendl;
490       return false;
491     }
492     if (dn->get_num_ref() > (int)dn->is_dirty() + !!in->get_num_ref()) {
493       dout(20) << " too many dn refs" << dendl;
494       return false;
495     }
496     if (delay) {
497       if (!dn->item_stray.is_on_list()) {
498         delayed_eval_stray.push_back(&dn->item_stray);
499         num_strays_delayed++;
500         logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
501       }
502     // don't purge multiversion inode with snap data
503     } else if (in->snaprealm && in->snaprealm->has_past_parents() &&
504               !in->old_inodes.empty()) {
505       // A file with snapshots: we will truncate the HEAD revision
506       // but leave the metadata intact.
507       assert(!in->is_dir());
508       dout(20) << " file has past parents "
509         << in->snaprealm->srnode.past_parents << dendl;
510       if (in->is_file() && in->get_projected_inode()->size > 0) {
511         enqueue(dn, true); // truncate head objects    
512       }
513     } else {
514       // A straightforward file, ready to be purged.  Enqueue it.
515       if (in->is_dir()) {
516         in->close_dirfrags();
517       }
518
519       enqueue(dn, false);
520     }
521
522     return true;
523   } else {
524     /*
525      * Where a stray has some links, they should be remotes, check
526      * if we can do anything with them if we happen to have them in
527      * cache.
528      */
529     _eval_stray_remote(dn, NULL);
530     return false;
531   }
532 }
533
534 void StrayManager::activate()
535 {
536   dout(10) << __func__ << dendl;
537   started = true;
538   purge_queue.activate();
539 }
540
541 bool StrayManager::eval_stray(CDentry *dn, bool delay)
542 {
543   // avoid nested eval_stray
544   if (dn->state_test(CDentry::STATE_EVALUATINGSTRAY))
545       return false;
546
547   dn->state_set(CDentry::STATE_EVALUATINGSTRAY);
548   bool ret = _eval_stray(dn, delay);
549   dn->state_clear(CDentry::STATE_EVALUATINGSTRAY);
550   return ret;
551 }
552
553 void StrayManager::eval_remote(CDentry *remote_dn)
554 {
555   dout(10) << __func__ << " " << *remote_dn << dendl;
556
557   CDentry::linkage_t *dnl = remote_dn->get_projected_linkage();
558   assert(dnl->is_remote());
559   CInode *in = dnl->get_inode();
560
561   if (!in) {
562     dout(20) << __func__ << ": no inode, cannot evaluate" << dendl;
563     return;
564   }
565
566   if (remote_dn->last != CEPH_NOSNAP) {
567     dout(20) << __func__ << ": snap dentry, cannot evaluate" << dendl;
568     return;
569   }
570
571   // refers to stray?
572   CDentry *primary_dn = in->get_projected_parent_dn();
573   assert(primary_dn != NULL);
574   if (primary_dn->get_dir()->get_inode()->is_stray()) {
575     _eval_stray_remote(primary_dn, remote_dn);
576   } else {
577     dout(20) << __func__ << ": inode's primary dn not stray" << dendl;
578   }
579 }
580
581 class C_RetryEvalRemote : public StrayManagerContext {
582   CDentry *dn;
583   public:
584     C_RetryEvalRemote(StrayManager *sm_, CDentry *dn_) :
585       StrayManagerContext(sm_), dn(dn_) {
586       dn->get(CDentry::PIN_PTRWAITER);
587     }
588     void finish(int r) override {
589       if (dn->get_projected_linkage()->is_remote())
590         sm->eval_remote(dn);
591       dn->put(CDentry::PIN_PTRWAITER);
592     }
593 };
594
595 void StrayManager::_eval_stray_remote(CDentry *stray_dn, CDentry *remote_dn)
596 {
597   dout(20) << __func__ << " " << *stray_dn << dendl;
598   assert(stray_dn != NULL);
599   assert(stray_dn->get_dir()->get_inode()->is_stray());
600   CDentry::linkage_t *stray_dnl = stray_dn->get_projected_linkage();
601   assert(stray_dnl->is_primary());
602   CInode *stray_in = stray_dnl->get_inode();
603   assert(stray_in->inode.nlink >= 1);
604   assert(stray_in->last == CEPH_NOSNAP);
605
606   /* If no remote_dn hinted, pick one arbitrarily */
607   if (remote_dn == NULL) {
608     if (!stray_in->remote_parents.empty()) {
609       for (compact_set<CDentry*>::iterator p = stray_in->remote_parents.begin();
610            p != stray_in->remote_parents.end();
611            ++p)
612         if ((*p)->last == CEPH_NOSNAP && !(*p)->is_projected()) {
613           if ((*p)->is_auth()) {
614             remote_dn = *p;
615             if (remote_dn->dir->can_auth_pin())
616               break;
617           } else if (!remote_dn) {
618             remote_dn = *p;
619           }
620         }
621     }
622     if (!remote_dn) {
623       dout(20) << __func__ << ": not reintegrating (no remote parents in cache)" << dendl;
624       return;
625     }
626   }
627   assert(remote_dn->last == CEPH_NOSNAP);
628   // NOTE: we repeat this check in _rename(), since our submission path is racey.
629   if (!remote_dn->is_projected()) {
630     if (remote_dn->is_auth()) {
631       if (remote_dn->dir->can_auth_pin()) {
632         reintegrate_stray(stray_dn, remote_dn);
633       } else {
634         remote_dn->dir->add_waiter(CDir::WAIT_UNFREEZE, new C_RetryEvalRemote(this, remote_dn));
635         dout(20) << __func__ << ": not reintegrating (can't authpin remote parent)" << dendl;
636       }
637
638     } else if (!remote_dn->is_auth() && stray_dn->is_auth()) {
639       migrate_stray(stray_dn, remote_dn->authority().first);
640     } else {
641       dout(20) << __func__ << ": not reintegrating" << dendl;
642     }
643   } else {
644     // don't do anything if the remote parent is projected, or we may
645     // break user-visible semantics!
646     dout(20) << __func__ << ": not reintegrating (projected)" << dendl;
647   }
648 }
649
650 void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
651 {
652   dout(10) << __func__ << " " << *straydn << " into " << *rdn << dendl;
653
654   logger->inc(l_mdc_strays_reintegrated);
655   
656   // rename it to another mds.
657   filepath src;
658   straydn->make_path(src);
659   filepath dst;
660   rdn->make_path(dst);
661
662   MClientRequest *req = new MClientRequest(CEPH_MDS_OP_RENAME);
663   req->set_filepath(dst);
664   req->set_filepath2(src);
665   req->set_tid(mds->issue_tid());
666
667   mds->send_message_mds(req, rdn->authority().first);
668 }
669  
670 void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
671 {
672   CInode *in = dn->get_projected_linkage()->get_inode();
673   assert(in);
674   CInode *diri = dn->dir->get_inode();
675   assert(diri->is_stray());
676   dout(10) << "migrate_stray from mds." << MDS_INO_STRAY_OWNER(diri->inode.ino)
677            << " to mds." << to
678            << " " << *dn << " " << *in << dendl;
679
680   logger->inc(l_mdc_strays_migrated);
681
682   // rename it to another mds.
683   filepath src;
684   dn->make_path(src);
685   assert(src.depth() == 2);
686
687   filepath dst(MDS_INO_MDSDIR(to));
688   dst.push_dentry(src[0]);
689   dst.push_dentry(src[1]);
690
691   MClientRequest *req = new MClientRequest(CEPH_MDS_OP_RENAME);
692   req->set_filepath(dst);
693   req->set_filepath2(src);
694   req->set_tid(mds->issue_tid());
695
696   mds->send_message_mds(req, to);
697 }
698
699 StrayManager::StrayManager(MDSRank *mds, PurgeQueue &purge_queue_)
700   : delayed_eval_stray(member_offset(CDentry, item_stray)),
701     mds(mds), logger(NULL), started(false), num_strays(0),
702     num_strays_delayed(0), num_strays_enqueuing(0),
703     purge_queue(purge_queue_)
704 {
705   assert(mds != NULL);
706 }
707
708 void StrayManager::truncate(CDentry *dn)
709 {
710   const CDentry::linkage_t *dnl = dn->get_projected_linkage();
711   const CInode *in = dnl->get_inode();
712   assert(in);
713   dout(10) << __func__ << ": " << *dn << " " << *in << dendl;
714   assert(!dn->is_replicated());
715
716   const SnapRealm *realm = in->find_snaprealm();
717   assert(realm);
718   dout(10) << " realm " << *realm << dendl;
719   const SnapContext *snapc = &realm->get_snap_context();
720
721   uint64_t to = in->inode.get_max_size();
722   to = MAX(in->inode.size, to);
723   // when truncating a file, the filer does not delete stripe objects that are
724   // truncated to zero. so we need to purge stripe objects up to the max size
725   // the file has ever been.
726   to = MAX(in->inode.max_size_ever, to);
727
728   assert(to > 0);
729
730   PurgeItem item;
731   item.ino = in->inode.ino;
732   item.layout = in->inode.layout;
733   item.snapc = *snapc;
734   item.size = to;
735
736   purge_queue.push(item, new C_IO_PurgeStrayPurged(
737         this, dn, true));
738 }
739
740 void StrayManager::_truncate_stray_logged(CDentry *dn, LogSegment *ls)
741 {
742   CInode *in = dn->get_projected_linkage()->get_inode();
743
744   dout(10) << __func__ << ": " << *dn << " " << *in << dendl;
745
746   dn->state_clear(CDentry::STATE_PURGING | CDentry::STATE_PURGINGPINNED);
747   dn->put(CDentry::PIN_PURGING);
748
749   in->pop_and_dirty_projected_inode(ls);
750
751   eval_stray(dn);
752 }
753