Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / CDir.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #include "include/types.h"
17
18 #include "CDir.h"
19 #include "CDentry.h"
20 #include "CInode.h"
21 #include "Mutation.h"
22
23 #include "MDSMap.h"
24 #include "MDSRank.h"
25 #include "MDCache.h"
26 #include "Locker.h"
27 #include "MDLog.h"
28 #include "LogSegment.h"
29
30 #include "common/bloom_filter.hpp"
31 #include "include/Context.h"
32 #include "common/Clock.h"
33
34 #include "osdc/Objecter.h"
35
36 #include "common/config.h"
37 #include "include/assert.h"
38 #include "include/compat.h"
39
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mds
42 #undef dout_prefix
43 #define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
44
45 int CDir::num_frozen_trees = 0;
46 int CDir::num_freezing_trees = 0;
47
48 class CDirContext : public MDSInternalContextBase
49 {
50 protected:
51   CDir *dir;
52   MDSRank* get_mds() override {return dir->cache->mds;}
53
54 public:
55   explicit CDirContext(CDir *d) : dir(d) {
56     assert(dir != NULL);
57   }
58 };
59
60
61 class CDirIOContext : public MDSIOContextBase
62 {
63 protected:
64   CDir *dir;
65   MDSRank* get_mds() override {return dir->cache->mds;}
66
67 public:
68   explicit CDirIOContext(CDir *d) : dir(d) {
69     assert(dir != NULL);
70   }
71 };
72
73
74 // PINS
75 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
76
77
78 ostream& operator<<(ostream& out, const CDir& dir)
79 {
80   out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
81       << " [" << dir.first << ",head]";
82   if (dir.is_auth()) {
83     out << " auth";
84     if (dir.is_replicated())
85       out << dir.get_replicas();
86
87     if (dir.is_projected())
88       out << " pv=" << dir.get_projected_version();
89     out << " v=" << dir.get_version();
90     out << " cv=" << dir.get_committing_version();
91     out << "/" << dir.get_committed_version();
92   } else {
93     mds_authority_t a = dir.authority();
94     out << " rep@" << a.first;
95     if (a.second != CDIR_AUTH_UNKNOWN)
96       out << "," << a.second;
97     out << "." << dir.get_replica_nonce();
98   }
99
100   if (dir.is_rep()) out << " REP";
101
102   if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
103     if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
104       out << " dir_auth=" << dir.get_dir_auth().first;
105     else
106       out << " dir_auth=" << dir.get_dir_auth();
107   }
108   
109   if (dir.get_cum_auth_pins())
110     out << " ap=" << dir.get_auth_pins() 
111         << "+" << dir.get_dir_auth_pins()
112         << "+" << dir.get_nested_auth_pins();
113
114   out << " state=" << dir.get_state();
115   if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
116   if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
117   if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
118   if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
119   //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
120   if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
121   if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
122   if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
123   if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
124   if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
125   if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
126
127   // fragstat
128   out << " " << dir.fnode.fragstat;
129   if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
130     out << "/" << dir.fnode.accounted_fragstat;
131   if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
132     const fnode_t *pf = dir.get_projected_fnode();
133     out << "->" << pf->fragstat;
134     if (!(pf->fragstat == pf->accounted_fragstat))
135       out << "/" << pf->accounted_fragstat;
136   }
137   
138   // rstat
139   out << " " << dir.fnode.rstat;
140   if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
141     out << "/" << dir.fnode.accounted_rstat;
142   if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
143     const fnode_t *pf = dir.get_projected_fnode();
144     out << "->" << pf->rstat;
145     if (!(pf->rstat == pf->accounted_rstat))
146       out << "/" << pf->accounted_rstat;
147  }
148
149   out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
150   out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
151   if (dir.get_num_dirty())
152     out << " dirty=" << dir.get_num_dirty();
153   
154   if (dir.get_num_ref()) {
155     out << " |";
156     dir.print_pin_set(out);
157   }
158
159   out << " " << &dir;
160   return out << "]";
161 }
162
163
164 void CDir::print(ostream& out) 
165 {
166   out << *this;
167 }
168
169
170
171
172 ostream& CDir::print_db_line_prefix(ostream& out) 
173 {
174   return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
175 }
176
177
178
179 // -------------------------------------------------------------------
180 // CDir
181
182 CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
183   cache(mdcache), inode(in), frag(fg),
184   first(2),
185   dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
186   projected_version(0),  item_dirty(this), item_new(this),
187   num_head_items(0), num_head_null(0),
188   num_snap_items(0), num_snap_null(0),
189   num_dirty(0), committing_version(0), committed_version(0),
190   dir_auth_pins(0), request_pins(0),
191   dir_rep(REP_NONE),
192   pop_me(ceph_clock_now()),
193   pop_nested(ceph_clock_now()),
194   pop_auth_subtree(ceph_clock_now()),
195   pop_auth_subtree_nested(ceph_clock_now()),
196   num_dentries_nested(0), num_dentries_auth_subtree(0),
197   num_dentries_auth_subtree_nested(0),
198   dir_auth(CDIR_AUTH_DEFAULT)
199 {
200   state = STATE_INITIAL;
201
202   memset(&fnode, 0, sizeof(fnode));
203
204   // auth
205   assert(in->is_dir());
206   if (auth) 
207     state |= STATE_AUTH;
208 }
209
210 /**
211  * Check the recursive statistics on size for consistency.
212  * If mds_debug_scatterstat is enabled, assert for correctness,
213  * otherwise just print out the mismatch and continue.
214  */
215 bool CDir::check_rstats(bool scrub)
216 {
217   if (!g_conf->mds_debug_scatterstat && !scrub)
218     return true;
219
220   dout(25) << "check_rstats on " << this << dendl;
221   if (!is_complete() || !is_auth() || is_frozen()) {
222     assert(!scrub);
223     dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl;
224     return true;
225   }
226
227   frag_info_t frag_info;
228   nest_info_t nest_info;
229   for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
230     if (i->second->last != CEPH_NOSNAP)
231       continue;
232     CDentry::linkage_t *dnl = i->second->get_linkage();
233     if (dnl->is_primary()) {
234       CInode *in = dnl->get_inode();
235       nest_info.add(in->inode.accounted_rstat);
236       if (in->is_dir())
237         frag_info.nsubdirs++;
238       else
239         frag_info.nfiles++;
240     } else if (dnl->is_remote())
241       frag_info.nfiles++;
242   }
243
244   bool good = true;
245   // fragstat
246   if(!frag_info.same_sums(fnode.fragstat)) {
247     dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
248     dout(1) << "get_num_head_items() = " << get_num_head_items()
249              << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
250              << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
251     good = false;
252   } else {
253     dout(20) << "get_num_head_items() = " << get_num_head_items()
254              << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
255              << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
256   }
257
258   // rstat
259   if (!nest_info.same_sums(fnode.rstat)) {
260     dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
261     dout(1) << "total of child dentrys: " << nest_info << dendl;
262     dout(1) << "my rstats:              " << fnode.rstat << dendl;
263     good = false;
264   } else {
265     dout(20) << "total of child dentrys: " << nest_info << dendl;
266     dout(20) << "my rstats:              " << fnode.rstat << dendl;
267   }
268
269   if (!good) {
270     if (!scrub) {
271       for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
272         CDentry *dn = i->second;
273         if (dn->get_linkage()->is_primary()) {
274           CInode *in = dn->get_linkage()->inode;
275           dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
276         } else {
277           dout(1) << *dn << dendl;
278         }
279       }
280
281       assert(frag_info.nfiles == fnode.fragstat.nfiles);
282       assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
283       assert(nest_info.rbytes == fnode.rstat.rbytes);
284       assert(nest_info.rfiles == fnode.rstat.rfiles);
285       assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
286     }
287   }
288   dout(10) << "check_rstats complete on " << this << dendl;
289   return good;
290 }
291
292 CDentry *CDir::lookup(const string& name, snapid_t snap)
293
294   dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
295   map_t::iterator iter = items.lower_bound(dentry_key_t(snap, name.c_str(),
296                                                         inode->hash_dentry_name(name)));
297   if (iter == items.end())
298     return 0;
299   if (iter->second->name == name &&
300       iter->second->first <= snap &&
301       iter->second->last >= snap) {
302     dout(20) << "  hit -> " << iter->first << dendl;
303     return iter->second;
304   }
305   dout(20) << "  miss -> " << iter->first << dendl;
306   return 0;
307 }
308
309 CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
310   map_t::iterator p = items.find(dentry_key_t(last, name.c_str(),
311                                               inode->hash_dentry_name(name)));
312   if (p == items.end())
313     return NULL;
314   return p->second;
315 }
316
317 /***
318  * linking fun
319  */
320
321 CDentry* CDir::add_null_dentry(const string& dname,
322                                snapid_t first, snapid_t last)
323 {
324   // foreign
325   assert(lookup_exact_snap(dname, last) == 0);
326    
327   // create dentry
328   CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
329   if (is_auth()) 
330     dn->state_set(CDentry::STATE_AUTH);
331
332   cache->bottom_lru.lru_insert_mid(dn);
333   dn->state_set(CDentry::STATE_BOTTOMLRU);
334
335   dn->dir = this;
336   dn->version = get_projected_version();
337   
338   // add to dir
339   assert(items.count(dn->key()) == 0);
340   //assert(null_items.count(dn->name) == 0);
341
342   items[dn->key()] = dn;
343   if (last == CEPH_NOSNAP)
344     num_head_null++;
345   else
346     num_snap_null++;
347
348   if (state_test(CDir::STATE_DNPINNEDFRAG)) {
349     dn->get(CDentry::PIN_FRAGMENTING);
350     dn->state_set(CDentry::STATE_FRAGMENTING);
351   }    
352
353   dout(12) << "add_null_dentry " << *dn << dendl;
354
355   // pin?
356   if (get_num_any() == 1)
357     get(PIN_CHILD);
358   
359   assert(get_num_any() == items.size());
360   return dn;
361 }
362
363
364 CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
365                                   snapid_t first, snapid_t last) 
366 {
367   // primary
368   assert(lookup_exact_snap(dname, last) == 0);
369   
370   // create dentry
371   CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
372   if (is_auth()) 
373     dn->state_set(CDentry::STATE_AUTH);
374   if (is_auth() || !inode->is_stray()) {
375     cache->lru.lru_insert_mid(dn);
376   } else {
377     cache->bottom_lru.lru_insert_mid(dn);
378     dn->state_set(CDentry::STATE_BOTTOMLRU);
379   }
380
381   dn->dir = this;
382   dn->version = get_projected_version();
383   
384   // add to dir
385   assert(items.count(dn->key()) == 0);
386   //assert(null_items.count(dn->name) == 0);
387
388   items[dn->key()] = dn;
389
390   dn->get_linkage()->inode = in;
391   in->set_primary_parent(dn);
392
393   link_inode_work(dn, in);
394
395   if (dn->last == CEPH_NOSNAP)
396     num_head_items++;
397   else
398     num_snap_items++;
399   
400   if (state_test(CDir::STATE_DNPINNEDFRAG)) {
401     dn->get(CDentry::PIN_FRAGMENTING);
402     dn->state_set(CDentry::STATE_FRAGMENTING);
403   }    
404
405   dout(12) << "add_primary_dentry " << *dn << dendl;
406
407   // pin?
408   if (get_num_any() == 1)
409     get(PIN_CHILD);
410   assert(get_num_any() == items.size());
411   return dn;
412 }
413
414 CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned char d_type,
415                                  snapid_t first, snapid_t last) 
416 {
417   // foreign
418   assert(lookup_exact_snap(dname, last) == 0);
419
420   // create dentry
421   CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
422   if (is_auth()) 
423     dn->state_set(CDentry::STATE_AUTH);
424   cache->lru.lru_insert_mid(dn);
425
426   dn->dir = this;
427   dn->version = get_projected_version();
428   
429   // add to dir
430   assert(items.count(dn->key()) == 0);
431   //assert(null_items.count(dn->name) == 0);
432
433   items[dn->key()] = dn;
434   if (last == CEPH_NOSNAP)
435     num_head_items++;
436   else
437     num_snap_items++;
438
439   if (state_test(CDir::STATE_DNPINNEDFRAG)) {
440     dn->get(CDentry::PIN_FRAGMENTING);
441     dn->state_set(CDentry::STATE_FRAGMENTING);
442   }    
443
444   dout(12) << "add_remote_dentry " << *dn << dendl;
445
446   // pin?
447   if (get_num_any() == 1)
448     get(PIN_CHILD);
449   
450   assert(get_num_any() == items.size());
451   return dn;
452 }
453
454
455
456 void CDir::remove_dentry(CDentry *dn) 
457 {
458   dout(12) << "remove_dentry " << *dn << dendl;
459
460   // there should be no client leases at this point!
461   assert(dn->client_lease_map.empty());
462
463   if (state_test(CDir::STATE_DNPINNEDFRAG)) {
464     dn->put(CDentry::PIN_FRAGMENTING);
465     dn->state_clear(CDentry::STATE_FRAGMENTING);
466   }    
467
468   if (dn->get_linkage()->is_null()) {
469     if (dn->last == CEPH_NOSNAP)
470       num_head_null--;
471     else
472       num_snap_null--;
473   } else {
474     if (dn->last == CEPH_NOSNAP)
475       num_head_items--;
476     else
477       num_snap_items--;
478   }
479
480   if (!dn->get_linkage()->is_null())
481     // detach inode and dentry
482     unlink_inode_work(dn);
483   
484   // remove from list
485   assert(items.count(dn->key()) == 1);
486   items.erase(dn->key());
487
488   // clean?
489   if (dn->is_dirty())
490     dn->mark_clean();
491
492   if (dn->state_test(CDentry::STATE_BOTTOMLRU))
493     cache->bottom_lru.lru_remove(dn);
494   else
495     cache->lru.lru_remove(dn);
496   delete dn;
497
498   // unpin?
499   if (get_num_any() == 0)
500     put(PIN_CHILD);
501   assert(get_num_any() == items.size());
502 }
503
504 void CDir::link_remote_inode(CDentry *dn, CInode *in)
505 {
506   link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
507 }
508
509 void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
510 {
511   dout(12) << "link_remote_inode " << *dn << " remote " << ino << dendl;
512   assert(dn->get_linkage()->is_null());
513
514   dn->get_linkage()->set_remote(ino, d_type);
515
516   if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
517     cache->bottom_lru.lru_remove(dn);
518     cache->lru.lru_insert_mid(dn);
519     dn->state_clear(CDentry::STATE_BOTTOMLRU);
520   }
521
522   if (dn->last == CEPH_NOSNAP) {
523     num_head_items++;
524     num_head_null--;
525   } else {
526     num_snap_items++;
527     num_snap_null--;
528   }
529   assert(get_num_any() == items.size());
530 }
531
532 void CDir::link_primary_inode(CDentry *dn, CInode *in)
533 {
534   dout(12) << "link_primary_inode " << *dn << " " << *in << dendl;
535   assert(dn->get_linkage()->is_null());
536
537   dn->get_linkage()->inode = in;
538   in->set_primary_parent(dn);
539
540   link_inode_work(dn, in);
541
542   if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
543       (is_auth() || !inode->is_stray())) {
544     cache->bottom_lru.lru_remove(dn);
545     cache->lru.lru_insert_mid(dn);
546     dn->state_clear(CDentry::STATE_BOTTOMLRU);
547   }
548   
549   if (dn->last == CEPH_NOSNAP) {
550     num_head_items++;
551     num_head_null--;
552   } else {
553     num_snap_items++;
554     num_snap_null--;
555   }
556
557   assert(get_num_any() == items.size());
558 }
559
560 void CDir::link_inode_work( CDentry *dn, CInode *in)
561 {
562   assert(dn->get_linkage()->get_inode() == in);
563   assert(in->get_parent_dn() == dn);
564
565   // set inode version
566   //in->inode.version = dn->get_version();
567   
568   // pin dentry?
569   if (in->get_num_ref())
570     dn->get(CDentry::PIN_INODEPIN);
571   
572   // adjust auth pin count
573   if (in->auth_pins + in->nested_auth_pins)
574     dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins, in->auth_pins, NULL);
575
576   // verify open snaprealm parent
577   if (in->snaprealm)
578     in->snaprealm->adjust_parent();
579   else if (in->is_any_caps())
580     in->move_to_realm(inode->find_snaprealm());
581 }
582
583 void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
584 {
585   if (dn->get_linkage()->is_primary()) {
586     dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
587   } else {
588     dout(12) << "unlink_inode " << *dn << dendl;
589   }
590
591   unlink_inode_work(dn);
592
593   if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
594     cache->lru.lru_remove(dn);
595     cache->bottom_lru.lru_insert_mid(dn);
596     dn->state_set(CDentry::STATE_BOTTOMLRU);
597   }
598
599   if (dn->last == CEPH_NOSNAP) {
600     num_head_items--;
601     num_head_null++;
602   } else {
603     num_snap_items--;
604     num_snap_null++;
605   }
606   assert(get_num_any() == items.size());
607 }
608
609
610 void CDir::try_remove_unlinked_dn(CDentry *dn)
611 {
612   assert(dn->dir == this);
613   assert(dn->get_linkage()->is_null());
614   
615   // no pins (besides dirty)?
616   if (dn->get_num_ref() != dn->is_dirty()) 
617     return;
618
619   // was the dn new?
620   if (dn->is_new()) {
621     dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << dendl;
622     if (dn->is_dirty())
623       dn->mark_clean();
624     remove_dentry(dn);
625
626     // NOTE: we may not have any more dirty dentries, but the fnode
627     // still changed, so the directory must remain dirty.
628   }
629 }
630
631
632 void CDir::unlink_inode_work( CDentry *dn )
633 {
634   CInode *in = dn->get_linkage()->get_inode();
635
636   if (dn->get_linkage()->is_remote()) {
637     // remote
638     if (in) 
639       dn->unlink_remote(dn->get_linkage());
640
641     dn->get_linkage()->set_remote(0, 0);
642   } else if (dn->get_linkage()->is_primary()) {
643     // primary
644     // unpin dentry?
645     if (in->get_num_ref())
646       dn->put(CDentry::PIN_INODEPIN);
647     
648     // unlink auth_pin count
649     if (in->auth_pins + in->nested_auth_pins)
650       dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
651     
652     // detach inode
653     in->remove_primary_parent(dn);
654     dn->get_linkage()->inode = 0;
655   } else {
656     assert(!dn->get_linkage()->is_null());
657   }
658 }
659
660 void CDir::add_to_bloom(CDentry *dn)
661 {
662   assert(dn->last == CEPH_NOSNAP);
663   if (!bloom) {
664     /* not create bloom filter for incomplete dir that was added by log replay */
665     if (!is_complete())
666       return;
667
668     /* don't maintain bloom filters in standby replay (saves cycles, and also
669      * avoids need to implement clearing it in EExport for #16924) */
670     if (cache->mds->is_standby_replay()) {
671       return;
672     }
673
674     unsigned size = get_num_head_items() + get_num_snap_items();
675     if (size < 100) size = 100;
676     bloom.reset(new bloom_filter(size, 1.0 / size, 0));
677   }
678   /* This size and false positive probability is completely random.*/
679   bloom->insert(dn->name.c_str(), dn->name.size());
680 }
681
682 bool CDir::is_in_bloom(const string& name)
683 {
684   if (!bloom)
685     return false;
686   return bloom->contains(name.c_str(), name.size());
687 }
688
689 void CDir::remove_null_dentries() {
690   dout(12) << "remove_null_dentries " << *this << dendl;
691
692   CDir::map_t::iterator p = items.begin();
693   while (p != items.end()) {
694     CDentry *dn = p->second;
695     ++p;
696     if (dn->get_linkage()->is_null() && !dn->is_projected())
697       remove_dentry(dn);
698   }
699
700   assert(num_snap_null == 0);
701   assert(num_head_null == 0);
702   assert(get_num_any() == items.size());
703 }
704
705 /** remove dirty null dentries for deleted directory. the dirfrag will be
706  *  deleted soon, so it's safe to not commit dirty dentries.
707  *
708  *  This is called when a directory is being deleted, a prerequisite
709  *  of which is that its children have been unlinked: we expect to only see
710  *  null, unprojected dentries here.
711  */
712 void CDir::try_remove_dentries_for_stray()
713 {
714   dout(10) << __func__ << dendl;
715   assert(get_parent_dir()->inode->is_stray());
716
717   // clear dirty only when the directory was not snapshotted
718   bool clear_dirty = !inode->snaprealm;
719
720   CDir::map_t::iterator p = items.begin();
721   while (p != items.end()) {
722     CDentry *dn = p->second;
723     ++p;
724     if (dn->last == CEPH_NOSNAP) {
725       assert(!dn->is_projected());
726       assert(dn->get_linkage()->is_null());
727       if (clear_dirty && dn->is_dirty())
728         dn->mark_clean();
729       // It's OK to remove lease prematurely because we will never link
730       // the dentry to inode again.
731       if (dn->is_any_leases())
732         dn->remove_client_leases(cache->mds->locker);
733       if (dn->get_num_ref() == 0)
734         remove_dentry(dn);
735     } else {
736       assert(!dn->is_projected());
737       CDentry::linkage_t *dnl= dn->get_linkage();
738       CInode *in = NULL;
739       if (dnl->is_primary()) {
740         in = dnl->get_inode();
741         if (clear_dirty && in->is_dirty())
742           in->mark_clean();
743       }
744       if (clear_dirty && dn->is_dirty())
745         dn->mark_clean();
746       if (dn->get_num_ref() == 0) {
747         remove_dentry(dn);
748         if (in)
749           cache->remove_inode(in);
750       }
751     }
752   }
753
754   if (clear_dirty && is_dirty())
755     mark_clean();
756 }
757
758 bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
759 {
760   assert(dn->last != CEPH_NOSNAP);
761   set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
762   CDentry::linkage_t *dnl= dn->get_linkage();
763   CInode *in = 0;
764   if (dnl->is_primary())
765     in = dnl->get_inode();
766   if ((p == snaps.end() || *p > dn->last) &&
767       (dn->get_num_ref() == dn->is_dirty()) &&
768       (!in || in->get_num_ref() == in->is_dirty())) {
769     dout(10) << " purging snapped " << *dn << dendl;
770     if (in && in->is_dirty())
771       in->mark_clean();
772     remove_dentry(dn);
773     if (in) {
774       dout(10) << " purging snapped " << *in << dendl;
775       cache->remove_inode(in);
776     }
777     return true;
778   }
779   return false;
780 }
781
782
783 void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
784 {
785   dout(10) << "purge_stale_snap_data " << snaps << dendl;
786
787   CDir::map_t::iterator p = items.begin();
788   while (p != items.end()) {
789     CDentry *dn = p->second;
790     ++p;
791
792     if (dn->last == CEPH_NOSNAP)
793       continue;
794
795     try_trim_snap_dentry(dn, snaps);
796   }
797 }
798
799
800 /**
801  * steal_dentry -- semi-violently move a dentry from one CDir to another
802  * (*) violently, in that nitems, most pins, etc. are not correctly maintained 
803  * on the old CDir corpse; must call finish_old_fragment() when finished.
804  */
805 void CDir::steal_dentry(CDentry *dn)
806 {
807   dout(15) << "steal_dentry " << *dn << dendl;
808
809   items[dn->key()] = dn;
810
811   dn->dir->items.erase(dn->key());
812   if (dn->dir->items.empty())
813     dn->dir->put(PIN_CHILD);
814
815   if (get_num_any() == 0)
816     get(PIN_CHILD);
817   if (dn->get_linkage()->is_null()) {
818     if (dn->last == CEPH_NOSNAP)
819       num_head_null++;
820     else
821       num_snap_null++;
822   } else if (dn->last == CEPH_NOSNAP) {
823       num_head_items++;
824
825     if (dn->get_linkage()->is_primary()) {
826       CInode *in = dn->get_linkage()->get_inode();
827       inode_t *pi = in->get_projected_inode();
828       if (dn->get_linkage()->get_inode()->is_dir())
829         fnode.fragstat.nsubdirs++;
830       else
831         fnode.fragstat.nfiles++;
832       fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
833       fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
834       fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
835       fnode.rstat.rsnaprealms += pi->accounted_rstat.rsnaprealms;
836       if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
837         fnode.rstat.rctime = pi->accounted_rstat.rctime;
838
839       // move dirty inode rstat to new dirfrag
840       if (in->is_dirty_rstat())
841         dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
842     } else if (dn->get_linkage()->is_remote()) {
843       if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
844         fnode.fragstat.nsubdirs++;
845       else
846         fnode.fragstat.nfiles++;
847     }
848   } else {
849     num_snap_items++;
850     if (dn->get_linkage()->is_primary()) {
851       CInode *in = dn->get_linkage()->get_inode();
852       if (in->is_dirty_rstat())
853         dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
854     }
855   }
856
857   if (dn->auth_pins || dn->nested_auth_pins) {
858     // use the helpers here to maintain the auth_pin invariants on the dir inode
859     int ap = dn->get_num_auth_pins() + dn->get_num_nested_auth_pins();
860     int dap = dn->get_num_dir_auth_pins();
861     assert(dap <= ap);
862     adjust_nested_auth_pins(ap, dap, NULL);
863     dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
864   }
865
866   if (dn->is_dirty()) 
867     num_dirty++;
868
869   dn->dir = this;
870 }
871
872 void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextBase*> >& dentry_waiters, bool replay)
873 {
874   // auth_pin old fragment for duration so that any auth_pinning
875   // during the dentry migration doesn't trigger side effects
876   if (!replay && is_auth())
877     auth_pin(this);
878
879   if (!waiting_on_dentry.empty()) {
880     for (auto p = waiting_on_dentry.begin(); p != waiting_on_dentry.end(); ++p)
881       dentry_waiters[p->first].swap(p->second);
882     waiting_on_dentry.clear();
883     put(PIN_DNWAITER);
884   }
885 }
886
887 void CDir::prepare_new_fragment(bool replay)
888 {
889   if (!replay && is_auth()) {
890     _freeze_dir();
891     mark_complete();
892   }
893   inode->add_dirfrag(this);
894 }
895
896 void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
897 {
898   // take waiters _before_ unfreeze...
899   if (!replay) {
900     take_waiting(WAIT_ANY_MASK, waiters);
901     if (is_auth()) {
902       auth_unpin(this);  // pinned in prepare_old_fragment
903       assert(is_frozen_dir());
904       unfreeze_dir();
905     }
906   }
907
908   assert(nested_auth_pins == 0);
909   assert(dir_auth_pins == 0);
910   assert(auth_pins == 0);
911
912   num_head_items = num_head_null = 0;
913   num_snap_items = num_snap_null = 0;
914
915   // this mirrors init_fragment_pins()
916   if (is_auth()) 
917     clear_replica_map();
918   if (is_dirty())
919     mark_clean();
920   if (state_test(STATE_IMPORTBOUND))
921     put(PIN_IMPORTBOUND);
922   if (state_test(STATE_EXPORTBOUND))
923     put(PIN_EXPORTBOUND);
924   if (is_subtree_root())
925     put(PIN_SUBTREE);
926
927   if (auth_pins > 0)
928     put(PIN_AUTHPIN);
929
930   assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
931 }
932
933 void CDir::init_fragment_pins()
934 {
935   if (is_replicated())
936     get(PIN_REPLICATED);
937   if (state_test(STATE_DIRTY))
938     get(PIN_DIRTY);
939   if (state_test(STATE_EXPORTBOUND))
940     get(PIN_EXPORTBOUND);
941   if (state_test(STATE_IMPORTBOUND))
942     get(PIN_IMPORTBOUND);
943   if (is_subtree_root())
944     get(PIN_SUBTREE);
945 }
946
947 void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
948 {
949   dout(10) << "split by " << bits << " bits on " << *this << dendl;
950
951   assert(replay || is_complete() || !is_auth());
952
953   list<frag_t> frags;
954   frag.split(bits, frags);
955
956   vector<CDir*> subfrags(1 << bits);
957   
958   double fac = 1.0 / (double)(1 << bits);  // for scaling load vecs
959
960   version_t rstat_version = inode->get_projected_inode()->rstat.version;
961   version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
962
963   nest_info_t rstatdiff;
964   frag_info_t fragstatdiff;
965   if (fnode.accounted_rstat.version == rstat_version)
966     rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
967   if (fnode.accounted_fragstat.version == dirstat_version)
968     fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
969   dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
970
971   map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
972   prepare_old_fragment(dentry_waiters, replay);
973
974   // create subfrag dirs
975   int n = 0;
976   for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
977     CDir *f = new CDir(inode, *p, cache, is_auth());
978     f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
979     f->get_replicas() = get_replicas();
980     f->dir_auth = dir_auth;
981     f->init_fragment_pins();
982     f->set_version(get_version());
983
984     f->pop_me = pop_me;
985     f->pop_me.scale(fac);
986
987     // FIXME; this is an approximation
988     f->pop_nested = pop_nested;
989     f->pop_nested.scale(fac);
990     f->pop_auth_subtree = pop_auth_subtree;
991     f->pop_auth_subtree.scale(fac);
992     f->pop_auth_subtree_nested = pop_auth_subtree_nested;
993     f->pop_auth_subtree_nested.scale(fac);
994
995     dout(10) << " subfrag " << *p << " " << *f << dendl;
996     subfrags[n++] = f;
997     subs.push_back(f);
998
999     f->set_dir_auth(get_dir_auth());
1000     f->prepare_new_fragment(replay);
1001   }
1002   
1003   // repartition dentries
1004   while (!items.empty()) {
1005     CDir::map_t::iterator p = items.begin();
1006     
1007     CDentry *dn = p->second;
1008     frag_t subfrag = inode->pick_dirfrag(dn->name);
1009     int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1010     dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1011     CDir *f = subfrags[n];
1012     f->steal_dentry(dn);
1013   }
1014
1015   for (auto& p : dentry_waiters) {
1016     frag_t subfrag = inode->pick_dirfrag(p.first.name);
1017     int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1018     CDir *f = subfrags[n];
1019
1020     if (f->waiting_on_dentry.empty())
1021       f->get(PIN_DNWAITER);
1022     f->waiting_on_dentry[p.first].swap(p.second);
1023   }
1024
1025   // FIXME: handle dirty old rstat
1026
1027   // fix up new frag fragstats
1028   for (int i=0; i<n; i++) {
1029     CDir *f = subfrags[i];
1030     f->fnode.rstat.version = rstat_version;
1031     f->fnode.accounted_rstat = f->fnode.rstat;
1032     f->fnode.fragstat.version = dirstat_version;
1033     f->fnode.accounted_fragstat = f->fnode.fragstat;
1034     dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1035              << " on " << *f << dendl;
1036   }
1037
1038   // give any outstanding frag stat differential to first frag
1039   dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1040            << " to " << *subfrags[0] << dendl;
1041   subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1042   subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1043
1044   finish_old_fragment(waiters, replay);
1045 }
1046
1047 void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
1048 {
1049   dout(10) << "merge " << subs << dendl;
1050
1051   mds_authority_t new_auth = CDIR_AUTH_DEFAULT;
1052   for (auto dir : subs) {
1053     if (dir->get_dir_auth() != CDIR_AUTH_DEFAULT &&
1054         dir->get_dir_auth() != new_auth) {
1055       assert(new_auth == CDIR_AUTH_DEFAULT);
1056       new_auth = dir->get_dir_auth();
1057     }
1058   }
1059
1060   set_dir_auth(new_auth);
1061   prepare_new_fragment(replay);
1062
1063   nest_info_t rstatdiff;
1064   frag_info_t fragstatdiff;
1065   bool touched_mtime, touched_chattr;
1066   version_t rstat_version = inode->get_projected_inode()->rstat.version;
1067   version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1068
1069   map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
1070
1071   for (auto dir : subs) {
1072     dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
1073     assert(!dir->is_auth() || dir->is_complete() || replay);
1074
1075     if (dir->fnode.accounted_rstat.version == rstat_version)
1076       rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1077     if (dir->fnode.accounted_fragstat.version == dirstat_version)
1078       fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1079                              &touched_mtime, &touched_chattr);
1080
1081     dir->prepare_old_fragment(dentry_waiters, replay);
1082
1083     // steal dentries
1084     while (!dir->items.empty()) 
1085       steal_dentry(dir->items.begin()->second);
1086     
1087     // merge replica map
1088     for (const auto &p : dir->get_replicas()) {
1089       unsigned cur = get_replicas()[p.first];
1090       if (p.second > cur)
1091         get_replicas()[p.first] = p.second;
1092     }
1093
1094     // merge version
1095     if (dir->get_version() > get_version())
1096       set_version(dir->get_version());
1097
1098     // merge state
1099     state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
1100
1101     dir->finish_old_fragment(waiters, replay);
1102     inode->close_dirfrag(dir->get_frag());
1103   }
1104
1105   if (!dentry_waiters.empty()) {
1106     get(PIN_DNWAITER);
1107     for (auto& p : dentry_waiters) {
1108       waiting_on_dentry[p.first].swap(p.second);
1109     }
1110   }
1111
1112   if (is_auth() && !replay)
1113     mark_complete();
1114
1115   // FIXME: merge dirty old rstat
1116   fnode.rstat.version = rstat_version;
1117   fnode.accounted_rstat = fnode.rstat;
1118   fnode.accounted_rstat.add(rstatdiff);
1119
1120   fnode.fragstat.version = dirstat_version;
1121   fnode.accounted_fragstat = fnode.fragstat;
1122   fnode.accounted_fragstat.add(fragstatdiff);
1123
1124   init_fragment_pins();
1125 }
1126
1127
1128
1129
1130 void CDir::resync_accounted_fragstat()
1131 {
1132   fnode_t *pf = get_projected_fnode();
1133   inode_t *pi = inode->get_projected_inode();
1134
1135   if (pf->accounted_fragstat.version != pi->dirstat.version) {
1136     pf->fragstat.version = pi->dirstat.version;
1137     dout(10) << "resync_accounted_fragstat " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1138     pf->accounted_fragstat = pf->fragstat;
1139   }
1140 }
1141
1142 /*
1143  * resync rstat and accounted_rstat with inode
1144  */
1145 void CDir::resync_accounted_rstat()
1146 {
1147   fnode_t *pf = get_projected_fnode();
1148   inode_t *pi = inode->get_projected_inode();
1149   
1150   if (pf->accounted_rstat.version != pi->rstat.version) {
1151     pf->rstat.version = pi->rstat.version;
1152     dout(10) << "resync_accounted_rstat " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1153     pf->accounted_rstat = pf->rstat;
1154     dirty_old_rstat.clear();
1155   }
1156 }
1157
1158 void CDir::assimilate_dirty_rstat_inodes()
1159 {
1160   dout(10) << "assimilate_dirty_rstat_inodes" << dendl;
1161   for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1162        !p.end(); ++p) {
1163     CInode *in = *p;
1164     assert(in->is_auth());
1165     if (in->is_frozen())
1166       continue;
1167
1168     inode_t *pi = in->project_inode();
1169     pi->version = in->pre_dirty();
1170
1171     inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1172   }
1173   state_set(STATE_ASSIMRSTAT);
1174   dout(10) << "assimilate_dirty_rstat_inodes done" << dendl;
1175 }
1176
1177 void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1178 {
1179   if (!state_test(STATE_ASSIMRSTAT))
1180     return;
1181   state_clear(STATE_ASSIMRSTAT);
1182   dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl;
1183   elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1184   while (!p.end()) {
1185     CInode *in = *p;
1186     ++p;
1187
1188     if (in->is_frozen())
1189       continue;
1190
1191     CDentry *dn = in->get_projected_parent_dn();
1192
1193     mut->auth_pin(in);
1194     mut->add_projected_inode(in);
1195
1196     in->clear_dirty_rstat();
1197     blob->add_primary_dentry(dn, in, true);
1198   }
1199
1200   if (!dirty_rstat_inodes.empty())
1201     inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1202 }
1203
1204
1205
1206
1207 /****************************************
1208  * WAITING
1209  */
1210
1211 void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c) 
1212 {
1213   if (waiting_on_dentry.empty())
1214     get(PIN_DNWAITER);
1215   waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1216   dout(10) << "add_dentry_waiter dentry " << dname
1217            << " snap " << snapid
1218            << " " << c << " on " << *this << dendl;
1219 }
1220
1221 void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
1222                                list<MDSInternalContextBase*>& ls)
1223 {
1224   if (waiting_on_dentry.empty())
1225     return;
1226   
1227   string_snap_t lb(dname, first);
1228   string_snap_t ub(dname, last);
1229   compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
1230   while (p != waiting_on_dentry.end() &&
1231          !(ub < p->first)) {
1232     dout(10) << "take_dentry_waiting dentry " << dname
1233              << " [" << first << "," << last << "] found waiter on snap "
1234              << p->first.snapid
1235              << " on " << *this << dendl;
1236     ls.splice(ls.end(), p->second);
1237     waiting_on_dentry.erase(p++);
1238   }
1239
1240   if (waiting_on_dentry.empty())
1241     put(PIN_DNWAITER);
1242 }
1243
1244 void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
1245 {
1246   dout(10) << "take_sub_waiting" << dendl;
1247   if (!waiting_on_dentry.empty()) {
1248     for (compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1249          p != waiting_on_dentry.end();
1250          ++p) 
1251       ls.splice(ls.end(), p->second);
1252     waiting_on_dentry.clear();
1253     put(PIN_DNWAITER);
1254   }
1255 }
1256
1257
1258
1259 void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c) 
1260 {
1261   // hierarchical?
1262
1263   // at free root?
1264   if (tag & WAIT_ATFREEZEROOT) {
1265     if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1266           is_freezing_dir() || is_frozen_dir())) {
1267       // try parent
1268       dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl;
1269       inode->parent->dir->add_waiter(tag, c);
1270       return;
1271     }
1272   }
1273   
1274   // at subtree root?
1275   if (tag & WAIT_ATSUBTREEROOT) {
1276     if (!is_subtree_root()) {
1277       // try parent
1278       dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1279       inode->parent->dir->add_waiter(tag, c);
1280       return;
1281     }
1282   }
1283
1284   assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1285
1286   MDSCacheObject::add_waiter(tag, c);
1287 }
1288
1289
1290
1291 /* NOTE: this checks dentry waiters too */
1292 void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
1293 {
1294   if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1295     // take all dentry waiters
1296     while (!waiting_on_dentry.empty()) {
1297       compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1298       dout(10) << "take_waiting dentry " << p->first.name
1299                << " snap " << p->first.snapid << " on " << *this << dendl;
1300       ls.splice(ls.end(), p->second);
1301       waiting_on_dentry.erase(p);
1302     }
1303     put(PIN_DNWAITER);
1304   }
1305   
1306   // waiting
1307   MDSCacheObject::take_waiting(mask, ls);
1308 }
1309
1310
1311 void CDir::finish_waiting(uint64_t mask, int result) 
1312 {
1313   dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1314
1315   list<MDSInternalContextBase*> finished;
1316   take_waiting(mask, finished);
1317   if (result < 0)
1318     finish_contexts(g_ceph_context, finished, result);
1319   else
1320     cache->mds->queue_waiters(finished);
1321 }
1322
1323
1324
1325 // dirty/clean
1326
1327 fnode_t *CDir::project_fnode()
1328 {
1329   assert(get_version() != 0);
1330   fnode_t *p = new fnode_t;
1331   *p = *get_projected_fnode();
1332   projected_fnode.push_back(p);
1333
1334   if (scrub_infop && scrub_infop->last_scrub_dirty) {
1335     p->localized_scrub_stamp = scrub_infop->last_local.time;
1336     p->localized_scrub_version = scrub_infop->last_local.version;
1337     p->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1338     p->recursive_scrub_version = scrub_infop->last_recursive.version;
1339     scrub_infop->last_scrub_dirty = false;
1340     scrub_maybe_delete_info();
1341   }
1342
1343   dout(10) << "project_fnode " << p << dendl;
1344   return p;
1345 }
1346
1347 void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1348 {
1349   assert(!projected_fnode.empty());
1350   dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode.front()
1351            << " v" << projected_fnode.front()->version << dendl;
1352   fnode = *projected_fnode.front();
1353   _mark_dirty(ls);
1354   delete projected_fnode.front();
1355   projected_fnode.pop_front();
1356 }
1357
1358
1359 version_t CDir::pre_dirty(version_t min)
1360 {
1361   if (min > projected_version)
1362     projected_version = min;
1363   ++projected_version;
1364   dout(10) << "pre_dirty " << projected_version << dendl;
1365   return projected_version;
1366 }
1367
1368 void CDir::mark_dirty(version_t pv, LogSegment *ls)
1369 {
1370   assert(get_version() < pv);
1371   assert(pv <= projected_version);
1372   fnode.version = pv;
1373   _mark_dirty(ls);
1374 }
1375
1376 void CDir::_mark_dirty(LogSegment *ls)
1377 {
1378   if (!state_test(STATE_DIRTY)) {
1379     dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl;
1380     _set_dirty_flag();
1381     assert(ls);
1382   } else {
1383     dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl;
1384   }
1385   if (ls) {
1386     ls->dirty_dirfrags.push_back(&item_dirty);
1387
1388     // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1389     if (committed_version == 0 && !item_new.is_on_list())
1390       ls->new_dirfrags.push_back(&item_new);
1391   }
1392 }
1393
1394 void CDir::mark_new(LogSegment *ls)
1395 {
1396   ls->new_dirfrags.push_back(&item_new);
1397   state_clear(STATE_CREATING);
1398
1399   list<MDSInternalContextBase*> waiters;
1400   take_waiting(CDir::WAIT_CREATED, waiters);
1401   cache->mds->queue_waiters(waiters);
1402 }
1403
1404 void CDir::mark_clean()
1405 {
1406   dout(10) << "mark_clean " << *this << " version " << get_version() << dendl;
1407   if (state_test(STATE_DIRTY)) {
1408     item_dirty.remove_myself();
1409     item_new.remove_myself();
1410
1411     state_clear(STATE_DIRTY);
1412     put(PIN_DIRTY);
1413   }
1414 }
1415
1416 // caller should hold auth pin of this
1417 void CDir::log_mark_dirty()
1418 {
1419   if (is_dirty() || is_projected())
1420     return; // noop if it is already dirty or will be dirty
1421
1422   version_t pv = pre_dirty();
1423   mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1424 }
1425
1426 void CDir::mark_complete() {
1427   state_set(STATE_COMPLETE);
1428   bloom.reset();
1429 }
1430
1431 void CDir::first_get()
1432 {
1433   inode->get(CInode::PIN_DIRFRAG);
1434 }
1435
1436 void CDir::last_put()
1437 {
1438   inode->put(CInode::PIN_DIRFRAG);
1439 }
1440
1441
1442
1443 /******************************************************************************
1444  * FETCH and COMMIT
1445  */
1446
1447 // -----------------------
1448 // FETCH
1449 void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
1450 {
1451   string want;
1452   return fetch(c, want, ignore_authpinnability);
1453 }
1454
1455 void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
1456 {
1457   dout(10) << "fetch on " << *this << dendl;
1458   
1459   assert(is_auth());
1460   assert(!is_complete());
1461
1462   if (!can_auth_pin() && !ignore_authpinnability) {
1463     if (c) {
1464       dout(7) << "fetch waiting for authpinnable" << dendl;
1465       add_waiter(WAIT_UNFREEZE, c);
1466     } else
1467       dout(7) << "fetch not authpinnable and no context" << dendl;
1468     return;
1469   }
1470
1471   // unlinked directory inode shouldn't have any entry
1472   if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1473       !inode->snaprealm) {
1474     dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1475     if (get_version() == 0) {
1476       assert(inode->is_auth());
1477       set_version(1);
1478
1479       if (state_test(STATE_REJOINUNDEF)) {
1480         assert(cache->mds->is_rejoin());
1481         state_clear(STATE_REJOINUNDEF);
1482         cache->opened_undef_dirfrag(this);
1483       }
1484     }
1485     mark_complete();
1486
1487     if (c)
1488       cache->mds->queue_waiter(c);
1489     return;
1490   }
1491
1492   if (c) add_waiter(WAIT_COMPLETE, c);
1493   if (!want_dn.empty()) wanted_items.insert(want_dn);
1494   
1495   // already fetching?
1496   if (state_test(CDir::STATE_FETCHING)) {
1497     dout(7) << "already fetching; waiting" << dendl;
1498     return;
1499   }
1500
1501   auth_pin(this);
1502   state_set(CDir::STATE_FETCHING);
1503
1504   if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1505
1506   std::set<dentry_key_t> empty;
1507   _omap_fetch(NULL, empty);
1508 }
1509
1510 void CDir::fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1511 {
1512   dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1513
1514   assert(is_auth());
1515   assert(!is_complete());
1516
1517   if (!can_auth_pin()) {
1518     dout(7) << "fetch keys waiting for authpinnable" << dendl;
1519     add_waiter(WAIT_UNFREEZE, c);
1520     return;
1521   }
1522   if (state_test(CDir::STATE_FETCHING)) {
1523     dout(7) << "fetch keys waiting for full fetch" << dendl;
1524     add_waiter(WAIT_COMPLETE, c);
1525     return;
1526   }
1527
1528   auth_pin(this);
1529   if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1530
1531   _omap_fetch(c, keys);
1532 }
1533
1534 class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1535   MDSInternalContextBase *fin;
1536 public:
1537   bufferlist hdrbl;
1538   bool more = false;
1539   map<string, bufferlist> omap;      ///< carry-over from before
1540   map<string, bufferlist> omap_more; ///< new batch
1541   int ret;
1542   C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSInternalContextBase *f) :
1543     CDirIOContext(d), fin(f), ret(0) { }
1544   void finish(int r) {
1545     // merge results
1546     if (omap.empty()) {
1547       omap.swap(omap_more);
1548     } else {
1549       omap.insert(omap_more.begin(), omap_more.end());
1550     }
1551     if (more) {
1552       dir->_omap_fetch_more(hdrbl, omap, fin);
1553     } else {
1554       dir->_omap_fetched(hdrbl, omap, !fin, r);
1555       if (fin)
1556         fin->complete(r);
1557     }
1558   }
1559 };
1560
1561 class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1562   MDSInternalContextBase *fin;
1563 public:
1564   bufferlist hdrbl;
1565   bool more = false;
1566   map<string, bufferlist> omap;
1567   bufferlist btbl;
1568   int ret1, ret2, ret3;
1569
1570   C_IO_Dir_OMAP_Fetched(CDir *d, MDSInternalContextBase *f) :
1571     CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1572   void finish(int r) override {
1573     // check the correctness of backtrace
1574     if (r >= 0 && ret3 != -ECANCELED)
1575       dir->inode->verify_diri_backtrace(btbl, ret3);
1576     if (r >= 0) r = ret1;
1577     if (r >= 0) r = ret2;
1578     if (more) {
1579       dir->_omap_fetch_more(hdrbl, omap, fin);
1580     } else {
1581       dir->_omap_fetched(hdrbl, omap, !fin, r);
1582       if (fin)
1583         fin->complete(r);
1584     }
1585   }
1586 };
1587
1588 void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1589 {
1590   C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1591   object_t oid = get_ondisk_object();
1592   object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1593   ObjectOperation rd;
1594   rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1595   if (keys.empty()) {
1596     assert(!c);
1597     rd.omap_get_vals("", "", g_conf->mds_dir_keys_per_op,
1598                      &fin->omap, &fin->more, &fin->ret2);
1599   } else {
1600     assert(c);
1601     std::set<std::string> str_keys;
1602     for (auto p = keys.begin(); p != keys.end(); ++p) {
1603       string str;
1604       p->encode(str);
1605       str_keys.insert(str);
1606     }
1607     rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1608   }
1609   // check the correctness of backtrace
1610   if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
1611     rd.getxattr("parent", &fin->btbl, &fin->ret3);
1612     rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1613   } else {
1614     fin->ret3 = -ECANCELED;
1615   }
1616
1617   cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1618                              new C_OnFinisher(fin, cache->mds->finisher));
1619 }
1620
1621 void CDir::_omap_fetch_more(
1622   bufferlist& hdrbl,
1623   map<string, bufferlist>& omap,
1624   MDSInternalContextBase *c)
1625 {
1626   // we have more omap keys to fetch!
1627   object_t oid = get_ondisk_object();
1628   object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1629   C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1630   fin->hdrbl.claim(hdrbl);
1631   fin->omap.swap(omap);
1632   ObjectOperation rd;
1633   rd.omap_get_vals(fin->omap.rbegin()->first,
1634                    "", /* filter prefix */
1635                    g_conf->mds_dir_keys_per_op,
1636                    &fin->omap_more,
1637                    &fin->more,
1638                    &fin->ret);
1639   cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1640                              new C_OnFinisher(fin, cache->mds->finisher));
1641 }
1642
1643 CDentry *CDir::_load_dentry(
1644     const std::string &key,
1645     const std::string &dname,
1646     const snapid_t last,
1647     bufferlist &bl,
1648     const int pos,
1649     const std::set<snapid_t> *snaps,
1650     bool *force_dirty,
1651     list<CInode*> *undef_inodes)
1652 {
1653   bufferlist::iterator q = bl.begin();
1654
1655   snapid_t first;
1656   ::decode(first, q);
1657
1658   // marker
1659   char type;
1660   ::decode(type, q);
1661
1662   dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1663            << " [" << first << "," << last << "]"
1664            << dendl;
1665
1666   bool stale = false;
1667   if (snaps && last != CEPH_NOSNAP) {
1668     set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1669     if (p == snaps->end() || *p > last) {
1670       dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1671       stale = true;
1672     }
1673   }
1674   
1675   /*
1676    * look for existing dentry for _last_ snap, because unlink +
1677    * create may leave a "hole" (epochs during which the dentry
1678    * doesn't exist) but for which no explicit negative dentry is in
1679    * the cache.
1680    */
1681   CDentry *dn;
1682   if (stale)
1683     dn = lookup_exact_snap(dname, last);
1684   else
1685     dn = lookup(dname, last);
1686
1687   if (type == 'L') {
1688     // hard link
1689     inodeno_t ino;
1690     unsigned char d_type;
1691     ::decode(ino, q);
1692     ::decode(d_type, q);
1693
1694     if (stale) {
1695       if (!dn) {
1696         stale_items.insert(key);
1697         *force_dirty = true;
1698       }
1699       return dn;
1700     }
1701
1702     if (dn) {
1703       if (dn->get_linkage()->get_inode() == 0) {
1704         dout(12) << "_fetched  had NEG dentry " << *dn << dendl;
1705       } else {
1706         dout(12) << "_fetched  had dentry " << *dn << dendl;
1707       }
1708     } else {
1709       // (remote) link
1710       dn = add_remote_dentry(dname, ino, d_type, first, last);
1711       
1712       // link to inode?
1713       CInode *in = cache->get_inode(ino);   // we may or may not have it.
1714       if (in) {
1715         dn->link_remote(dn->get_linkage(), in);
1716         dout(12) << "_fetched  got remote link " << ino << " which we have " << *in << dendl;
1717       } else {
1718         dout(12) << "_fetched  got remote link " << ino << " (dont' have it)" << dendl;
1719       }
1720     }
1721   } 
1722   else if (type == 'I') {
1723     // inode
1724     
1725     // Load inode data before looking up or constructing CInode
1726     InodeStore inode_data;
1727     inode_data.decode_bare(q);
1728     
1729     if (stale) {
1730       if (!dn) {
1731         stale_items.insert(key);
1732         *force_dirty = true;
1733       }
1734       return dn;
1735     }
1736
1737     bool undef_inode = false;
1738     if (dn) {
1739       CInode *in = dn->get_linkage()->get_inode();
1740       if (in) {
1741         dout(12) << "_fetched  had dentry " << *dn << dendl;
1742         if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1743           undef_inodes->push_back(in);
1744           undef_inode = true;
1745         }
1746       } else
1747         dout(12) << "_fetched  had NEG dentry " << *dn << dendl;
1748     }
1749
1750     if (!dn || undef_inode) {
1751       // add inode
1752       CInode *in = cache->get_inode(inode_data.inode.ino, last);
1753       if (!in || undef_inode) {
1754         if (undef_inode && in)
1755           in->first = first;
1756         else
1757           in = new CInode(cache, true, first, last);
1758         
1759         in->inode = inode_data.inode;
1760         // symlink?
1761         if (in->is_symlink()) 
1762           in->symlink = inode_data.symlink;
1763         
1764         in->dirfragtree.swap(inode_data.dirfragtree);
1765         in->xattrs.swap(inode_data.xattrs);
1766         in->old_inodes.swap(inode_data.old_inodes);
1767         if (!in->old_inodes.empty()) {
1768           snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1769           if (min_first > in->first)
1770             in->first = min_first;
1771         }
1772
1773         in->oldest_snap = inode_data.oldest_snap;
1774         in->decode_snap_blob(inode_data.snap_blob);
1775         if (snaps && !in->snaprealm)
1776           in->purge_stale_snap_data(*snaps);
1777
1778         if (!undef_inode) {
1779           cache->add_inode(in); // add
1780           dn = add_primary_dentry(dname, in, first, last); // link
1781         }
1782         dout(12) << "_fetched  got " << *dn << " " << *in << dendl;
1783
1784         if (in->inode.is_dirty_rstat())
1785           in->mark_dirty_rstat();
1786
1787         //in->hack_accessed = false;
1788         //in->hack_load_stamp = ceph_clock_now();
1789         //num_new_inodes_loaded++;
1790       } else {
1791         dout(0) << "_fetched  badness: got (but i already had) " << *in
1792                 << " mode " << in->inode.mode
1793                 << " mtime " << in->inode.mtime << dendl;
1794         string dirpath, inopath;
1795         this->inode->make_path_string(dirpath);
1796         in->make_path_string(inopath);
1797         cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1798           << " [" << first << "," << last << "] v" << inode_data.inode.version
1799           << " at " << dirpath << "/" << dname
1800           << ", but inode " << in->vino() << " v" << in->inode.version
1801           << " already exists at " << inopath;
1802         return dn;
1803       }
1804     }
1805   } else {
1806     std::ostringstream oss;
1807     oss << "Invalid tag char '" << type << "' pos " << pos;
1808     throw buffer::malformed_input(oss.str());
1809   }
1810
1811   return dn;
1812 }
1813
1814 void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1815                          bool complete, int r)
1816 {
1817   LogChannelRef clog = cache->mds->clog;
1818   dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1819            << omap.size() << " keys for " << *this << dendl;
1820
1821   assert(r == 0 || r == -ENOENT || r == -ENODATA);
1822   assert(is_auth());
1823   assert(!is_frozen());
1824
1825   if (hdrbl.length() == 0) {
1826     dout(0) << "_fetched missing object for " << *this << dendl;
1827
1828     clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1829                      "files may be lost (" << get_path() << ")";
1830
1831     go_bad(complete);
1832     return;
1833   }
1834
1835   fnode_t got_fnode;
1836   {
1837     bufferlist::iterator p = hdrbl.begin();
1838     try {
1839       ::decode(got_fnode, p);
1840     } catch (const buffer::error &err) {
1841       derr << "Corrupt fnode in dirfrag " << dirfrag()
1842         << ": " << err << dendl;
1843       clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1844                   << err << " (" << get_path() << ")";
1845       go_bad(complete);
1846       return;
1847     }
1848     if (!p.end()) {
1849       clog->warn() << "header buffer of dir " << dirfrag() << " has "
1850                   << hdrbl.length() - p.get_off() << " extra bytes ("
1851                   << get_path() << ")";
1852       go_bad(complete);
1853       return;
1854     }
1855   }
1856
1857   dout(10) << "_fetched version " << got_fnode.version << dendl;
1858   
1859   // take the loaded fnode?
1860   // only if we are a fresh CDir* with no prior state.
1861   if (get_version() == 0) {
1862     assert(!is_projected());
1863     assert(!state_test(STATE_COMMITTING));
1864     fnode = got_fnode;
1865     projected_version = committing_version = committed_version = got_fnode.version;
1866
1867     if (state_test(STATE_REJOINUNDEF)) {
1868       assert(cache->mds->is_rejoin());
1869       state_clear(STATE_REJOINUNDEF);
1870       cache->opened_undef_dirfrag(this);
1871     }
1872   }
1873
1874   list<CInode*> undef_inodes;
1875
1876   // purge stale snaps?
1877   // only if we have past_parents open!
1878   bool force_dirty = false;
1879   const set<snapid_t> *snaps = NULL;
1880   SnapRealm *realm = inode->find_snaprealm();
1881   if (!realm->have_past_parents_open()) {
1882     dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1883   } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1884     snaps = &realm->get_snaps();
1885     dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1886              << " < " << realm->get_last_destroyed()
1887              << ", snap purge based on " << *snaps << dendl;
1888     if (get_num_snap_items() == 0) {
1889       fnode.snap_purged_thru = realm->get_last_destroyed();
1890       force_dirty = true;
1891     }
1892   }
1893
1894   unsigned pos = omap.size() - 1;
1895   for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1896        p != omap.rend();
1897        ++p, --pos) {
1898     string dname;
1899     snapid_t last;
1900     dentry_key_t::decode_helper(p->first, dname, last);
1901
1902     CDentry *dn = NULL;
1903     try {
1904       dn = _load_dentry(
1905             p->first, dname, last, p->second, pos, snaps,
1906             &force_dirty, &undef_inodes);
1907     } catch (const buffer::error &err) {
1908       cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
1909                                   "dir frag " << dirfrag() << ": "
1910                                << err << "(" << get_path() << ")";
1911
1912       // Remember that this dentry is damaged.  Subsequent operations
1913       // that try to act directly on it will get their EIOs, but this
1914       // dirfrag as a whole will continue to look okay (minus the
1915       // mysteriously-missing dentry)
1916       go_bad_dentry(last, dname);
1917
1918       // Anyone who was WAIT_DENTRY for this guy will get kicked
1919       // to RetryRequest, and hit the DamageTable-interrogating path.
1920       // Stats will now be bogus because we will think we're complete,
1921       // but have 1 or more missing dentries.
1922       continue;
1923     }
1924
1925     if (dn && (wanted_items.count(dname) > 0 || !complete)) {
1926       dout(10) << " touching wanted dn " << *dn << dendl;
1927       inode->mdcache->touch_dentry(dn);
1928     }
1929
1930     /** clean underwater item?
1931      * Underwater item is something that is dirty in our cache from
1932      * journal replay, but was previously flushed to disk before the
1933      * mds failed.
1934      *
1935      * We only do this is committed_version == 0. that implies either
1936      * - this is a fetch after from a clean/empty CDir is created
1937      *   (and has no effect, since the dn won't exist); or
1938      * - this is a fetch after _recovery_, which is what we're worried 
1939      *   about.  Items that are marked dirty from the journal should be
1940      *   marked clean if they appear on disk.
1941      */
1942     if (committed_version == 0 &&     
1943         dn &&
1944         dn->get_version() <= got_fnode.version &&
1945         dn->is_dirty()) {
1946       dout(10) << "_fetched  had underwater dentry " << *dn << ", marking clean" << dendl;
1947       dn->mark_clean();
1948
1949       if (dn->get_linkage()->is_primary()) {
1950         assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
1951         dout(10) << "_fetched  had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
1952         dn->get_linkage()->get_inode()->mark_clean();
1953       }
1954     }
1955   }
1956
1957   //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1958
1959   // mark complete, !fetching
1960   if (complete) {
1961     wanted_items.clear();
1962     mark_complete();
1963     state_clear(STATE_FETCHING);
1964
1965     if (scrub_infop && scrub_infop->need_scrub_local) {
1966       scrub_infop->need_scrub_local = false;
1967       scrub_local();
1968     }
1969   }
1970
1971   // open & force frags
1972   while (!undef_inodes.empty()) {
1973     CInode *in = undef_inodes.front();
1974     undef_inodes.pop_front();
1975     in->state_clear(CInode::STATE_REJOINUNDEF);
1976     cache->opened_undef_inode(in);
1977   }
1978
1979   // dirty myself to remove stale snap dentries
1980   if (force_dirty && !inode->mdcache->is_readonly())
1981     log_mark_dirty();
1982
1983   auth_unpin(this);
1984
1985   if (complete) {
1986     // kick waiters
1987     finish_waiting(WAIT_COMPLETE, 0);
1988   }
1989 }
1990
1991 void CDir::_go_bad()
1992 {
1993   if (get_version() == 0)
1994     set_version(1);
1995   state_set(STATE_BADFRAG);
1996   // mark complete, !fetching
1997   mark_complete();
1998   state_clear(STATE_FETCHING);
1999   auth_unpin(this);
2000
2001   // kick waiters
2002   finish_waiting(WAIT_COMPLETE, -EIO);
2003 }
2004
2005 void CDir::go_bad_dentry(snapid_t last, const std::string &dname)
2006 {
2007   dout(10) << "go_bad_dentry " << dname << dendl;
2008   const bool fatal = cache->mds->damage_table.notify_dentry(
2009       inode->ino(), frag, last, dname, get_path() + "/" + dname);
2010   if (fatal) {
2011     cache->mds->damaged();
2012     ceph_abort();  // unreachable, damaged() respawns us
2013   }
2014 }
2015
2016 void CDir::go_bad(bool complete)
2017 {
2018   dout(10) << "go_bad " << frag << dendl;
2019   const bool fatal = cache->mds->damage_table.notify_dirfrag(
2020       inode->ino(), frag, get_path());
2021   if (fatal) {
2022     cache->mds->damaged();
2023     ceph_abort();  // unreachable, damaged() respawns us
2024   }
2025
2026   if (complete)
2027     _go_bad();
2028   else
2029     auth_unpin(this);
2030 }
2031
2032 // -----------------------
2033 // COMMIT
2034
2035 /**
2036  * commit
2037  *
2038  * @param want - min version i want committed
2039  * @param c - callback for completion
2040  */
2041 void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
2042 {
2043   dout(10) << "commit want " << want << " on " << *this << dendl;
2044   if (want == 0) want = get_version();
2045
2046   // preconditions
2047   assert(want <= get_version() || get_version() == 0);    // can't commit the future
2048   assert(want > committed_version); // the caller is stupid
2049   assert(is_auth());
2050   assert(ignore_authpinnability || can_auth_pin());
2051
2052   // note: queue up a noop if necessary, so that we always
2053   // get an auth_pin.
2054   if (!c)
2055     c = new C_MDSInternalNoop;
2056
2057   // auth_pin on first waiter
2058   if (waiting_for_commit.empty())
2059     auth_pin(this);
2060   waiting_for_commit[want].push_back(c);
2061   
2062   // ok.
2063   _commit(want, op_prio);
2064 }
2065
2066 class C_IO_Dir_Committed : public CDirIOContext {
2067   version_t version;
2068 public:
2069   C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2070   void finish(int r) override {
2071     dir->_committed(r, version);
2072   }
2073 };
2074
2075 /**
2076  * Flush out the modified dentries in this dir. Keep the bufferlist
2077  * below max_write_size;
2078  */
2079 void CDir::_omap_commit(int op_prio)
2080 {
2081   dout(10) << "_omap_commit" << dendl;
2082
2083   unsigned max_write_size = cache->max_dir_commit_size;
2084   unsigned write_size = 0;
2085
2086   if (op_prio < 0)
2087     op_prio = CEPH_MSG_PRIO_DEFAULT;
2088
2089   // snap purge?
2090   const set<snapid_t> *snaps = NULL;
2091   SnapRealm *realm = inode->find_snaprealm();
2092   if (!realm->have_past_parents_open()) {
2093     dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2094   } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2095     snaps = &realm->get_snaps();
2096     dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2097              << " < " << realm->get_last_destroyed()
2098              << ", snap purge based on " << *snaps << dendl;
2099     // fnode.snap_purged_thru = realm->get_last_destroyed();
2100   }
2101
2102   set<string> to_remove;
2103   map<string, bufferlist> to_set;
2104
2105   C_GatherBuilder gather(g_ceph_context,
2106                          new C_OnFinisher(new C_IO_Dir_Committed(this,
2107                                                                  get_version()),
2108                                           cache->mds->finisher));
2109
2110   SnapContext snapc;
2111   object_t oid = get_ondisk_object();
2112   object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2113
2114   if (!stale_items.empty()) {
2115     for (compact_set<string>::iterator p = stale_items.begin();
2116          p != stale_items.end();
2117          ++p) {
2118       to_remove.insert(*p);
2119       write_size += (*p).length();
2120     }
2121     stale_items.clear();
2122   }
2123
2124   for (map_t::iterator p = items.begin();
2125       p != items.end(); ) {
2126     CDentry *dn = p->second;
2127     ++p;
2128
2129     string key;
2130     dn->key().encode(key);
2131
2132     if (dn->last != CEPH_NOSNAP &&
2133         snaps && try_trim_snap_dentry(dn, *snaps)) {
2134       dout(10) << " rm " << key << dendl;
2135       write_size += key.length();
2136       to_remove.insert(key);
2137       continue;
2138     }
2139
2140     if (!dn->is_dirty() &&
2141         (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
2142       continue;  // skip clean dentries
2143
2144     if (dn->get_linkage()->is_null()) {
2145       dout(10) << " rm " << dn->name << " " << *dn << dendl;
2146       write_size += key.length();
2147       to_remove.insert(key);
2148     } else {
2149       dout(10) << " set " << dn->name << " " << *dn << dendl;
2150       bufferlist dnbl;
2151       _encode_dentry(dn, dnbl, snaps);
2152       write_size += key.length() + dnbl.length();
2153       to_set[key].swap(dnbl);
2154     }
2155
2156     if (write_size >= max_write_size) {
2157       ObjectOperation op;
2158       op.priority = op_prio;
2159
2160       // don't create new dirfrag blindly
2161       if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2162         op.stat(NULL, (ceph::real_time*) NULL, NULL);
2163
2164       if (!to_set.empty())
2165         op.omap_set(to_set);
2166       if (!to_remove.empty())
2167         op.omap_rm_keys(to_remove);
2168
2169       cache->mds->objecter->mutate(oid, oloc, op, snapc,
2170                                    ceph::real_clock::now(),
2171                                    0, gather.new_sub());
2172
2173       write_size = 0;
2174       to_set.clear();
2175       to_remove.clear();
2176     }
2177   }
2178
2179   ObjectOperation op;
2180   op.priority = op_prio;
2181
2182   // don't create new dirfrag blindly
2183   if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2184     op.stat(NULL, (ceph::real_time*)NULL, NULL);
2185
2186   /*
2187    * save the header at the last moment.. If we were to send it off before other
2188    * updates, but die before sending them all, we'd think that the on-disk state
2189    * was fully committed even though it wasn't! However, since the messages are
2190    * strictly ordered between the MDS and the OSD, and since messages to a given
2191    * PG are strictly ordered, if we simply send the message containing the header
2192    * off last, we cannot get our header into an incorrect state.
2193    */
2194   bufferlist header;
2195   ::encode(fnode, header);
2196   op.omap_set_header(header);
2197
2198   if (!to_set.empty())
2199     op.omap_set(to_set);
2200   if (!to_remove.empty())
2201     op.omap_rm_keys(to_remove);
2202
2203   cache->mds->objecter->mutate(oid, oloc, op, snapc,
2204                                ceph::real_clock::now(),
2205                                0, gather.new_sub());
2206
2207   gather.activate();
2208 }
2209
2210 void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2211                           const set<snapid_t> *snaps)
2212 {
2213   // clear dentry NEW flag, if any.  we can no longer silently drop it.
2214   dn->clear_new();
2215
2216   ::encode(dn->first, bl);
2217
2218   // primary or remote?
2219   if (dn->linkage.is_remote()) {
2220     inodeno_t ino = dn->linkage.get_remote_ino();
2221     unsigned char d_type = dn->linkage.get_remote_d_type();
2222     dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' remote ino " << ino << dendl;
2223     
2224     // marker, name, ino
2225     bl.append('L');         // remote link
2226     ::encode(ino, bl);
2227     ::encode(d_type, bl);
2228   } else if (dn->linkage.is_primary()) {
2229     // primary link
2230     CInode *in = dn->linkage.get_inode();
2231     assert(in);
2232     
2233     dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' inode " << *in << dendl;
2234     
2235     // marker, name, inode, [symlink string]
2236     bl.append('I');         // inode
2237
2238     if (in->is_multiversion()) {
2239       if (!in->snaprealm) {
2240         if (snaps)
2241           in->purge_stale_snap_data(*snaps);
2242       } else if (in->snaprealm->have_past_parents_open()) {
2243         in->purge_stale_snap_data(in->snaprealm->get_snaps());
2244       }
2245     }
2246
2247     bufferlist snap_blob;
2248     in->encode_snap_blob(snap_blob);
2249     in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2250   } else {
2251     assert(!dn->linkage.is_null());
2252   }
2253 }
2254
2255 void CDir::_commit(version_t want, int op_prio)
2256 {
2257   dout(10) << "_commit want " << want << " on " << *this << dendl;
2258
2259   // we can't commit things in the future.
2260   // (even the projected future.)
2261   assert(want <= get_version() || get_version() == 0);
2262
2263   // check pre+postconditions.
2264   assert(is_auth());
2265
2266   // already committed?
2267   if (committed_version >= want) {
2268     dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2269     return;
2270   }
2271   // already committing >= want?
2272   if (committing_version >= want) {
2273     dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2274     assert(state_test(STATE_COMMITTING));
2275     return;
2276   }
2277
2278   // alrady committed an older version?
2279   if (committing_version > committed_version) {
2280     dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2281     return;
2282   }
2283   
2284   // commit.
2285   committing_version = get_version();
2286
2287   // mark committing (if not already)
2288   if (!state_test(STATE_COMMITTING)) {
2289     dout(10) << "marking committing" << dendl;
2290     state_set(STATE_COMMITTING);
2291   }
2292   
2293   if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2294
2295   _omap_commit(op_prio);
2296 }
2297
2298
2299 /**
2300  * _committed
2301  *
2302  * @param v version i just committed
2303  */
2304 void CDir::_committed(int r, version_t v)
2305 {
2306   if (r < 0) {
2307     // the directory could be partly purged during MDS failover
2308     if (r == -ENOENT && committed_version == 0 &&
2309         !inode->is_base() && get_parent_dir()->inode->is_stray()) {
2310       r = 0;
2311       if (inode->snaprealm)
2312         inode->state_set(CInode::STATE_MISSINGOBJS);
2313     }
2314     if (r < 0) {
2315       dout(1) << "commit error " << r << " v " << v << dendl;
2316       cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2317                                 << " errno " << r;
2318       cache->mds->handle_write_error(r);
2319       return;
2320     }
2321   }
2322
2323   dout(10) << "_committed v " << v << " on " << *this << dendl;
2324   assert(is_auth());
2325
2326   bool stray = inode->is_stray();
2327
2328   // take note.
2329   assert(v > committed_version);
2330   assert(v <= committing_version);
2331   committed_version = v;
2332
2333   // _all_ commits done?
2334   if (committing_version == committed_version) 
2335     state_clear(CDir::STATE_COMMITTING);
2336
2337   // _any_ commit, even if we've been redirtied, means we're no longer new.
2338   item_new.remove_myself();
2339   
2340   // dir clean?
2341   if (committed_version == get_version()) 
2342     mark_clean();
2343
2344   // dentries clean?
2345   for (map_t::iterator it = items.begin();
2346        it != items.end(); ) {
2347     CDentry *dn = it->second;
2348     ++it;
2349     
2350     // inode?
2351     if (dn->linkage.is_primary()) {
2352       CInode *in = dn->linkage.get_inode();
2353       assert(in);
2354       assert(in->is_auth());
2355       
2356       if (committed_version >= in->get_version()) {
2357         if (in->is_dirty()) {
2358           dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2359           in->mark_clean();
2360         }
2361       } else {
2362         dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
2363         assert(in->is_dirty() || in->last < CEPH_NOSNAP);  // special case for cow snap items (not predirtied)
2364       }
2365     }
2366
2367     // dentry
2368     if (committed_version >= dn->get_version()) {
2369       if (dn->is_dirty()) {
2370         dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2371         dn->mark_clean();
2372
2373         // drop clean null stray dentries immediately
2374         if (stray && 
2375             dn->get_num_ref() == 0 &&
2376             !dn->is_projected() &&
2377             dn->get_linkage()->is_null())
2378           remove_dentry(dn);
2379       } 
2380     } else {
2381       dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
2382     }
2383   }
2384
2385   // finishers?
2386   bool were_waiters = !waiting_for_commit.empty();
2387   
2388   compact_map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
2389   while (p != waiting_for_commit.end()) {
2390     compact_map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
2391     ++n;
2392     if (p->first > committed_version) {
2393       dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
2394       _commit(p->first, -1);
2395       break;
2396     }
2397     cache->mds->queue_waiters(p->second);
2398     waiting_for_commit.erase(p);
2399     p = n;
2400   } 
2401
2402   // try drop dentries in this dirfrag if it's about to be purged
2403   if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2404       inode->snaprealm)
2405     cache->maybe_eval_stray(inode, true);
2406
2407   // unpin if we kicked the last waiter.
2408   if (were_waiters &&
2409       waiting_for_commit.empty())
2410     auth_unpin(this);
2411 }
2412
2413
2414
2415
2416 // IMPORT/EXPORT
2417
2418 void CDir::encode_export(bufferlist& bl)
2419 {
2420   assert(!is_projected());
2421   ::encode(first, bl);
2422   ::encode(fnode, bl);
2423   ::encode(dirty_old_rstat, bl);
2424   ::encode(committed_version, bl);
2425
2426   ::encode(state, bl);
2427   ::encode(dir_rep, bl);
2428
2429   ::encode(pop_me, bl);
2430   ::encode(pop_auth_subtree, bl);
2431
2432   ::encode(dir_rep_by, bl);  
2433   ::encode(get_replicas(), bl);
2434
2435   get(PIN_TEMPEXPORTING);
2436 }
2437
2438 void CDir::finish_export(utime_t now)
2439 {
2440   state &= MASK_STATE_EXPORT_KEPT;
2441   pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
2442   pop_me.zero(now);
2443   pop_auth_subtree.zero(now);
2444   put(PIN_TEMPEXPORTING);
2445   dirty_old_rstat.clear();
2446 }
2447
2448 void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
2449 {
2450   ::decode(first, blp);
2451   ::decode(fnode, blp);
2452   ::decode(dirty_old_rstat, blp);
2453   projected_version = fnode.version;
2454   ::decode(committed_version, blp);
2455   committing_version = committed_version;
2456
2457   unsigned s;
2458   ::decode(s, blp);
2459   state &= MASK_STATE_IMPORT_KEPT;
2460   state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2461
2462   if (is_dirty()) {
2463     get(PIN_DIRTY);
2464     _mark_dirty(ls);
2465   }
2466
2467   ::decode(dir_rep, blp);
2468
2469   ::decode(pop_me, now, blp);
2470   ::decode(pop_auth_subtree, now, blp);
2471   pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
2472
2473   ::decode(dir_rep_by, blp);
2474   ::decode(get_replicas(), blp);
2475   if (is_replicated()) get(PIN_REPLICATED);
2476
2477   replica_nonce = 0;  // no longer defined
2478
2479   // did we import some dirty scatterlock data?
2480   if (dirty_old_rstat.size() ||
2481       !(fnode.rstat == fnode.accounted_rstat)) {
2482     cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2483     ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2484   }
2485   if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2486     cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2487     ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2488   }
2489   if (is_dirty_dft()) {
2490     if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2491         inode->dirfragtreelock.is_stable()) {
2492       // clear stale dirtydft
2493       state_clear(STATE_DIRTYDFT);
2494     } else {
2495       cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2496       ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2497     }
2498   }
2499 }
2500
2501
2502
2503
2504 /********************************
2505  * AUTHORITY
2506  */
2507
2508 /*
2509  * if dir_auth.first == parent, auth is same as inode.
2510  * unless .second != unknown, in which case that sticks.
2511  */
2512 mds_authority_t CDir::authority() const
2513 {
2514   if (is_subtree_root()) 
2515     return dir_auth;
2516   else
2517     return inode->authority();
2518 }
2519
2520 /** is_subtree_root()
2521  * true if this is an auth delegation point.  
2522  * that is, dir_auth != default (parent,unknown)
2523  *
2524  * some key observations:
2525  *  if i am auth:
2526  *    - any region bound will be an export, or frozen.
2527  *
2528  * note that this DOES heed dir_auth.pending
2529  */
2530 /*
2531 bool CDir::is_subtree_root()
2532 {
2533   if (dir_auth == CDIR_AUTH_DEFAULT) {
2534     //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2535     //<< " on " << ino() << dendl;
2536     return false;
2537   } else {
2538     //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2539     //<< " on " << ino() << dendl;
2540     return true;
2541   }
2542 }
2543 */
2544
2545 /** contains(x)
2546  * true if we are x, or an ancestor of x
2547  */
2548 bool CDir::contains(CDir *x)
2549 {
2550   while (1) {
2551     if (x == this)
2552       return true;
2553     x = x->get_inode()->get_projected_parent_dir();
2554     if (x == 0)
2555       return false;    
2556   }
2557 }
2558
2559
2560
2561 /** set_dir_auth
2562  */
2563 void CDir::set_dir_auth(mds_authority_t a)
2564
2565   dout(10) << "setting dir_auth=" << a
2566            << " from " << dir_auth
2567            << " on " << *this << dendl;
2568   
2569   bool was_subtree = is_subtree_root();
2570   bool was_ambiguous = dir_auth.second >= 0;
2571
2572   // set it.
2573   dir_auth = a;
2574
2575   // new subtree root?
2576   if (!was_subtree && is_subtree_root()) {
2577     dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2578     
2579     // adjust nested auth pins
2580     if (get_cum_auth_pins())
2581       inode->adjust_nested_auth_pins(-1, NULL);
2582     
2583     // unpin parent of frozen dir/tree?
2584     if (inode->is_auth()) {
2585       assert(!is_frozen_tree_root());
2586       if (is_frozen_dir())
2587         inode->auth_unpin(this);
2588     }
2589   } 
2590   if (was_subtree && !is_subtree_root()) {
2591     dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2592     
2593     // adjust nested auth pins
2594     if (get_cum_auth_pins())
2595       inode->adjust_nested_auth_pins(1, NULL);
2596
2597     // pin parent of frozen dir/tree?
2598     if (inode->is_auth()) {
2599       assert(!is_frozen_tree_root());
2600       if (is_frozen_dir())
2601         inode->auth_pin(this);
2602     }
2603   }
2604
2605   // newly single auth?
2606   if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2607     list<MDSInternalContextBase*> ls;
2608     take_waiting(WAIT_SINGLEAUTH, ls);
2609     cache->mds->queue_waiters(ls);
2610   }
2611 }
2612
2613
2614 /*****************************************
2615  * AUTH PINS and FREEZING
2616  *
2617  * the basic plan is that auth_pins only exist in auth regions, and they
2618  * prevent a freeze (and subsequent auth change).  
2619  *
2620  * however, we also need to prevent a parent from freezing if a child is frozen.
2621  * for that reason, the parent inode of a frozen directory is auth_pinned.
2622  *
2623  * the oddity is when the frozen directory is a subtree root.  if that's the case,
2624  * the parent inode isn't frozen.  which means that when subtree authority is adjusted
2625  * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2626  * time.
2627  *
2628  */
2629
2630 void CDir::auth_pin(void *by) 
2631 {
2632   if (auth_pins == 0)
2633     get(PIN_AUTHPIN);
2634   auth_pins++;
2635
2636 #ifdef MDS_AUTHPIN_SET
2637   auth_pin_set.insert(by);
2638 #endif
2639
2640   dout(10) << "auth_pin by " << by
2641            << " on " << *this
2642            << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2643
2644   // nest pins?
2645   if (!is_subtree_root() &&
2646       get_cum_auth_pins() == 1)
2647     inode->adjust_nested_auth_pins(1, by);
2648 }
2649
2650 void CDir::auth_unpin(void *by) 
2651 {
2652   auth_pins--;
2653
2654 #ifdef MDS_AUTHPIN_SET
2655   assert(auth_pin_set.count(by));
2656   auth_pin_set.erase(auth_pin_set.find(by));
2657 #endif
2658   if (auth_pins == 0)
2659     put(PIN_AUTHPIN);
2660
2661   dout(10) << "auth_unpin by " << by
2662            << " on " << *this
2663            << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2664   assert(auth_pins >= 0);
2665   
2666   int newcum = get_cum_auth_pins();
2667
2668   maybe_finish_freeze();  // pending freeze?
2669   
2670   // nest?
2671   if (!is_subtree_root() &&
2672       newcum == 0)
2673     inode->adjust_nested_auth_pins(-1, by);
2674 }
2675
2676 void CDir::adjust_nested_auth_pins(int inc, int dirinc, void *by)
2677 {
2678   assert(inc);
2679   nested_auth_pins += inc;
2680   dir_auth_pins += dirinc;
2681   
2682   dout(15) << "adjust_nested_auth_pins " << inc << "/" << dirinc << " on " << *this
2683            << " by " << by << " count now "
2684            << auth_pins << " + " << nested_auth_pins << dendl;
2685   assert(nested_auth_pins >= 0);
2686   assert(dir_auth_pins >= 0);
2687
2688   int newcum = get_cum_auth_pins();
2689
2690   maybe_finish_freeze();  // pending freeze?
2691   
2692   // nest?
2693   if (!is_subtree_root()) {
2694     if (newcum == 0)
2695       inode->adjust_nested_auth_pins(-1, by);
2696     else if (newcum == inc)      
2697       inode->adjust_nested_auth_pins(1, by);
2698   }
2699 }
2700
2701 #ifdef MDS_VERIFY_FRAGSTAT
2702 void CDir::verify_fragstat()
2703 {
2704   assert(is_complete());
2705   if (inode->is_stray())
2706     return;
2707
2708   frag_info_t c;
2709   memset(&c, 0, sizeof(c));
2710
2711   for (map_t::iterator it = items.begin();
2712        it != items.end();
2713        ++it) {
2714     CDentry *dn = it->second;
2715     if (dn->is_null())
2716       continue;
2717     
2718     dout(10) << " " << *dn << dendl;
2719     if (dn->is_primary())
2720       dout(10) << "     " << *dn->inode << dendl;
2721
2722     if (dn->is_primary()) {
2723       if (dn->inode->is_dir())
2724         c.nsubdirs++;
2725       else
2726         c.nfiles++;
2727     }
2728     if (dn->is_remote()) {
2729       if (dn->get_remote_d_type() == DT_DIR)
2730         c.nsubdirs++;
2731       else
2732         c.nfiles++;
2733     }
2734   }
2735
2736   if (c.nsubdirs != fnode.fragstat.nsubdirs ||
2737       c.nfiles != fnode.fragstat.nfiles) {
2738     dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
2739     dout(0) << "               i count " << c << dendl;
2740     ceph_abort();
2741   } else {
2742     dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
2743   }
2744 }
2745 #endif
2746
2747 /*****************************************************************************
2748  * FREEZING
2749  */
2750
2751 // FREEZE TREE
2752
2753 bool CDir::freeze_tree()
2754 {
2755   assert(!is_frozen());
2756   assert(!is_freezing());
2757
2758   auth_pin(this);
2759   if (is_freezeable(true)) {
2760     _freeze_tree();
2761     auth_unpin(this);
2762     return true;
2763   } else {
2764     state_set(STATE_FREEZINGTREE);
2765     ++num_freezing_trees;
2766     dout(10) << "freeze_tree waiting " << *this << dendl;
2767     return false;
2768   }
2769 }
2770
2771 void CDir::_freeze_tree()
2772 {
2773   dout(10) << "_freeze_tree " << *this << dendl;
2774   assert(is_freezeable(true));
2775
2776   // twiddle state
2777   if (state_test(STATE_FREEZINGTREE)) {
2778     state_clear(STATE_FREEZINGTREE);   // actually, this may get set again by next context?
2779     --num_freezing_trees;
2780   }
2781
2782   if (is_auth()) {
2783     mds_authority_t auth;
2784     bool was_subtree = is_subtree_root();
2785     if (was_subtree) {
2786       auth = get_dir_auth();
2787     } else {
2788       // temporarily prevent parent subtree from becoming frozen.
2789       inode->auth_pin(this);
2790       // create new subtree
2791       auth = authority();
2792     }
2793
2794     assert(auth.first >= 0);
2795     assert(auth.second == CDIR_AUTH_UNKNOWN);
2796     auth.second = auth.first;
2797     inode->mdcache->adjust_subtree_auth(this, auth);
2798     if (!was_subtree)
2799       inode->auth_unpin(this);
2800   }
2801
2802   state_set(STATE_FROZENTREE);
2803   ++num_frozen_trees;
2804   get(PIN_FROZEN);
2805 }
2806
2807 void CDir::unfreeze_tree()
2808 {
2809   dout(10) << "unfreeze_tree " << *this << dendl;
2810
2811   if (state_test(STATE_FROZENTREE)) {
2812     // frozen.  unfreeze.
2813     state_clear(STATE_FROZENTREE);
2814     --num_frozen_trees;
2815
2816     put(PIN_FROZEN);
2817
2818     if (is_auth()) {
2819       // must be subtree
2820       assert(is_subtree_root());
2821       // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2822       mds_authority_t auth = get_dir_auth();
2823       assert(auth.first >= 0);
2824       assert(auth.second == auth.first);
2825       auth.second = CDIR_AUTH_UNKNOWN;
2826       inode->mdcache->adjust_subtree_auth(this, auth);
2827     }
2828
2829     // waiters?
2830     finish_waiting(WAIT_UNFREEZE);
2831   } else {
2832     finish_waiting(WAIT_FROZEN, -1);
2833
2834     // freezing.  stop it.
2835     assert(state_test(STATE_FREEZINGTREE));
2836     state_clear(STATE_FREEZINGTREE);
2837     --num_freezing_trees;
2838     auth_unpin(this);
2839     
2840     finish_waiting(WAIT_UNFREEZE);
2841   }
2842 }
2843
2844 bool CDir::is_freezing_tree() const
2845 {
2846   if (num_freezing_trees == 0)
2847     return false;
2848   const CDir *dir = this;
2849   while (1) {
2850     if (dir->is_freezing_tree_root()) return true;
2851     if (dir->is_subtree_root()) return false;
2852     if (dir->inode->parent)
2853       dir = dir->inode->parent->dir;
2854     else
2855       return false; // root on replica
2856   }
2857 }
2858
2859 bool CDir::is_frozen_tree() const
2860 {
2861   if (num_frozen_trees == 0)
2862     return false;
2863   const CDir *dir = this;
2864   while (1) {
2865     if (dir->is_frozen_tree_root()) return true;
2866     if (dir->is_subtree_root()) return false;
2867     if (dir->inode->parent)
2868       dir = dir->inode->parent->dir;
2869     else
2870       return false;  // root on replica
2871   }
2872 }
2873
2874 CDir *CDir::get_frozen_tree_root() 
2875 {
2876   assert(is_frozen());
2877   CDir *dir = this;
2878   while (1) {
2879     if (dir->is_frozen_tree_root()) 
2880       return dir;
2881     if (dir->inode->parent)
2882       dir = dir->inode->parent->dir;
2883     else
2884       ceph_abort();
2885   }
2886 }
2887
2888 class C_Dir_AuthUnpin : public CDirContext {
2889   public:
2890   explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
2891   void finish(int r) override {
2892     dir->auth_unpin(dir->get_inode());
2893   }
2894 };
2895
2896 void CDir::maybe_finish_freeze()
2897 {
2898   if (auth_pins != 1 || dir_auth_pins != 0)
2899     return;
2900
2901   // we can freeze the _dir_ even with nested pins...
2902   if (state_test(STATE_FREEZINGDIR)) {
2903     _freeze_dir();
2904     auth_unpin(this);
2905     finish_waiting(WAIT_FROZEN);
2906   }
2907
2908   if (nested_auth_pins != 0)
2909     return;
2910
2911   if (state_test(STATE_FREEZINGTREE)) {
2912     if (!is_subtree_root() && inode->is_frozen()) {
2913       dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
2914       // retake an auth_pin...
2915       auth_pin(inode);
2916       // and release it when the parent inode unfreezes
2917       inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
2918       return;
2919     }
2920
2921     _freeze_tree();
2922     auth_unpin(this);
2923     finish_waiting(WAIT_FROZEN);
2924   }
2925 }
2926
2927
2928
2929 // FREEZE DIR
2930
2931 bool CDir::freeze_dir()
2932 {
2933   assert(!is_frozen());
2934   assert(!is_freezing());
2935   
2936   auth_pin(this);
2937   if (is_freezeable_dir(true)) {
2938     _freeze_dir();
2939     auth_unpin(this);
2940     return true;
2941   } else {
2942     state_set(STATE_FREEZINGDIR);
2943     dout(10) << "freeze_dir + wait " << *this << dendl;
2944     return false;
2945   } 
2946 }
2947
2948 void CDir::_freeze_dir()
2949 {
2950   dout(10) << "_freeze_dir " << *this << dendl;
2951   //assert(is_freezeable_dir(true));
2952   // not always true during split because the original fragment may have frozen a while
2953   // ago and we're just now getting around to breaking it up.
2954
2955   state_clear(STATE_FREEZINGDIR);
2956   state_set(STATE_FROZENDIR);
2957   get(PIN_FROZEN);
2958
2959   if (is_auth() && !is_subtree_root())
2960     inode->auth_pin(this);  // auth_pin for duration of freeze
2961 }
2962
2963
2964 void CDir::unfreeze_dir()
2965 {
2966   dout(10) << "unfreeze_dir " << *this << dendl;
2967
2968   if (state_test(STATE_FROZENDIR)) {
2969     state_clear(STATE_FROZENDIR);
2970     put(PIN_FROZEN);
2971
2972     // unpin  (may => FREEZEABLE)   FIXME: is this order good?
2973     if (is_auth() && !is_subtree_root())
2974       inode->auth_unpin(this);
2975
2976     finish_waiting(WAIT_UNFREEZE);
2977   } else {
2978     finish_waiting(WAIT_FROZEN, -1);
2979
2980     // still freezing. stop.
2981     assert(state_test(STATE_FREEZINGDIR));
2982     state_clear(STATE_FREEZINGDIR);
2983     auth_unpin(this);
2984     
2985     finish_waiting(WAIT_UNFREEZE);
2986   }
2987 }
2988
2989 /**
2990  * Slightly less complete than operator<<, because this is intended
2991  * for identifying a directory and its state rather than for dumping
2992  * debug output.
2993  */
2994 void CDir::dump(Formatter *f) const
2995 {
2996   assert(f != NULL);
2997
2998   f->dump_stream("path") << get_path();
2999
3000   f->dump_stream("dirfrag") << dirfrag();
3001   f->dump_int("snapid_first", first);
3002
3003   f->dump_stream("projected_version") << get_projected_version();
3004   f->dump_stream("version") << get_version();
3005   f->dump_stream("committing_version") << get_committing_version();
3006   f->dump_stream("committed_version") << get_committed_version();
3007
3008   f->dump_bool("is_rep", is_rep());
3009
3010   if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3011     if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3012       f->dump_stream("dir_auth") << get_dir_auth().first;
3013     } else {
3014       f->dump_stream("dir_auth") << get_dir_auth();
3015     }
3016   } else {
3017     f->dump_string("dir_auth", "");
3018   }
3019
3020   f->open_array_section("states");
3021   MDSCacheObject::dump_states(f);
3022   if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3023   if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3024   if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3025   if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3026   if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3027   if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3028   if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3029   if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3030   f->close_section();
3031
3032   MDSCacheObject::dump(f);
3033 }
3034
3035 /****** Scrub Stuff *******/
3036
3037 void CDir::scrub_info_create() const
3038 {
3039   assert(!scrub_infop);
3040
3041   // break out of const-land to set up implicit initial state
3042   CDir *me = const_cast<CDir*>(this);
3043   fnode_t *fn = me->get_projected_fnode();
3044
3045   std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3046
3047   si->last_recursive.version = si->recursive_start.version =
3048       fn->recursive_scrub_version;
3049   si->last_recursive.time = si->recursive_start.time =
3050       fn->recursive_scrub_stamp;
3051
3052   si->last_local.version = fn->localized_scrub_version;
3053   si->last_local.time = fn->localized_scrub_stamp;
3054
3055   me->scrub_infop.swap(si);
3056 }
3057
3058 void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3059 {
3060   dout(20) << __func__ << dendl;
3061   assert(is_complete());
3062   assert(header != nullptr);
3063
3064   // FIXME: weird implicit construction, is someone else meant
3065   // to be calling scrub_info_create first?
3066   scrub_info();
3067   assert(scrub_infop && !scrub_infop->directory_scrubbing);
3068
3069   scrub_infop->recursive_start.version = get_projected_version();
3070   scrub_infop->recursive_start.time = ceph_clock_now();
3071
3072   scrub_infop->directories_to_scrub.clear();
3073   scrub_infop->directories_scrubbing.clear();
3074   scrub_infop->directories_scrubbed.clear();
3075   scrub_infop->others_to_scrub.clear();
3076   scrub_infop->others_scrubbing.clear();
3077   scrub_infop->others_scrubbed.clear();
3078
3079   for (map_t::iterator i = items.begin();
3080       i != items.end();
3081       ++i) {
3082     // TODO: handle snapshot scrubbing
3083     if (i->first.snapid != CEPH_NOSNAP)
3084       continue;
3085
3086     CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3087     if (dnl->is_primary()) {
3088       if (dnl->get_inode()->is_dir())
3089         scrub_infop->directories_to_scrub.insert(i->first);
3090       else
3091         scrub_infop->others_to_scrub.insert(i->first);
3092     } else if (dnl->is_remote()) {
3093       // TODO: check remote linkage
3094     }
3095   }
3096   scrub_infop->directory_scrubbing = true;
3097   scrub_infop->header = header;
3098 }
3099
3100 void CDir::scrub_finished()
3101 {
3102   dout(20) << __func__ << dendl;
3103   assert(scrub_infop && scrub_infop->directory_scrubbing);
3104
3105   assert(scrub_infop->directories_to_scrub.empty());
3106   assert(scrub_infop->directories_scrubbing.empty());
3107   scrub_infop->directories_scrubbed.clear();
3108   assert(scrub_infop->others_to_scrub.empty());
3109   assert(scrub_infop->others_scrubbing.empty());
3110   scrub_infop->others_scrubbed.clear();
3111   scrub_infop->directory_scrubbing = false;
3112
3113   scrub_infop->last_recursive = scrub_infop->recursive_start;
3114   scrub_infop->last_scrub_dirty = true;
3115 }
3116
3117 int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
3118                               MDSInternalContext *cb, CDentry **dnout)
3119 {
3120   dentry_key_t dnkey;
3121   CDentry *dn;
3122
3123   while (!dns.empty()) {
3124     set<dentry_key_t>::iterator front = dns.begin();
3125     dnkey = *front;
3126     dn = lookup(dnkey.name);
3127     if (!dn) {
3128       if (!is_complete() &&
3129           (!has_bloom() || is_in_bloom(dnkey.name))) {
3130         // need to re-read this dirfrag
3131         fetch(cb);
3132         return EAGAIN;
3133       }
3134       // okay, we lost it
3135       if (missing_okay) {
3136         dout(15) << " we no longer have directory dentry "
3137                  << dnkey.name << ", assuming it got renamed" << dendl;
3138         dns.erase(dnkey);
3139         continue;
3140       } else {
3141         dout(5) << " we lost dentry " << dnkey.name
3142                 << ", bailing out because that's impossible!" << dendl;
3143         ceph_abort();
3144       }
3145     }
3146     // okay, we got a  dentry
3147     dns.erase(dnkey);
3148
3149     if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3150         !(scrub_infop->header->get_force())) {
3151       dout(15) << " skip dentry " << dnkey.name
3152                << ", no change since last scrub" << dendl;
3153       continue;
3154     }
3155
3156     *dnout = dn;
3157     return 0;
3158   }
3159   *dnout = NULL;
3160   return ENOENT;
3161 }
3162
3163 int CDir::scrub_dentry_next(MDSInternalContext *cb, CDentry **dnout)
3164 {
3165   dout(20) << __func__ << dendl;
3166   assert(scrub_infop && scrub_infop->directory_scrubbing);
3167
3168   dout(20) << "trying to scrub directories underneath us" << dendl;
3169   int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3170                                  cb, dnout);
3171   if (rval == 0) {
3172     dout(20) << __func__ << " inserted to directories scrubbing: "
3173       << *dnout << dendl;
3174     scrub_infop->directories_scrubbing.insert((*dnout)->key());
3175   } else if (rval == EAGAIN) {
3176     // we don't need to do anything else
3177   } else { // we emptied out the directory scrub set
3178     assert(rval == ENOENT);
3179     dout(20) << "no directories left, moving on to other kinds of dentries"
3180              << dendl;
3181     
3182     rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3183     if (rval == 0) {
3184       dout(20) << __func__ << " inserted to others scrubbing: "
3185         << *dnout << dendl;
3186       scrub_infop->others_scrubbing.insert((*dnout)->key());
3187     }
3188   }
3189   dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3190   return rval;
3191 }
3192
3193 void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3194 {
3195   dout(20) << __func__ << dendl;
3196   assert(scrub_infop && scrub_infop->directory_scrubbing);
3197
3198   for (set<dentry_key_t>::iterator i =
3199         scrub_infop->directories_scrubbing.begin();
3200       i != scrub_infop->directories_scrubbing.end();
3201       ++i) {
3202     CDentry *d = lookup(i->name, i->snapid);
3203     assert(d);
3204     out_dentries->push_back(d);
3205   }
3206   for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3207       i != scrub_infop->others_scrubbing.end();
3208       ++i) {
3209     CDentry *d = lookup(i->name, i->snapid);
3210     assert(d);
3211     out_dentries->push_back(d);
3212   }
3213 }
3214
3215 void CDir::scrub_dentry_finished(CDentry *dn)
3216 {
3217   dout(20) << __func__ << " on dn " << *dn << dendl;
3218   assert(scrub_infop && scrub_infop->directory_scrubbing);
3219   dentry_key_t dn_key = dn->key();
3220   if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3221     scrub_infop->directories_scrubbed.insert(dn_key);
3222   } else {
3223     assert(scrub_infop->others_scrubbing.count(dn_key));
3224     scrub_infop->others_scrubbing.erase(dn_key);
3225     scrub_infop->others_scrubbed.insert(dn_key);
3226   }
3227 }
3228
3229 void CDir::scrub_maybe_delete_info()
3230 {
3231   if (scrub_infop &&
3232       !scrub_infop->directory_scrubbing &&
3233       !scrub_infop->need_scrub_local &&
3234       !scrub_infop->last_scrub_dirty &&
3235       !scrub_infop->pending_scrub_error &&
3236       scrub_infop->dirty_scrub_stamps.empty()) {
3237     scrub_infop.reset();
3238   }
3239 }
3240
3241 bool CDir::scrub_local()
3242 {
3243   assert(is_complete());
3244   bool rval = check_rstats(true);
3245
3246   scrub_info();
3247   if (rval) {
3248     scrub_infop->last_local.time = ceph_clock_now();
3249     scrub_infop->last_local.version = get_projected_version();
3250     scrub_infop->pending_scrub_error = false;
3251     scrub_infop->last_scrub_dirty = true;
3252   } else {
3253     scrub_infop->pending_scrub_error = true;
3254     if (scrub_infop->header->get_repair())
3255       cache->repair_dirfrag_stats(this);
3256   }
3257   return rval;
3258 }
3259
3260 std::string CDir::get_path() const
3261 {
3262   std::string path;
3263   get_inode()->make_path_string(path, true);
3264   return path;
3265 }
3266
3267 bool CDir::should_split_fast() const
3268 {
3269   // Max size a fragment can be before trigger fast splitting
3270   int fast_limit = g_conf->mds_bal_split_size * g_conf->mds_bal_fragment_fast_factor;
3271
3272   // Fast path: the sum of accounted size and null dentries does not
3273   // exceed threshold: we definitely are not over it.
3274   if (get_frag_size() + get_num_head_null() <= fast_limit) {
3275     return false;
3276   }
3277
3278   // Fast path: the accounted size of the frag exceeds threshold: we
3279   // definitely are over it
3280   if (get_frag_size() > fast_limit) {
3281     return true;
3282   }
3283
3284   int64_t effective_size = 0;
3285
3286   for (const auto &p : items) {
3287     const CDentry *dn = p.second;
3288     if (!dn->get_projected_linkage()->is_null()) {
3289       effective_size++;
3290     }
3291   }
3292
3293   return effective_size > fast_limit;
3294 }
3295
3296 MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);