Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / CInode.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "include/int_types.h"
16 #include "common/errno.h"
17
18 #include <string>
19 #include <stdio.h>
20
21 #include "CInode.h"
22 #include "CDir.h"
23 #include "CDentry.h"
24
25 #include "MDSRank.h"
26 #include "MDCache.h"
27 #include "MDLog.h"
28 #include "Locker.h"
29 #include "Mutation.h"
30
31 #include "events/EUpdate.h"
32
33 #include "osdc/Objecter.h"
34
35 #include "snap.h"
36
37 #include "LogSegment.h"
38
39 #include "common/Clock.h"
40
41 #include "messages/MLock.h"
42 #include "messages/MClientCaps.h"
43
44 #include "common/config.h"
45 #include "global/global_context.h"
46 #include "include/assert.h"
47
48 #include "mds/MDSContinuation.h"
49 #include "mds/InoTable.h"
50
51 #define dout_context g_ceph_context
52 #define dout_subsys ceph_subsys_mds
53 #undef dout_prefix
54 #define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
55
56
57 class CInodeIOContext : public MDSIOContextBase
58 {
59 protected:
60   CInode *in;
61   MDSRank *get_mds() override {return in->mdcache->mds;}
62 public:
63   explicit CInodeIOContext(CInode *in_) : in(in_) {
64     assert(in != NULL);
65   }
66 };
67
68
69 LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
70 LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
71 LockType CInode::linklock_type(CEPH_LOCK_ILINK);
72 LockType CInode::dirfragtreelock_type(CEPH_LOCK_IDFT);
73 LockType CInode::filelock_type(CEPH_LOCK_IFILE);
74 LockType CInode::xattrlock_type(CEPH_LOCK_IXATTR);
75 LockType CInode::snaplock_type(CEPH_LOCK_ISNAP);
76 LockType CInode::nestlock_type(CEPH_LOCK_INEST);
77 LockType CInode::flocklock_type(CEPH_LOCK_IFLOCK);
78 LockType CInode::policylock_type(CEPH_LOCK_IPOLICY);
79
80 //int cinode_pins[CINODE_NUM_PINS];  // counts
81 ostream& CInode::print_db_line_prefix(ostream& out)
82 {
83   return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
84 }
85
86 /*
87  * write caps and lock ids
88  */
89 struct cinode_lock_info_t cinode_lock_info[] = {
90   { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
91   { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
92   { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
93   { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
94 };
95 int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
96
97
98
99 ostream& operator<<(ostream& out, const CInode& in)
100 {
101   string path;
102   in.make_path_string(path, true);
103
104   out << "[inode " << in.inode.ino;
105   out << " [" 
106       << (in.is_multiversion() ? "...":"")
107       << in.first << "," << in.last << "]";
108   out << " " << path << (in.is_dir() ? "/":"");
109
110   if (in.is_auth()) {
111     out << " auth";
112     if (in.is_replicated()) 
113       out << in.get_replicas();
114   } else {
115     mds_authority_t a = in.authority();
116     out << " rep@" << a.first;
117     if (a.second != CDIR_AUTH_UNKNOWN)
118       out << "," << a.second;
119     out << "." << in.get_replica_nonce();
120   }
121
122   if (in.is_symlink())
123     out << " symlink='" << in.symlink << "'";
124   if (in.is_dir() && !in.dirfragtree.empty())
125     out << " " << in.dirfragtree;
126   
127   out << " v" << in.get_version();
128   if (in.get_projected_version() > in.get_version())
129     out << " pv" << in.get_projected_version();
130
131   if (in.is_auth_pinned()) {
132     out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
133 #ifdef MDS_AUTHPIN_SET
134     out << "(" << in.auth_pin_set << ")";
135 #endif
136   }
137
138   if (in.snaprealm)
139     out << " snaprealm=" << in.snaprealm;
140
141   if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
142   if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
143   if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
144   if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
145   if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
146   if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
147   if (in.is_frozen_inode()) out << " FROZEN";
148   if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
149
150   const inode_t *pi = in.get_projected_inode();
151   if (pi->is_truncating())
152     out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";
153
154   if (in.inode.is_dir()) {
155     out << " " << in.inode.dirstat;
156     if (g_conf->mds_debug_scatterstat && in.is_projected()) {
157       const inode_t *pi = in.get_projected_inode();
158       out << "->" << pi->dirstat;
159     }
160   } else {
161     out << " s=" << in.inode.size;
162     if (in.inode.nlink != 1)
163       out << " nl=" << in.inode.nlink;
164   }
165
166   // rstat
167   out << " " << in.inode.rstat;
168   if (!(in.inode.rstat == in.inode.accounted_rstat))
169     out << "/" << in.inode.accounted_rstat;
170   if (g_conf->mds_debug_scatterstat && in.is_projected()) {
171     const inode_t *pi = in.get_projected_inode();
172     out << "->" << pi->rstat;
173     if (!(pi->rstat == pi->accounted_rstat))
174       out << "/" << pi->accounted_rstat;
175   }
176
177   if (!in.client_need_snapflush.empty())
178     out << " need_snapflush=" << in.client_need_snapflush;
179
180
181   // locks
182   if (!in.authlock.is_sync_and_unlocked())
183     out << " " << in.authlock;
184   if (!in.linklock.is_sync_and_unlocked())
185     out << " " << in.linklock;
186   if (in.inode.is_dir()) {
187     if (!in.dirfragtreelock.is_sync_and_unlocked())
188       out << " " << in.dirfragtreelock;
189     if (!in.snaplock.is_sync_and_unlocked())
190       out << " " << in.snaplock;
191     if (!in.nestlock.is_sync_and_unlocked())
192       out << " " << in.nestlock;
193     if (!in.policylock.is_sync_and_unlocked())
194       out << " " << in.policylock;
195   } else  {
196     if (!in.flocklock.is_sync_and_unlocked())
197       out << " " << in.flocklock;
198   }
199   if (!in.filelock.is_sync_and_unlocked())
200     out << " " << in.filelock;
201   if (!in.xattrlock.is_sync_and_unlocked())
202     out << " " << in.xattrlock;
203   if (!in.versionlock.is_sync_and_unlocked())  
204     out << " " << in.versionlock;
205
206   // hack: spit out crap on which clients have caps
207   if (in.inode.client_ranges.size())
208     out << " cr=" << in.inode.client_ranges;
209
210   if (!in.get_client_caps().empty()) {
211     out << " caps={";
212     for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
213          it != in.get_client_caps().end();
214          ++it) {
215       if (it != in.get_client_caps().begin()) out << ",";
216       out << it->first << "="
217           << ccap_string(it->second->pending());
218       if (it->second->issued() != it->second->pending())
219         out << "/" << ccap_string(it->second->issued());
220       out << "/" << ccap_string(it->second->wanted())
221           << "@" << it->second->get_last_sent();
222     }
223     out << "}";
224     if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
225       out << ",l=" << in.get_loner();
226       if (in.get_loner() != in.get_wanted_loner())
227         out << "(" << in.get_wanted_loner() << ")";
228     }
229   }
230   if (!in.get_mds_caps_wanted().empty()) {
231     out << " mcw={";
232     for (compact_map<int,int>::const_iterator p = in.get_mds_caps_wanted().begin();
233          p != in.get_mds_caps_wanted().end();
234          ++p) {
235       if (p != in.get_mds_caps_wanted().begin())
236         out << ',';
237       out << p->first << '=' << ccap_string(p->second);
238     }
239     out << '}';
240   }
241
242   if (in.get_num_ref()) {
243     out << " |";
244     in.print_pin_set(out);
245   }
246
247   if (in.inode.export_pin != MDS_RANK_NONE) {
248     out << " export_pin=" << in.inode.export_pin;
249   }
250
251   out << " " << &in;
252   out << "]";
253   return out;
254 }
255
256 ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
257 {
258   out << "{scrub_start_version: " << si.scrub_start_version
259       << ", scrub_start_stamp: " << si.scrub_start_stamp
260       << ", last_scrub_version: " << si.last_scrub_version
261       << ", last_scrub_stamp: " << si.last_scrub_stamp;
262   return out;
263 }
264
265
266
267 void CInode::print(ostream& out)
268 {
269   out << *this;
270 }
271
272
273
274 void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
275 {
276   dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
277
278   if (client_need_snapflush.empty()) {
279     get(CInode::PIN_NEEDSNAPFLUSH);
280
281     // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
282     // long periods waiting for clients to flush their snaps.
283     auth_pin(this);   // pin head inode...
284   }
285
286   set<client_t>& clients = client_need_snapflush[snapid];
287   if (clients.empty())
288     snapin->auth_pin(this);  // ...and pin snapped/old inode!
289   
290   clients.insert(client);
291 }
292
293 void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
294 {
295   dout(10) << "remove_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
296   compact_map<snapid_t, std::set<client_t> >::iterator p = client_need_snapflush.find(snapid);
297   if (p == client_need_snapflush.end()) {
298     dout(10) << " snapid not found" << dendl;
299     return;
300   }
301   if (!p->second.count(client)) {
302     dout(10) << " client not found" << dendl;
303     return;
304   }
305   p->second.erase(client);
306   if (p->second.empty()) {
307     client_need_snapflush.erase(p);
308     snapin->auth_unpin(this);
309
310     if (client_need_snapflush.empty()) {
311       put(CInode::PIN_NEEDSNAPFLUSH);
312       auth_unpin(this);
313     }
314   }
315 }
316
317 bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
318 {
319   dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
320   bool need_flush = false;
321   for (compact_map<snapid_t, set<client_t> >::iterator p = client_need_snapflush.lower_bound(cowin->first);
322        p != client_need_snapflush.end() && p->first < in->first; ) {
323     compact_map<snapid_t, set<client_t> >::iterator q = p;
324     ++p;
325     assert(!q->second.empty());
326     if (cowin->last >= q->first) {
327       cowin->auth_pin(this);
328       need_flush = true;
329     } else
330       client_need_snapflush.erase(q);
331     in->auth_unpin(this);
332   }
333   return need_flush;
334 }
335
336 void CInode::mark_dirty_rstat()
337 {
338   if (!state_test(STATE_DIRTYRSTAT)) {
339     dout(10) << "mark_dirty_rstat" << dendl;
340     state_set(STATE_DIRTYRSTAT);
341     get(PIN_DIRTYRSTAT);
342     CDentry *pdn = get_projected_parent_dn();
343     if (pdn->is_auth()) {
344       CDir *pdir = pdn->dir;
345       pdir->dirty_rstat_inodes.push_back(&dirty_rstat_item);
346       mdcache->mds->locker->mark_updated_scatterlock(&pdir->inode->nestlock);
347     } else {
348       // under cross-MDS rename.
349       // DIRTYRSTAT flag will get cleared when rename finishes
350       assert(state_test(STATE_AMBIGUOUSAUTH));
351     }
352   }
353 }
354 void CInode::clear_dirty_rstat()
355 {
356   if (state_test(STATE_DIRTYRSTAT)) {
357     dout(10) << "clear_dirty_rstat" << dendl;
358     state_clear(STATE_DIRTYRSTAT);
359     put(PIN_DIRTYRSTAT);
360     dirty_rstat_item.remove_myself();
361   }
362 }
363
364 inode_t *CInode::project_inode(map<string,bufferptr> *px) 
365 {
366   if (projected_nodes.empty()) {
367     projected_nodes.push_back(new projected_inode_t(new inode_t(inode)));
368     if (px)
369       *px = xattrs;
370   } else {
371     projected_nodes.push_back(new projected_inode_t(
372         new inode_t(*projected_nodes.back()->inode)));
373     if (px)
374       *px = *get_projected_xattrs();
375   }
376
377   projected_inode_t &pi = *projected_nodes.back();
378
379   if (px) {
380     pi.xattrs = px;
381     ++num_projected_xattrs;
382   }
383
384   if (scrub_infop && scrub_infop->last_scrub_dirty) {
385     pi.inode->last_scrub_stamp = scrub_infop->last_scrub_stamp;
386     pi.inode->last_scrub_version = scrub_infop->last_scrub_version;
387     scrub_infop->last_scrub_dirty = false;
388     scrub_maybe_delete_info();
389   }
390   dout(15) << "project_inode " << pi.inode << dendl;
391   return pi.inode;
392 }
393
394 void CInode::pop_and_dirty_projected_inode(LogSegment *ls) 
395 {
396   assert(!projected_nodes.empty());
397   dout(15) << "pop_and_dirty_projected_inode " << projected_nodes.front()->inode
398            << " v" << projected_nodes.front()->inode->version << dendl;
399   int64_t old_pool = inode.layout.pool_id;
400
401   mark_dirty(projected_nodes.front()->inode->version, ls);
402   inode = *projected_nodes.front()->inode;
403
404   if (inode.is_backtrace_updated())
405     _mark_dirty_parent(ls, old_pool != inode.layout.pool_id);
406
407   map<string,bufferptr> *px = projected_nodes.front()->xattrs;
408   if (px) {
409     --num_projected_xattrs;
410     xattrs = *px;
411     delete px;
412   }
413
414   if (projected_nodes.front()->snapnode) {
415     pop_projected_snaprealm(projected_nodes.front()->snapnode);
416     --num_projected_srnodes;
417   }
418
419   delete projected_nodes.front()->inode;
420   delete projected_nodes.front();
421
422   projected_nodes.pop_front();
423 }
424
425 sr_t *CInode::project_snaprealm(snapid_t snapid)
426 {
427   sr_t *cur_srnode = get_projected_srnode();
428   sr_t *new_srnode;
429
430   if (cur_srnode) {
431     new_srnode = new sr_t(*cur_srnode);
432   } else {
433     new_srnode = new sr_t();
434     new_srnode->created = snapid;
435     new_srnode->current_parent_since = get_oldest_snap();
436   }
437   dout(10) << "project_snaprealm " << new_srnode << dendl;
438   projected_nodes.back()->snapnode = new_srnode;
439   ++num_projected_srnodes;
440   return new_srnode;
441 }
442
443 /* if newparent != parent, add parent to past_parents
444  if parent DNE, we need to find what the parent actually is and fill that in */
445 void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
446 {
447   sr_t *new_snap = project_snaprealm();
448   SnapRealm *oldparent;
449   if (!snaprealm) {
450     oldparent = find_snaprealm();
451     new_snap->seq = oldparent->get_newest_seq();
452   }
453   else
454     oldparent = snaprealm->parent;
455
456   if (newparent != oldparent) {
457     snapid_t oldparentseq = oldparent->get_newest_seq();
458     if (oldparentseq + 1 > new_snap->current_parent_since) {
459       new_snap->past_parents[oldparentseq].ino = oldparent->inode->ino();
460       new_snap->past_parents[oldparentseq].first = new_snap->current_parent_since;
461     }
462     new_snap->current_parent_since = MAX(oldparentseq, newparent->get_last_created()) + 1;
463   }
464 }
465
466 void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
467 {
468   assert(next_snaprealm);
469   dout(10) << "pop_projected_snaprealm " << next_snaprealm
470           << " seq" << next_snaprealm->seq << dendl;
471   bool invalidate_cached_snaps = false;
472   if (!snaprealm) {
473     open_snaprealm();
474   } else if (next_snaprealm->past_parents.size() !=
475              snaprealm->srnode.past_parents.size()) {
476     invalidate_cached_snaps = true;
477     // re-open past parents
478     snaprealm->_close_parents();
479
480     dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
481              << " -> " << next_snaprealm->past_parents << dendl;
482   }
483   snaprealm->srnode = *next_snaprealm;
484   delete next_snaprealm;
485
486   // we should be able to open these up (or have them already be open).
487   bool ok = snaprealm->_open_parents(NULL);
488   assert(ok);
489
490   if (invalidate_cached_snaps)
491     snaprealm->invalidate_cached_snaps();
492
493   if (snaprealm->parent)
494     dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
495 }
496
497
498 // ====== CInode =======
499
500 // dirfrags
501
502 __u32 InodeStoreBase::hash_dentry_name(const string &dn)
503 {
504   int which = inode.dir_layout.dl_dir_hash;
505   if (!which)
506     which = CEPH_STR_HASH_LINUX;
507   assert(ceph_str_hash_valid(which));
508   return ceph_str_hash(which, dn.data(), dn.length());
509 }
510
511 frag_t InodeStoreBase::pick_dirfrag(const string& dn)
512 {
513   if (dirfragtree.empty())
514     return frag_t();          // avoid the string hash if we can.
515
516   __u32 h = hash_dentry_name(dn);
517   return dirfragtree[h];
518 }
519
520 bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
521 {
522   bool all = true;
523   list<frag_t> fglist;
524   dirfragtree.get_leaves_under(fg, fglist);
525   for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
526     if (dirfrags.count(*p))
527       ls.push_back(dirfrags[*p]);
528     else 
529       all = false;
530
531   if (all)
532     return all;
533
534   fragtree_t tmpdft;
535   tmpdft.force_to_leaf(g_ceph_context, fg);
536   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
537     tmpdft.force_to_leaf(g_ceph_context, p->first);
538     if (fg.contains(p->first) && !dirfragtree.is_leaf(p->first))
539       ls.push_back(p->second);
540   }
541
542   all = true;
543   tmpdft.get_leaves_under(fg, fglist);
544   for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
545     if (!dirfrags.count(*p)) {
546       all = false;
547       break;
548     }
549
550   return all;
551 }
552
553 void CInode::verify_dirfrags()
554 {
555   bool bad = false;
556   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
557     if (!dirfragtree.is_leaf(p->first)) {
558       dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
559               << ": " << *p->second << dendl;
560       bad = true;
561     }
562   }
563   assert(!bad);
564 }
565
566 void CInode::force_dirfrags()
567 {
568   bool bad = false;
569   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
570     if (!dirfragtree.is_leaf(p->first)) {
571       dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
572               << ": " << *p->second << dendl;
573       bad = true;
574     }
575   }
576
577   if (bad) {
578     list<frag_t> leaves;
579     dirfragtree.get_leaves(leaves);
580     for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
581       mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
582   }
583
584   verify_dirfrags();
585 }
586
587 CDir *CInode::get_approx_dirfrag(frag_t fg)
588 {
589   CDir *dir = get_dirfrag(fg);
590   if (dir) return dir;
591
592   // find a child?
593   list<CDir*> ls;
594   get_dirfrags_under(fg, ls);
595   if (!ls.empty()) 
596     return ls.front();
597
598   // try parents?
599   while (fg.bits() > 0) {
600     fg = fg.parent();
601     dir = get_dirfrag(fg);
602     if (dir) return dir;
603   }
604   return NULL;
605 }       
606
607 void CInode::get_dirfrags(list<CDir*>& ls) 
608 {
609   // all dirfrags
610   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
611        p != dirfrags.end();
612        ++p)
613     ls.push_back(p->second);
614 }
615 void CInode::get_nested_dirfrags(list<CDir*>& ls) 
616 {  
617   // dirfrags in same subtree
618   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
619        p != dirfrags.end();
620        ++p)
621     if (!p->second->is_subtree_root())
622       ls.push_back(p->second);
623 }
624 void CInode::get_subtree_dirfrags(list<CDir*>& ls) 
625
626   // dirfrags that are roots of new subtrees
627   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
628        p != dirfrags.end();
629        ++p)
630     if (p->second->is_subtree_root())
631       ls.push_back(p->second);
632 }
633
634
635 CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
636 {
637   assert(is_dir());
638
639   // have it?
640   CDir *dir = get_dirfrag(fg);
641   if (!dir) {
642     // create it.
643     assert(is_auth() || mdcache->mds->is_any_replay());
644     dir = new CDir(this, fg, mdcache, is_auth());
645     add_dirfrag(dir);
646   }
647   return dir;
648 }
649
650 CDir *CInode::add_dirfrag(CDir *dir)
651 {
652   assert(dirfrags.count(dir->dirfrag().frag) == 0);
653   dirfrags[dir->dirfrag().frag] = dir;
654
655   if (stickydir_ref > 0) {
656     dir->state_set(CDir::STATE_STICKY);
657     dir->get(CDir::PIN_STICKY);
658   }
659
660   maybe_export_pin();
661
662   return dir;
663 }
664
665 void CInode::close_dirfrag(frag_t fg)
666 {
667   dout(14) << "close_dirfrag " << fg << dendl;
668   assert(dirfrags.count(fg));
669   
670   CDir *dir = dirfrags[fg];
671   dir->remove_null_dentries();
672   
673   // clear dirty flag
674   if (dir->is_dirty())
675     dir->mark_clean();
676   
677   if (stickydir_ref > 0) {
678     dir->state_clear(CDir::STATE_STICKY);
679     dir->put(CDir::PIN_STICKY);
680   }
681   
682   // dump any remaining dentries, for debugging purposes
683   for (CDir::map_t::iterator p = dir->items.begin();
684        p != dir->items.end();
685        ++p) 
686     dout(14) << "close_dirfrag LEFTOVER dn " << *p->second << dendl;
687
688   assert(dir->get_num_ref() == 0);
689   delete dir;
690   dirfrags.erase(fg);
691 }
692
693 void CInode::close_dirfrags()
694 {
695   while (!dirfrags.empty()) 
696     close_dirfrag(dirfrags.begin()->first);
697 }
698
699 bool CInode::has_subtree_root_dirfrag(int auth)
700 {
701   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
702        p != dirfrags.end();
703        ++p)
704     if (p->second->is_subtree_root() &&
705         (auth == -1 || p->second->dir_auth.first == auth))
706       return true;
707   return false;
708 }
709
710 bool CInode::has_subtree_or_exporting_dirfrag()
711 {
712   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
713        p != dirfrags.end();
714        ++p)
715     if (p->second->is_subtree_root() ||
716         p->second->state_test(CDir::STATE_EXPORTING))
717       return true;
718   return false;
719 }
720
721 void CInode::get_stickydirs()
722 {
723   if (stickydir_ref == 0) {
724     get(PIN_STICKYDIRS);
725     for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
726          p != dirfrags.end();
727          ++p) {
728       p->second->state_set(CDir::STATE_STICKY);
729       p->second->get(CDir::PIN_STICKY);
730     }
731   }
732   stickydir_ref++;
733 }
734
735 void CInode::put_stickydirs()
736 {
737   assert(stickydir_ref > 0);
738   stickydir_ref--;
739   if (stickydir_ref == 0) {
740     put(PIN_STICKYDIRS);
741     for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
742          p != dirfrags.end();
743          ++p) {
744       p->second->state_clear(CDir::STATE_STICKY);
745       p->second->put(CDir::PIN_STICKY);
746     }
747   }
748 }
749
750
751
752
753
754 // pins
755
756 void CInode::first_get()
757 {
758   // pin my dentry?
759   if (parent) 
760     parent->get(CDentry::PIN_INODEPIN);
761 }
762
763 void CInode::last_put() 
764 {
765   // unpin my dentry?
766   if (parent) 
767     parent->put(CDentry::PIN_INODEPIN);
768 }
769
770 void CInode::_put()
771 {
772   if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
773     mdcache->maybe_eval_stray(this, true);
774 }
775
776 void CInode::add_remote_parent(CDentry *p) 
777 {
778   if (remote_parents.empty())
779     get(PIN_REMOTEPARENT);
780   remote_parents.insert(p);
781 }
782 void CInode::remove_remote_parent(CDentry *p) 
783 {
784   remote_parents.erase(p);
785   if (remote_parents.empty())
786     put(PIN_REMOTEPARENT);
787 }
788
789
790
791
792 CDir *CInode::get_parent_dir()
793 {
794   if (parent)
795     return parent->dir;
796   return NULL;
797 }
798 CDir *CInode::get_projected_parent_dir()
799 {
800   CDentry *p = get_projected_parent_dn();
801   if (p)
802     return p->dir;
803   return NULL;
804 }
805 CInode *CInode::get_parent_inode() 
806 {
807   if (parent) 
808     return parent->dir->inode;
809   return NULL;
810 }
811
812 bool CInode::is_projected_ancestor_of(CInode *other)
813 {
814   while (other) {
815     if (other == this)
816       return true;
817     if (!other->get_projected_parent_dn())
818       break;
819     other = other->get_projected_parent_dn()->get_dir()->get_inode();
820   }
821   return false;
822 }
823
824 /*
825  * Because a non-directory inode may have multiple links, the use_parent
826  * argument allows selecting which parent to use for path construction. This
827  * argument is only meaningful for the final component (i.e. the first of the
828  * nested calls) because directories cannot have multiple hard links. If
829  * use_parent is NULL and projected is true, the primary parent's projected
830  * inode is used all the way up the path chain. Otherwise the primary parent
831  * stable inode is used.
832  */
833 void CInode::make_path_string(string& s, bool projected, const CDentry *use_parent) const
834 {
835   if (!use_parent) {
836     use_parent = projected ? get_projected_parent_dn() : parent;
837   }
838
839   if (use_parent) {
840     use_parent->make_path_string(s, projected);
841   } else if (is_root()) {
842     s = "";
843   } else if (is_mdsdir()) {
844     char t[40];
845     uint64_t eino(ino());
846     eino -= MDS_INO_MDSDIR_OFFSET;
847     snprintf(t, sizeof(t), "~mds%" PRId64, eino);
848     s = t;
849   } else {
850     char n[40];
851     uint64_t eino(ino());
852     snprintf(n, sizeof(n), "#%" PRIx64, eino);
853     s += n;
854   }
855 }
856
857 void CInode::make_path(filepath& fp, bool projected) const
858 {
859   const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
860   if (use_parent) {
861     assert(!is_base());
862     use_parent->make_path(fp, projected);
863   } else {
864     fp = filepath(ino());
865   }
866 }
867
868 void CInode::name_stray_dentry(string& dname)
869 {
870   char s[20];
871   snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
872   dname = s;
873 }
874
875 version_t CInode::pre_dirty()
876 {
877   version_t pv;
878   CDentry* _cdentry = get_projected_parent_dn(); 
879   if (_cdentry) {
880     pv = _cdentry->pre_dirty(get_projected_version());
881     dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
882   } else {
883     assert(is_base());
884     pv = get_projected_version() + 1;
885   }
886   // force update backtrace for old format inode (see inode_t::decode)
887   if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
888     inode_t *pi = projected_nodes.back()->inode;
889     if (pi->backtrace_version == 0)
890       pi->update_backtrace(pv);
891   }
892   return pv;
893 }
894
895 void CInode::_mark_dirty(LogSegment *ls)
896 {
897   if (!state_test(STATE_DIRTY)) {
898     state_set(STATE_DIRTY);
899     get(PIN_DIRTY);
900     assert(ls);
901   }
902   
903   // move myself to this segment's dirty list
904   if (ls) 
905     ls->dirty_inodes.push_back(&item_dirty);
906 }
907
908 void CInode::mark_dirty(version_t pv, LogSegment *ls) {
909   
910   dout(10) << "mark_dirty " << *this << dendl;
911
912   /*
913     NOTE: I may already be dirty, but this fn _still_ needs to be called so that
914     the directory is (perhaps newly) dirtied, and so that parent_dir_version is 
915     updated below.
916   */
917   
918   // only auth can get dirty.  "dirty" async data in replicas is relative to
919   // filelock state, not the dirty flag.
920   assert(is_auth());
921   
922   // touch my private version
923   assert(inode.version < pv);
924   inode.version = pv;
925   _mark_dirty(ls);
926
927   // mark dentry too
928   if (parent)
929     parent->mark_dirty(pv, ls);
930 }
931
932
933 void CInode::mark_clean()
934 {
935   dout(10) << " mark_clean " << *this << dendl;
936   if (state_test(STATE_DIRTY)) {
937     state_clear(STATE_DIRTY);
938     put(PIN_DIRTY);
939     
940     // remove myself from ls dirty list
941     item_dirty.remove_myself();
942   }
943 }    
944
945
946 // --------------
947 // per-inode storage
948 // (currently for root inode only)
949
950 struct C_IO_Inode_Stored : public CInodeIOContext {
951   version_t version;
952   Context *fin;
953   C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
954   void finish(int r) override {
955     in->_stored(r, version, fin);
956   }
957 };
958
959 object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
960 {
961   char n[60];
962   snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
963   return object_t(n);
964 }
965
966 void CInode::store(MDSInternalContextBase *fin)
967 {
968   dout(10) << "store " << get_version() << dendl;
969   assert(is_base());
970
971   if (snaprealm)
972     purge_stale_snap_data(snaprealm->get_snaps());
973
974   // encode
975   bufferlist bl;
976   string magic = CEPH_FS_ONDISK_MAGIC;
977   ::encode(magic, bl);
978   encode_store(bl, mdcache->mds->mdsmap->get_up_features());
979
980   // write it.
981   SnapContext snapc;
982   ObjectOperation m;
983   m.write_full(bl);
984
985   object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
986   object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
987
988   Context *newfin =
989     new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
990                      mdcache->mds->finisher);
991   mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
992                                  ceph::real_clock::now(), 0,
993                                  newfin);
994 }
995
996 void CInode::_stored(int r, version_t v, Context *fin)
997 {
998   if (r < 0) {
999     dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
1000     mdcache->mds->clog->error() << "failed to store inode " << ino()
1001                                 << " object: " << cpp_strerror(r);
1002     mdcache->mds->handle_write_error(r);
1003     fin->complete(r);
1004     return;
1005   }
1006
1007   dout(10) << "_stored " << v << " on " << *this << dendl;
1008   if (v == get_projected_version())
1009     mark_clean();
1010
1011   fin->complete(0);
1012 }
1013
1014 void CInode::flush(MDSInternalContextBase *fin)
1015 {
1016   dout(10) << "flush " << *this << dendl;
1017   assert(is_auth() && can_auth_pin());
1018
1019   MDSGatherBuilder gather(g_ceph_context);
1020
1021   if (is_dirty_parent()) {
1022     store_backtrace(gather.new_sub());
1023   }
1024   if (is_dirty()) {
1025     if (is_base()) {
1026       store(gather.new_sub());
1027     } else {
1028       parent->dir->commit(0, gather.new_sub());
1029     }
1030   }
1031
1032   if (gather.has_subs()) {
1033     gather.set_finisher(fin);
1034     gather.activate();
1035   } else {
1036     fin->complete(0);
1037   }
1038 }
1039
1040 struct C_IO_Inode_Fetched : public CInodeIOContext {
1041   bufferlist bl, bl2;
1042   Context *fin;
1043   C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
1044   void finish(int r) override {
1045     // Ignore 'r', because we fetch from two places, so r is usually ENOENT
1046     in->_fetched(bl, bl2, fin);
1047   }
1048 };
1049
1050 void CInode::fetch(MDSInternalContextBase *fin)
1051 {
1052   dout(10) << "fetch" << dendl;
1053
1054   C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
1055   C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
1056
1057   object_t oid = CInode::get_object_name(ino(), frag_t(), "");
1058   object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1059
1060   // Old on-disk format: inode stored in xattr of a dirfrag
1061   ObjectOperation rd;
1062   rd.getxattr("inode", &c->bl, NULL);
1063   mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, (bufferlist*)NULL, 0, gather.new_sub());
1064
1065   // Current on-disk format: inode stored in a .inode object
1066   object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode");
1067   mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub());
1068
1069   gather.activate();
1070 }
1071
1072 void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
1073 {
1074   dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
1075   bufferlist::iterator p;
1076   if (bl2.length()) {
1077     p = bl2.begin();
1078   } else if (bl.length()) {
1079     p = bl.begin();
1080   } else {
1081     derr << "No data while reading inode " << ino() << dendl;
1082     fin->complete(-ENOENT);
1083     return;
1084   }
1085
1086   // Attempt decode
1087   try {
1088     string magic;
1089     ::decode(magic, p);
1090     dout(10) << " magic is '" << magic << "' (expecting '"
1091              << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
1092     if (magic != CEPH_FS_ONDISK_MAGIC) {
1093       dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
1094               << "'" << dendl;
1095       fin->complete(-EINVAL);
1096     } else {
1097       decode_store(p);
1098       dout(10) << "_fetched " << *this << dendl;
1099       fin->complete(0);
1100     }
1101   } catch (buffer::error &err) {
1102     derr << "Corrupt inode " << ino() << ": " << err << dendl;
1103     fin->complete(-EINVAL);
1104     return;
1105   }
1106 }
1107
1108 void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
1109 {
1110   bt.ino = inode.ino;
1111   bt.ancestors.clear();
1112   bt.pool = pool;
1113
1114   CInode *in = this;
1115   CDentry *pdn = get_parent_dn();
1116   while (pdn) {
1117     CInode *diri = pdn->get_dir()->get_inode();
1118     bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
1119     in = diri;
1120     pdn = in->get_parent_dn();
1121   }
1122   for (compact_set<int64_t>::iterator i = inode.old_pools.begin();
1123        i != inode.old_pools.end();
1124        ++i) {
1125     // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
1126     if (*i != pool)
1127       bt.old_pools.insert(*i);
1128   }
1129 }
1130
1131 struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
1132   version_t version;
1133   Context *fin;
1134   C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
1135   void finish(int r) override {
1136     in->_stored_backtrace(r, version, fin);
1137   }
1138 };
1139
1140 void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
1141 {
1142   dout(10) << "store_backtrace on " << *this << dendl;
1143   assert(is_dirty_parent());
1144
1145   if (op_prio < 0)
1146     op_prio = CEPH_MSG_PRIO_DEFAULT;
1147
1148   auth_pin(this);
1149
1150   const int64_t pool = get_backtrace_pool();
1151   inode_backtrace_t bt;
1152   build_backtrace(pool, bt);
1153   bufferlist parent_bl;
1154   ::encode(bt, parent_bl);
1155
1156   ObjectOperation op;
1157   op.priority = op_prio;
1158   op.create(false);
1159   op.setxattr("parent", parent_bl);
1160
1161   bufferlist layout_bl;
1162   ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
1163   op.setxattr("layout", layout_bl);
1164
1165   SnapContext snapc;
1166   object_t oid = get_object_name(ino(), frag_t(), "");
1167   object_locator_t oloc(pool);
1168   Context *fin2 = new C_OnFinisher(
1169     new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
1170     mdcache->mds->finisher);
1171
1172   if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
1173     dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
1174     mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1175                                    ceph::real_clock::now(),
1176                                    0, fin2);
1177     return;
1178   }
1179
1180   C_GatherBuilder gather(g_ceph_context, fin2);
1181   mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1182                                  ceph::real_clock::now(),
1183                                  0, gather.new_sub());
1184
1185   // In the case where DIRTYPOOL is set, we update all old pools backtraces
1186   // such that anyone reading them will see the new pool ID in
1187   // inode_backtrace_t::pool and go read everything else from there.
1188   for (compact_set<int64_t>::iterator p = inode.old_pools.begin();
1189        p != inode.old_pools.end();
1190        ++p) {
1191     if (*p == pool)
1192       continue;
1193
1194     dout(20) << __func__ << ": updating old pool " << *p << dendl;
1195
1196     ObjectOperation op;
1197     op.priority = op_prio;
1198     op.create(false);
1199     op.setxattr("parent", parent_bl);
1200
1201     object_locator_t oloc(*p);
1202     mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1203                                    ceph::real_clock::now(),
1204                                    0, gather.new_sub());
1205   }
1206   gather.activate();
1207 }
1208
1209 void CInode::_stored_backtrace(int r, version_t v, Context *fin)
1210 {
1211   if (r == -ENOENT) {
1212     const int64_t pool = get_backtrace_pool();
1213     bool exists = mdcache->mds->objecter->with_osdmap(
1214         [pool](const OSDMap &osd_map) {
1215           return osd_map.have_pg_pool(pool);
1216         });
1217
1218     // This ENOENT is because the pool doesn't exist (the user deleted it
1219     // out from under us), so the backtrace can never be written, so pretend
1220     // to succeed so that the user can proceed to e.g. delete the file.
1221     if (!exists) {
1222       dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
1223                  "beneath us!" << dendl;
1224       r = 0;
1225     }
1226   }
1227
1228   if (r < 0) {
1229     dout(1) << "store backtrace error " << r << " v " << v << dendl;
1230     mdcache->mds->clog->error() << "failed to store backtrace on ino "
1231                                 << ino() << " object"
1232                                 << ", pool " << get_backtrace_pool()
1233                                 << ", errno " << r;
1234     mdcache->mds->handle_write_error(r);
1235     if (fin)
1236       fin->complete(r);
1237     return;
1238   }
1239
1240   dout(10) << "_stored_backtrace v " << v <<  dendl;
1241
1242   auth_unpin(this);
1243   if (v == inode.backtrace_version)
1244     clear_dirty_parent();
1245   if (fin)
1246     fin->complete(0);
1247 }
1248
1249 void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
1250 {
1251   mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
1252 }
1253
1254 void CInode::_mark_dirty_parent(LogSegment *ls, bool dirty_pool)
1255 {
1256   if (!state_test(STATE_DIRTYPARENT)) {
1257     dout(10) << "mark_dirty_parent" << dendl;
1258     state_set(STATE_DIRTYPARENT);
1259     get(PIN_DIRTYPARENT);
1260     assert(ls);
1261   }
1262   if (dirty_pool)
1263     state_set(STATE_DIRTYPOOL);
1264   if (ls)
1265     ls->dirty_parent_inodes.push_back(&item_dirty_parent);
1266 }
1267
1268 void CInode::clear_dirty_parent()
1269 {
1270   if (state_test(STATE_DIRTYPARENT)) {
1271     dout(10) << "clear_dirty_parent" << dendl;
1272     state_clear(STATE_DIRTYPARENT);
1273     state_clear(STATE_DIRTYPOOL);
1274     put(PIN_DIRTYPARENT);
1275     item_dirty_parent.remove_myself();
1276   }
1277 }
1278
1279 void CInode::verify_diri_backtrace(bufferlist &bl, int err)
1280 {
1281   if (is_base() || is_dirty_parent() || !is_auth())
1282     return;
1283
1284   dout(10) << "verify_diri_backtrace" << dendl;
1285
1286   if (err == 0) {
1287     inode_backtrace_t backtrace;
1288     ::decode(backtrace, bl);
1289     CDentry *pdn = get_parent_dn();
1290     if (backtrace.ancestors.empty() ||
1291         backtrace.ancestors[0].dname != pdn->name ||
1292         backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
1293       err = -EINVAL;
1294   }
1295
1296   if (err) {
1297     MDSRank *mds = mdcache->mds;
1298     mds->clog->error() << "bad backtrace on directory inode " << ino();
1299     assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
1300
1301     _mark_dirty_parent(mds->mdlog->get_current_segment(), false);
1302     mds->mdlog->flush();
1303   }
1304 }
1305
1306 // ------------------
1307 // parent dir
1308
1309
1310 void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
1311                                  const bufferlist *snap_blob) const
1312 {
1313   ::encode(inode, bl, features);
1314   if (is_symlink())
1315     ::encode(symlink, bl);
1316   ::encode(dirfragtree, bl);
1317   ::encode(xattrs, bl);
1318   if (snap_blob)
1319     ::encode(*snap_blob, bl);
1320   else
1321     ::encode(bufferlist(), bl);
1322   ::encode(old_inodes, bl, features);
1323   ::encode(oldest_snap, bl);
1324   ::encode(damage_flags, bl);
1325 }
1326
1327 void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
1328                             const bufferlist *snap_blob) const
1329 {
1330   ENCODE_START(6, 4, bl);
1331   encode_bare(bl, features, snap_blob);
1332   ENCODE_FINISH(bl);
1333 }
1334
1335 void CInode::encode_store(bufferlist& bl, uint64_t features)
1336 {
1337   bufferlist snap_blob;
1338   encode_snap_blob(snap_blob);
1339   InodeStoreBase::encode(bl, mdcache->mds->mdsmap->get_up_features(),
1340                          &snap_blob);
1341 }
1342
1343 void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
1344                               bufferlist& snap_blob, __u8 struct_v)
1345 {
1346   ::decode(inode, bl);
1347   if (is_symlink())
1348     ::decode(symlink, bl);
1349   ::decode(dirfragtree, bl);
1350   ::decode(xattrs, bl);
1351   ::decode(snap_blob, bl);
1352
1353   ::decode(old_inodes, bl);
1354   if (struct_v == 2 && inode.is_dir()) {
1355     bool default_layout_exists;
1356     ::decode(default_layout_exists, bl);
1357     if (default_layout_exists) {
1358       ::decode(struct_v, bl); // this was a default_file_layout
1359       ::decode(inode.layout, bl); // but we only care about the layout portion
1360     }
1361   }
1362
1363   if (struct_v >= 5) {
1364     // InodeStore is embedded in dentries without proper versioning, so
1365     // we consume up to the end of the buffer
1366     if (!bl.end()) {
1367       ::decode(oldest_snap, bl);
1368     }
1369
1370     if (!bl.end()) {
1371       ::decode(damage_flags, bl);
1372     }
1373   }
1374 }
1375
1376
1377 void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
1378 {
1379   DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
1380   decode_bare(bl, snap_blob, struct_v);
1381   DECODE_FINISH(bl);
1382 }
1383
1384 void CInode::decode_store(bufferlist::iterator& bl)
1385 {
1386   bufferlist snap_blob;
1387   InodeStoreBase::decode(bl, snap_blob);
1388   decode_snap_blob(snap_blob);
1389 }
1390
1391 // ------------------
1392 // locking
1393
1394 void CInode::set_object_info(MDSCacheObjectInfo &info)
1395 {
1396   info.ino = ino();
1397   info.snapid = last;
1398 }
1399
1400 void CInode::encode_lock_state(int type, bufferlist& bl)
1401 {
1402   ::encode(first, bl);
1403
1404   switch (type) {
1405   case CEPH_LOCK_IAUTH:
1406     ::encode(inode.version, bl);
1407     ::encode(inode.ctime, bl);
1408     ::encode(inode.mode, bl);
1409     ::encode(inode.uid, bl);
1410     ::encode(inode.gid, bl);  
1411     break;
1412     
1413   case CEPH_LOCK_ILINK:
1414     ::encode(inode.version, bl);
1415     ::encode(inode.ctime, bl);
1416     ::encode(inode.nlink, bl);
1417     break;
1418     
1419   case CEPH_LOCK_IDFT:
1420     if (is_auth()) {
1421       ::encode(inode.version, bl);
1422     } else {
1423       // treat flushing as dirty when rejoining cache
1424       bool dirty = dirfragtreelock.is_dirty_or_flushing();
1425       ::encode(dirty, bl);
1426     }
1427     {
1428       // encode the raw tree
1429       ::encode(dirfragtree, bl);
1430
1431       // also specify which frags are mine
1432       set<frag_t> myfrags;
1433       list<CDir*> dfls;
1434       get_dirfrags(dfls);
1435       for (list<CDir*>::iterator p = dfls.begin(); p != dfls.end(); ++p) 
1436         if ((*p)->is_auth()) {
1437           frag_t fg = (*p)->get_frag();
1438           myfrags.insert(fg);
1439         }
1440       ::encode(myfrags, bl);
1441     }
1442     break;
1443     
1444   case CEPH_LOCK_IFILE:
1445     if (is_auth()) {
1446       ::encode(inode.version, bl);
1447       ::encode(inode.ctime, bl);
1448       ::encode(inode.mtime, bl);
1449       ::encode(inode.atime, bl);
1450       ::encode(inode.time_warp_seq, bl);
1451       if (!is_dir()) {
1452         ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1453         ::encode(inode.size, bl);
1454         ::encode(inode.truncate_seq, bl);
1455         ::encode(inode.truncate_size, bl);
1456         ::encode(inode.client_ranges, bl);
1457         ::encode(inode.inline_data, bl);
1458       }
1459     } else {
1460       // treat flushing as dirty when rejoining cache
1461       bool dirty = filelock.is_dirty_or_flushing();
1462       ::encode(dirty, bl);
1463     }
1464
1465     {
1466       dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
1467       ::encode(inode.dirstat, bl);  // only meaningful if i am auth.
1468       bufferlist tmp;
1469       __u32 n = 0;
1470       for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1471            p != dirfrags.end();
1472            ++p) {
1473         frag_t fg = p->first;
1474         CDir *dir = p->second;
1475         if (is_auth() || dir->is_auth()) {
1476           fnode_t *pf = dir->get_projected_fnode();
1477           dout(15) << fg << " " << *dir << dendl;
1478           dout(20) << fg << "           fragstat " << pf->fragstat << dendl;
1479           dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
1480           ::encode(fg, tmp);
1481           ::encode(dir->first, tmp);
1482           ::encode(pf->fragstat, tmp);
1483           ::encode(pf->accounted_fragstat, tmp);
1484           n++;
1485         }
1486       }
1487       ::encode(n, bl);
1488       bl.claim_append(tmp);
1489     }
1490     break;
1491
1492   case CEPH_LOCK_INEST:
1493     if (is_auth()) {
1494       ::encode(inode.version, bl);
1495     } else {
1496       // treat flushing as dirty when rejoining cache
1497       bool dirty = nestlock.is_dirty_or_flushing();
1498       ::encode(dirty, bl);
1499     }
1500     {
1501       dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
1502       ::encode(inode.rstat, bl);  // only meaningful if i am auth.
1503       bufferlist tmp;
1504       __u32 n = 0;
1505       for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1506            p != dirfrags.end();
1507            ++p) {
1508         frag_t fg = p->first;
1509         CDir *dir = p->second;
1510         if (is_auth() || dir->is_auth()) {
1511           fnode_t *pf = dir->get_projected_fnode();
1512           dout(10) << fg << " " << *dir << dendl;
1513           dout(10) << fg << " " << pf->rstat << dendl;
1514           dout(10) << fg << " " << pf->rstat << dendl;
1515           dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
1516           ::encode(fg, tmp);
1517           ::encode(dir->first, tmp);
1518           ::encode(pf->rstat, tmp);
1519           ::encode(pf->accounted_rstat, tmp);
1520           ::encode(dir->dirty_old_rstat, tmp);
1521           n++;
1522         }
1523       }
1524       ::encode(n, bl);
1525       bl.claim_append(tmp);
1526     }
1527     break;
1528     
1529   case CEPH_LOCK_IXATTR:
1530     ::encode(inode.version, bl);
1531     ::encode(inode.ctime, bl);
1532     ::encode(xattrs, bl);
1533     break;
1534
1535   case CEPH_LOCK_ISNAP:
1536     ::encode(inode.version, bl);
1537     ::encode(inode.ctime, bl);
1538     encode_snap(bl);
1539     break;
1540
1541   case CEPH_LOCK_IFLOCK:
1542     ::encode(inode.version, bl);
1543     _encode_file_locks(bl);
1544     break;
1545
1546   case CEPH_LOCK_IPOLICY:
1547     if (inode.is_dir()) {
1548       ::encode(inode.version, bl);
1549       ::encode(inode.ctime, bl);
1550       ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1551       ::encode(inode.quota, bl);
1552       ::encode(inode.export_pin, bl);
1553     }
1554     break;
1555   
1556   default:
1557     ceph_abort();
1558   }
1559 }
1560
1561
1562 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1563
1564 void CInode::decode_lock_state(int type, bufferlist& bl)
1565 {
1566   bufferlist::iterator p = bl.begin();
1567   utime_t tm;
1568
1569   snapid_t newfirst;
1570   ::decode(newfirst, p);
1571
1572   if (!is_auth() && newfirst != first) {
1573     dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
1574     assert(newfirst > first);
1575     if (!is_multiversion() && parent) {
1576       assert(parent->first == first);
1577       parent->first = newfirst;
1578     }
1579     first = newfirst;
1580   }
1581
1582   switch (type) {
1583   case CEPH_LOCK_IAUTH:
1584     ::decode(inode.version, p);
1585     ::decode(tm, p);
1586     if (inode.ctime < tm) inode.ctime = tm;
1587     ::decode(inode.mode, p);
1588     ::decode(inode.uid, p);
1589     ::decode(inode.gid, p);
1590     break;
1591
1592   case CEPH_LOCK_ILINK:
1593     ::decode(inode.version, p);
1594     ::decode(tm, p);
1595     if (inode.ctime < tm) inode.ctime = tm;
1596     ::decode(inode.nlink, p);
1597     break;
1598
1599   case CEPH_LOCK_IDFT:
1600     if (is_auth()) {
1601       bool replica_dirty;
1602       ::decode(replica_dirty, p);
1603       if (replica_dirty) {
1604         dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
1605         dirfragtreelock.mark_dirty();  // ok bc we're auth and caller will handle
1606       }
1607     } else {
1608       ::decode(inode.version, p);
1609     }
1610     {
1611       fragtree_t temp;
1612       ::decode(temp, p);
1613       set<frag_t> authfrags;
1614       ::decode(authfrags, p);
1615       if (is_auth()) {
1616         // auth.  believe replica's auth frags only.
1617         for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
1618           if (!dirfragtree.is_leaf(*p)) {
1619             dout(10) << " forcing frag " << *p << " to leaf (split|merge)" << dendl;
1620             dirfragtree.force_to_leaf(g_ceph_context, *p);
1621             dirfragtreelock.mark_dirty();  // ok bc we're auth and caller will handle
1622           }
1623       } else {
1624         // replica.  take the new tree, BUT make sure any open
1625         //  dirfrags remain leaves (they may have split _after_ this
1626         //  dft was scattered, or we may still be be waiting on the
1627         //  notify from the auth)
1628         dirfragtree.swap(temp);
1629         for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1630              p != dirfrags.end();
1631              ++p) {
1632           if (!dirfragtree.is_leaf(p->first)) {
1633             dout(10) << " forcing open dirfrag " << p->first << " to leaf (racing with split|merge)" << dendl;
1634             dirfragtree.force_to_leaf(g_ceph_context, p->first);
1635           }
1636           if (p->second->is_auth())
1637             p->second->state_clear(CDir::STATE_DIRTYDFT);
1638         }
1639       }
1640       if (g_conf->mds_debug_frag)
1641         verify_dirfrags();
1642     }
1643     break;
1644
1645   case CEPH_LOCK_IFILE:
1646     if (!is_auth()) {
1647       ::decode(inode.version, p);
1648       ::decode(tm, p);
1649       if (inode.ctime < tm) inode.ctime = tm;
1650       ::decode(inode.mtime, p);
1651       ::decode(inode.atime, p);
1652       ::decode(inode.time_warp_seq, p);
1653       if (!is_dir()) {
1654         ::decode(inode.layout, p);
1655         ::decode(inode.size, p);
1656         ::decode(inode.truncate_seq, p);
1657         ::decode(inode.truncate_size, p);
1658         ::decode(inode.client_ranges, p);
1659         ::decode(inode.inline_data, p);
1660       }
1661     } else {
1662       bool replica_dirty;
1663       ::decode(replica_dirty, p);
1664       if (replica_dirty) {
1665         dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
1666         filelock.mark_dirty();  // ok bc we're auth and caller will handle
1667       }
1668     }
1669     {
1670       frag_info_t dirstat;
1671       ::decode(dirstat, p);
1672       if (!is_auth()) {
1673         dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
1674         inode.dirstat = dirstat;    // take inode summation if replica
1675       }
1676       __u32 n;
1677       ::decode(n, p);
1678       dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
1679       while (n--) {
1680         frag_t fg;
1681         snapid_t fgfirst;
1682         frag_info_t fragstat;
1683         frag_info_t accounted_fragstat;
1684         ::decode(fg, p);
1685         ::decode(fgfirst, p);
1686         ::decode(fragstat, p);
1687         ::decode(accounted_fragstat, p);
1688         dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
1689         dout(10) << fg << "           fragstat " << fragstat << dendl;
1690         dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
1691
1692         CDir *dir = get_dirfrag(fg);
1693         if (is_auth()) {
1694           assert(dir);                // i am auth; i had better have this dir open
1695           dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1696                    << " on " << *dir << dendl;
1697           dir->first = fgfirst;
1698           dir->fnode.fragstat = fragstat;
1699           dir->fnode.accounted_fragstat = accounted_fragstat;
1700           dir->first = fgfirst;
1701           if (!(fragstat == accounted_fragstat)) {
1702             dout(10) << fg << " setting filelock updated flag" << dendl;
1703             filelock.mark_dirty();  // ok bc we're auth and caller will handle
1704           }
1705         } else {
1706           if (dir && dir->is_auth()) {
1707             dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1708                      << " on " << *dir << dendl;
1709             dir->first = fgfirst;
1710             fnode_t *pf = dir->get_projected_fnode();
1711             finish_scatter_update(&filelock, dir,
1712                                   inode.dirstat.version, pf->accounted_fragstat.version);
1713           }
1714         }
1715       }
1716     }
1717     break;
1718
1719   case CEPH_LOCK_INEST:
1720     if (is_auth()) {
1721       bool replica_dirty;
1722       ::decode(replica_dirty, p);
1723       if (replica_dirty) {
1724         dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
1725         nestlock.mark_dirty();  // ok bc we're auth and caller will handle
1726       }
1727     } else {
1728       ::decode(inode.version, p);
1729     }
1730     {
1731       nest_info_t rstat;
1732       ::decode(rstat, p);
1733       if (!is_auth()) {
1734         dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
1735         inode.rstat = rstat;    // take inode summation if replica
1736       }
1737       __u32 n;
1738       ::decode(n, p);
1739       while (n--) {
1740         frag_t fg;
1741         snapid_t fgfirst;
1742         nest_info_t rstat;
1743         nest_info_t accounted_rstat;
1744         compact_map<snapid_t,old_rstat_t> dirty_old_rstat;
1745         ::decode(fg, p);
1746         ::decode(fgfirst, p);
1747         ::decode(rstat, p);
1748         ::decode(accounted_rstat, p);
1749         ::decode(dirty_old_rstat, p);
1750         dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
1751         dout(10) << fg << "               rstat " << rstat << dendl;
1752         dout(10) << fg << "     accounted_rstat " << accounted_rstat << dendl;
1753         dout(10) << fg << "     dirty_old_rstat " << dirty_old_rstat << dendl;
1754
1755         CDir *dir = get_dirfrag(fg);
1756         if (is_auth()) {
1757           assert(dir);                // i am auth; i had better have this dir open
1758           dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1759                    << " on " << *dir << dendl;
1760           dir->first = fgfirst;
1761           dir->fnode.rstat = rstat;
1762           dir->fnode.accounted_rstat = accounted_rstat;
1763           dir->dirty_old_rstat.swap(dirty_old_rstat);
1764           if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
1765             dout(10) << fg << " setting nestlock updated flag" << dendl;
1766             nestlock.mark_dirty();  // ok bc we're auth and caller will handle
1767           }
1768         } else {
1769           if (dir && dir->is_auth()) {
1770             dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1771                      << " on " << *dir << dendl;
1772             dir->first = fgfirst;
1773             fnode_t *pf = dir->get_projected_fnode();
1774             finish_scatter_update(&nestlock, dir,
1775                                   inode.rstat.version, pf->accounted_rstat.version);
1776           }
1777         }
1778       }
1779     }
1780     break;
1781
1782   case CEPH_LOCK_IXATTR:
1783     ::decode(inode.version, p);
1784     ::decode(tm, p);
1785     if (inode.ctime < tm) inode.ctime = tm;
1786     ::decode(xattrs, p);
1787     break;
1788
1789   case CEPH_LOCK_ISNAP:
1790     {
1791       ::decode(inode.version, p);
1792       ::decode(tm, p);
1793       if (inode.ctime < tm) inode.ctime = tm;
1794       snapid_t seq = 0;
1795       if (snaprealm)
1796         seq = snaprealm->srnode.seq;
1797       decode_snap(p);
1798       if (snaprealm && snaprealm->srnode.seq != seq)
1799         mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
1800     }
1801     break;
1802
1803   case CEPH_LOCK_IFLOCK:
1804     ::decode(inode.version, p);
1805     _decode_file_locks(p);
1806     break;
1807
1808   case CEPH_LOCK_IPOLICY:
1809     if (inode.is_dir()) {
1810       ::decode(inode.version, p);
1811       ::decode(tm, p);
1812       if (inode.ctime < tm) inode.ctime = tm;
1813       ::decode(inode.layout, p);
1814       ::decode(inode.quota, p);
1815       mds_rank_t old_pin = inode.export_pin;
1816       ::decode(inode.export_pin, p);
1817       maybe_export_pin(old_pin != inode.export_pin);
1818     }
1819     break;
1820
1821   default:
1822     ceph_abort();
1823   }
1824 }
1825
1826
1827 bool CInode::is_dirty_scattered()
1828 {
1829   return
1830     filelock.is_dirty_or_flushing() ||
1831     nestlock.is_dirty_or_flushing() ||
1832     dirfragtreelock.is_dirty_or_flushing();
1833 }
1834
1835 void CInode::clear_scatter_dirty()
1836 {
1837   filelock.remove_dirty();
1838   nestlock.remove_dirty();
1839   dirfragtreelock.remove_dirty();
1840 }
1841
1842 void CInode::clear_dirty_scattered(int type)
1843 {
1844   dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
1845   switch (type) {
1846   case CEPH_LOCK_IFILE:
1847     item_dirty_dirfrag_dir.remove_myself();
1848     break;
1849
1850   case CEPH_LOCK_INEST:
1851     item_dirty_dirfrag_nest.remove_myself();
1852     break;
1853
1854   case CEPH_LOCK_IDFT:
1855     item_dirty_dirfrag_dirfragtree.remove_myself();
1856     break;
1857
1858   default:
1859     ceph_abort();
1860   }
1861 }
1862
1863
1864 /*
1865  * when we initially scatter a lock, we need to check if any of the dirfrags
1866  * have out of date accounted_rstat/fragstat.  if so, mark the lock stale.
1867  */
1868 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1869 void CInode::start_scatter(ScatterLock *lock)
1870 {
1871   dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
1872   assert(is_auth());
1873   inode_t *pi = get_projected_inode();
1874
1875   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1876        p != dirfrags.end();
1877        ++p) {
1878     frag_t fg = p->first;
1879     CDir *dir = p->second;
1880     fnode_t *pf = dir->get_projected_fnode();
1881     dout(20) << fg << " " << *dir << dendl;
1882
1883     if (!dir->is_auth())
1884       continue;
1885
1886     switch (lock->get_type()) {
1887     case CEPH_LOCK_IFILE:
1888       finish_scatter_update(lock, dir, pi->dirstat.version, pf->accounted_fragstat.version);
1889       break;
1890
1891     case CEPH_LOCK_INEST:
1892       finish_scatter_update(lock, dir, pi->rstat.version, pf->accounted_rstat.version);
1893       break;
1894
1895     case CEPH_LOCK_IDFT:
1896       dir->state_clear(CDir::STATE_DIRTYDFT);
1897       break;
1898     }
1899   }
1900 }
1901
1902
1903 class C_Inode_FragUpdate : public MDSLogContextBase {
1904 protected:
1905   CInode *in;
1906   CDir *dir;
1907   MutationRef mut;
1908   MDSRank *get_mds() override {return in->mdcache->mds;}
1909   void finish(int r) override {
1910     in->_finish_frag_update(dir, mut);
1911   }    
1912
1913 public:
1914   C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}
1915 };
1916
1917 void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
1918                                    version_t inode_version, version_t dir_accounted_version)
1919 {
1920   frag_t fg = dir->get_frag();
1921   assert(dir->is_auth());
1922
1923   if (dir->is_frozen()) {
1924     dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
1925   } else if (dir->get_version() == 0) {
1926     dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
1927   } else {
1928     if (dir_accounted_version != inode_version) {
1929       dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
1930
1931       MDLog *mdlog = mdcache->mds->mdlog;
1932       MutationRef mut(new MutationImpl());
1933       mut->ls = mdlog->get_current_segment();
1934
1935       inode_t *pi = get_projected_inode();
1936       fnode_t *pf = dir->project_fnode();
1937
1938       const char *ename = 0;
1939       switch (lock->get_type()) {
1940       case CEPH_LOCK_IFILE:
1941         pf->fragstat.version = pi->dirstat.version;
1942         pf->accounted_fragstat = pf->fragstat;
1943         ename = "lock ifile accounted scatter stat update";
1944         break;
1945       case CEPH_LOCK_INEST:
1946         pf->rstat.version = pi->rstat.version;
1947         pf->accounted_rstat = pf->rstat;
1948         ename = "lock inest accounted scatter stat update";
1949
1950         if (!is_auth() && lock->get_state() == LOCK_MIX) {
1951           dout(10) << "finish_scatter_update try to assimilate dirty rstat on " 
1952             << *dir << dendl; 
1953           dir->assimilate_dirty_rstat_inodes();
1954        }
1955
1956         break;
1957       default:
1958         ceph_abort();
1959       }
1960         
1961       pf->version = dir->pre_dirty();
1962       mut->add_projected_fnode(dir);
1963
1964       EUpdate *le = new EUpdate(mdlog, ename);
1965       mdlog->start_entry(le);
1966       le->metablob.add_dir_context(dir);
1967       le->metablob.add_dir(dir, true);
1968       
1969       assert(!dir->is_frozen());
1970       mut->auth_pin(dir);
1971
1972       if (lock->get_type() == CEPH_LOCK_INEST && 
1973           !is_auth() && lock->get_state() == LOCK_MIX) {
1974         dout(10) << "finish_scatter_update finish assimilating dirty rstat on " 
1975           << *dir << dendl; 
1976         dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
1977
1978         if (!(pf->rstat == pf->accounted_rstat)) {
1979           if (mut->wrlocks.count(&nestlock) == 0) {
1980             mdcache->mds->locker->wrlock_force(&nestlock, mut);
1981           }
1982
1983           mdcache->mds->locker->mark_updated_scatterlock(&nestlock);
1984           mut->ls->dirty_dirfrag_nest.push_back(&item_dirty_dirfrag_nest);
1985         }
1986       }
1987       
1988       mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
1989     } else {
1990       dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
1991                << " scatter stat unchanged at v" << dir_accounted_version << dendl;
1992     }
1993   }
1994 }
1995
1996 void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
1997 {
1998   dout(10) << "_finish_frag_update on " << *dir << dendl;
1999   mut->apply();
2000   mdcache->mds->locker->drop_locks(mut.get());
2001   mut->cleanup();
2002 }
2003
2004
2005 /*
2006  * when we gather a lock, we need to assimilate dirfrag changes into the inode
2007  * state.  it's possible we can't update the dirfrag accounted_rstat/fragstat
2008  * because the frag is auth and frozen, or that the replica couldn't for the same
2009  * reason.  hopefully it will get updated the next time the lock cycles.
2010  *
2011  * we have two dimensions of behavior:
2012  *  - we may be (auth and !frozen), and able to update, or not.
2013  *  - the frag may be stale, or not.
2014  *
2015  * if the frag is non-stale, we want to assimilate the diff into the
2016  * inode, regardless of whether it's auth or updateable.
2017  *
2018  * if we update the frag, we want to set accounted_fragstat = frag,
2019  * both if we took the diff or it was stale and we are making it
2020  * un-stale.
2021  */
2022 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
2023 void CInode::finish_scatter_gather_update(int type)
2024 {
2025   LogChannelRef clog = mdcache->mds->clog;
2026
2027   dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
2028   assert(is_auth());
2029
2030   switch (type) {
2031   case CEPH_LOCK_IFILE:
2032     {
2033       fragtree_t tmpdft = dirfragtree;
2034       struct frag_info_t dirstat;
2035       bool dirstat_valid = true;
2036
2037       // adjust summation
2038       assert(is_auth());
2039       inode_t *pi = get_projected_inode();
2040
2041       bool touched_mtime = false, touched_chattr = false;
2042       dout(20) << "  orig dirstat " << pi->dirstat << dendl;
2043       pi->dirstat.version++;
2044       for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2045            p != dirfrags.end();
2046            ++p) {
2047         frag_t fg = p->first;
2048         CDir *dir = p->second;
2049         dout(20) << fg << " " << *dir << dendl;
2050
2051         bool update;
2052         if (dir->get_version() != 0) {
2053           update = dir->is_auth() && !dir->is_frozen();
2054         } else {
2055           update = false;
2056           dirstat_valid = false;
2057         }
2058
2059         fnode_t *pf = dir->get_projected_fnode();
2060         if (update)
2061           pf = dir->project_fnode();
2062
2063         if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
2064           dout(20) << fg << "           fragstat " << pf->fragstat << dendl;
2065           dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
2066           pi->dirstat.add_delta(pf->fragstat, pf->accounted_fragstat, &touched_mtime, &touched_chattr);
2067         } else {
2068           dout(20) << fg << " skipping STALE accounted_fragstat " << pf->accounted_fragstat << dendl;
2069         }
2070
2071         if (pf->fragstat.nfiles < 0 ||
2072             pf->fragstat.nsubdirs < 0) {
2073           clog->error() << "bad/negative dir size on "
2074               << dir->dirfrag() << " " << pf->fragstat;
2075           assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2076           
2077           if (pf->fragstat.nfiles < 0)
2078             pf->fragstat.nfiles = 0;
2079           if (pf->fragstat.nsubdirs < 0)
2080             pf->fragstat.nsubdirs = 0;
2081         }
2082
2083         if (update) {
2084           pf->accounted_fragstat = pf->fragstat;
2085           pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
2086           dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
2087         }
2088
2089         tmpdft.force_to_leaf(g_ceph_context, fg);
2090         dirstat.add(pf->fragstat);
2091       }
2092       if (touched_mtime)
2093         pi->mtime = pi->ctime = pi->dirstat.mtime;
2094       if (touched_chattr)
2095         pi->change_attr = pi->dirstat.change_attr;
2096       dout(20) << " final dirstat " << pi->dirstat << dendl;
2097
2098       if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
2099         list<frag_t> ls;
2100         tmpdft.get_leaves_under(frag_t(), ls);
2101         for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2102           if (!dirfrags.count(*p)) {
2103             dirstat_valid = false;
2104             break;
2105           }
2106         if (dirstat_valid) {
2107           if (state_test(CInode::STATE_REPAIRSTATS)) {
2108             dout(20) << " dirstat mismatch, fixing" << dendl;
2109           } else {
2110             clog->error() << "unmatched fragstat on " << ino() << ", inode has "
2111                           << pi->dirstat << ", dirfrags have " << dirstat;
2112             assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
2113           }
2114           // trust the dirfrags for now
2115           version_t v = pi->dirstat.version;
2116           if (pi->dirstat.mtime > dirstat.mtime)
2117             dirstat.mtime = pi->dirstat.mtime;
2118           if (pi->dirstat.change_attr > dirstat.change_attr)
2119             dirstat.change_attr = pi->dirstat.change_attr;
2120           pi->dirstat = dirstat;
2121           pi->dirstat.version = v;
2122         }
2123       }
2124
2125       if (pi->dirstat.nfiles < 0 || pi->dirstat.nsubdirs < 0)
2126       {
2127         std::string path;
2128         make_path_string(path);
2129         clog->error() << "Inconsistent statistics detected: fragstat on inode "
2130                       << ino() << " (" << path << "), inode has " << pi->dirstat;
2131         assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2132
2133         if (pi->dirstat.nfiles < 0)
2134           pi->dirstat.nfiles = 0;
2135         if (pi->dirstat.nsubdirs < 0)
2136           pi->dirstat.nsubdirs = 0;
2137       }
2138     }
2139     break;
2140
2141   case CEPH_LOCK_INEST:
2142     {
2143       fragtree_t tmpdft = dirfragtree;
2144       nest_info_t rstat;
2145       rstat.rsubdirs = 1;
2146       bool rstat_valid = true;
2147
2148       // adjust summation
2149       assert(is_auth());
2150       inode_t *pi = get_projected_inode();
2151       dout(20) << "  orig rstat " << pi->rstat << dendl;
2152       pi->rstat.version++;
2153       for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2154            p != dirfrags.end();
2155            ++p) {
2156         frag_t fg = p->first;
2157         CDir *dir = p->second;
2158         dout(20) << fg << " " << *dir << dendl;
2159
2160         bool update;
2161         if (dir->get_version() != 0) {
2162           update = dir->is_auth() && !dir->is_frozen();
2163         } else {
2164           update = false;
2165           rstat_valid = false;
2166         }
2167
2168         fnode_t *pf = dir->get_projected_fnode();
2169         if (update)
2170           pf = dir->project_fnode();
2171
2172         if (pf->accounted_rstat.version == pi->rstat.version-1) {
2173           // only pull this frag's dirty rstat inodes into the frag if
2174           // the frag is non-stale and updateable.  if it's stale,
2175           // that info will just get thrown out!
2176           if (update)
2177             dir->assimilate_dirty_rstat_inodes();
2178
2179           dout(20) << fg << "           rstat " << pf->rstat << dendl;
2180           dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
2181           dout(20) << fg << " dirty_old_rstat " << dir->dirty_old_rstat << dendl;
2182           mdcache->project_rstat_frag_to_inode(pf->rstat, pf->accounted_rstat,
2183                                                dir->first, CEPH_NOSNAP, this, true);
2184           for (compact_map<snapid_t,old_rstat_t>::iterator q = dir->dirty_old_rstat.begin();
2185                q != dir->dirty_old_rstat.end();
2186                ++q)
2187             mdcache->project_rstat_frag_to_inode(q->second.rstat, q->second.accounted_rstat,
2188                                                  q->second.first, q->first, this, true);
2189           if (update)  // dir contents not valid if frozen or non-auth
2190             dir->check_rstats();
2191         } else {
2192           dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
2193         }
2194         if (update) {
2195           pf->accounted_rstat = pf->rstat;
2196           dir->dirty_old_rstat.clear();
2197           pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
2198           dir->check_rstats();
2199           dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
2200         }
2201
2202         tmpdft.force_to_leaf(g_ceph_context, fg);
2203         rstat.add(pf->rstat);
2204       }
2205       dout(20) << " final rstat " << pi->rstat << dendl;
2206
2207       if (rstat_valid && !rstat.same_sums(pi->rstat)) {
2208         list<frag_t> ls;
2209         tmpdft.get_leaves_under(frag_t(), ls);
2210         for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2211           if (!dirfrags.count(*p)) {
2212             rstat_valid = false;
2213             break;
2214           }
2215         if (rstat_valid) {
2216           if (state_test(CInode::STATE_REPAIRSTATS)) {
2217             dout(20) << " rstat mismatch, fixing" << dendl;
2218           } else {
2219             clog->error() << "inconsistent rstat on inode " << ino()
2220                           << ", inode has " << pi->rstat
2221                           << ", directory fragments have " << rstat;
2222             assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
2223           }
2224           // trust the dirfrag for now
2225           version_t v = pi->rstat.version;
2226           if (pi->rstat.rctime > rstat.rctime)
2227             rstat.rctime = pi->rstat.rctime;
2228           pi->rstat = rstat;
2229           pi->rstat.version = v;
2230         }
2231       }
2232
2233       mdcache->broadcast_quota_to_client(this);
2234     }
2235     break;
2236
2237   case CEPH_LOCK_IDFT:
2238     break;
2239
2240   default:
2241     ceph_abort();
2242   }
2243 }
2244
2245 void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
2246 {
2247   dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
2248   assert(is_auth());
2249
2250   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2251        p != dirfrags.end();
2252        ++p) {
2253     CDir *dir = p->second;
2254     if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
2255       continue;
2256     
2257     if (type == CEPH_LOCK_IDFT)
2258       continue;  // nothing to do.
2259
2260     dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
2261     assert(dir->is_projected());
2262     fnode_t *pf = dir->get_projected_fnode();
2263     pf->version = dir->pre_dirty();
2264     mut->add_projected_fnode(dir);
2265     metablob->add_dir(dir, true);
2266     mut->auth_pin(dir);
2267
2268     if (type == CEPH_LOCK_INEST)
2269       dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
2270   }
2271 }
2272
2273 // waiting
2274
2275 bool CInode::is_frozen() const
2276 {
2277   if (is_frozen_inode()) return true;
2278   if (parent && parent->dir->is_frozen()) return true;
2279   return false;
2280 }
2281
2282 bool CInode::is_frozen_dir() const
2283 {
2284   if (parent && parent->dir->is_frozen_dir()) return true;
2285   return false;
2286 }
2287
2288 bool CInode::is_freezing() const
2289 {
2290   if (is_freezing_inode()) return true;
2291   if (parent && parent->dir->is_freezing()) return true;
2292   return false;
2293 }
2294
2295 void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
2296 {
2297   if (waiting_on_dir.empty())
2298     get(PIN_DIRWAITER);
2299   waiting_on_dir[fg].push_back(c);
2300   dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
2301 }
2302
2303 void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
2304 {
2305   if (waiting_on_dir.empty())
2306     return;
2307
2308   compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.find(fg);
2309   if (p != waiting_on_dir.end()) {
2310     dout(10) << "take_dir_waiting frag " << fg << " on " << *this << dendl;
2311     ls.splice(ls.end(), p->second);
2312     waiting_on_dir.erase(p);
2313
2314     if (waiting_on_dir.empty())
2315       put(PIN_DIRWAITER);
2316   }
2317 }
2318
2319 void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c) 
2320 {
2321   dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
2322            << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
2323            << " !frozen " << !is_frozen_inode()
2324            << " !freezing " << !is_freezing_inode()
2325            << dendl;
2326   // wait on the directory?
2327   //  make sure its not the inode that is explicitly ambiguous|freezing|frozen
2328   if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
2329       ((tag & WAIT_UNFREEZE) &&
2330        !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
2331     dout(15) << "passing waiter up tree" << dendl;
2332     parent->dir->add_waiter(tag, c);
2333     return;
2334   }
2335   dout(15) << "taking waiter here" << dendl;
2336   MDSCacheObject::add_waiter(tag, c);
2337 }
2338
2339 void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
2340 {
2341   if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
2342     // take all dentry waiters
2343     while (!waiting_on_dir.empty()) {
2344       compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.begin();
2345       dout(10) << "take_waiting dirfrag " << p->first << " on " << *this << dendl;
2346       ls.splice(ls.end(), p->second);
2347       waiting_on_dir.erase(p);
2348     }
2349     put(PIN_DIRWAITER);
2350   }
2351
2352   // waiting
2353   MDSCacheObject::take_waiting(mask, ls);
2354 }
2355
2356 bool CInode::freeze_inode(int auth_pin_allowance)
2357 {
2358   assert(auth_pin_allowance > 0);  // otherwise we need to adjust parent's nested_auth_pins
2359   assert(auth_pins >= auth_pin_allowance);
2360   if (auth_pins > auth_pin_allowance) {
2361     dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
2362     auth_pin_freeze_allowance = auth_pin_allowance;
2363     get(PIN_FREEZING);
2364     state_set(STATE_FREEZING);
2365     return false;
2366   }
2367
2368   dout(10) << "freeze_inode - frozen" << dendl;
2369   assert(auth_pins == auth_pin_allowance);
2370   if (!state_test(STATE_FROZEN)) {
2371     get(PIN_FROZEN);
2372     state_set(STATE_FROZEN);
2373   }
2374   return true;
2375 }
2376
2377 void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished) 
2378 {
2379   dout(10) << "unfreeze_inode" << dendl;
2380   if (state_test(STATE_FREEZING)) {
2381     state_clear(STATE_FREEZING);
2382     put(PIN_FREEZING);
2383   } else if (state_test(STATE_FROZEN)) {
2384     state_clear(STATE_FROZEN);
2385     put(PIN_FROZEN);
2386   } else 
2387     ceph_abort();
2388   take_waiting(WAIT_UNFREEZE, finished);
2389 }
2390
2391 void CInode::unfreeze_inode()
2392 {
2393     list<MDSInternalContextBase*> finished;
2394     unfreeze_inode(finished);
2395     mdcache->mds->queue_waiters(finished);
2396 }
2397
2398 void CInode::freeze_auth_pin()
2399 {
2400   assert(state_test(CInode::STATE_FROZEN));
2401   state_set(CInode::STATE_FROZENAUTHPIN);
2402 }
2403
2404 void CInode::unfreeze_auth_pin()
2405 {
2406   assert(state_test(CInode::STATE_FROZENAUTHPIN));
2407   state_clear(CInode::STATE_FROZENAUTHPIN);
2408   if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
2409     list<MDSInternalContextBase*> finished;
2410     take_waiting(WAIT_UNFREEZE, finished);
2411     mdcache->mds->queue_waiters(finished);
2412   }
2413 }
2414
2415 void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
2416 {
2417   assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
2418   state_clear(CInode::STATE_AMBIGUOUSAUTH);
2419   take_waiting(CInode::WAIT_SINGLEAUTH, finished);
2420 }
2421
2422 void CInode::clear_ambiguous_auth()
2423 {
2424   list<MDSInternalContextBase*> finished;
2425   clear_ambiguous_auth(finished);
2426   mdcache->mds->queue_waiters(finished);
2427 }
2428
2429 // auth_pins
2430 bool CInode::can_auth_pin() const {
2431   if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
2432     return false;
2433   if (parent)
2434     return parent->can_auth_pin();
2435   return true;
2436 }
2437
2438 void CInode::auth_pin(void *by) 
2439 {
2440   if (auth_pins == 0)
2441     get(PIN_AUTHPIN);
2442   auth_pins++;
2443
2444 #ifdef MDS_AUTHPIN_SET
2445   auth_pin_set.insert(by);
2446 #endif
2447
2448   dout(10) << "auth_pin by " << by << " on " << *this
2449            << " now " << auth_pins << "+" << nested_auth_pins
2450            << dendl;
2451   
2452   if (parent)
2453     parent->adjust_nested_auth_pins(1, 1, this);
2454 }
2455
2456 void CInode::auth_unpin(void *by) 
2457 {
2458   auth_pins--;
2459
2460 #ifdef MDS_AUTHPIN_SET
2461   assert(auth_pin_set.count(by));
2462   auth_pin_set.erase(auth_pin_set.find(by));
2463 #endif
2464
2465   if (auth_pins == 0)
2466     put(PIN_AUTHPIN);
2467   
2468   dout(10) << "auth_unpin by " << by << " on " << *this
2469            << " now " << auth_pins << "+" << nested_auth_pins
2470            << dendl;
2471   
2472   assert(auth_pins >= 0);
2473
2474   if (parent)
2475     parent->adjust_nested_auth_pins(-1, -1, by);
2476
2477   if (is_freezing_inode() &&
2478       auth_pins == auth_pin_freeze_allowance) {
2479     dout(10) << "auth_unpin freezing!" << dendl;
2480     get(PIN_FROZEN);
2481     put(PIN_FREEZING);
2482     state_clear(STATE_FREEZING);
2483     state_set(STATE_FROZEN);
2484     finish_waiting(WAIT_FROZEN);
2485   }  
2486 }
2487
2488 void CInode::adjust_nested_auth_pins(int a, void *by)
2489 {
2490   assert(a);
2491   nested_auth_pins += a;
2492   dout(35) << "adjust_nested_auth_pins by " << by
2493            << " change " << a << " yields "
2494            << auth_pins << "+" << nested_auth_pins << dendl;
2495   assert(nested_auth_pins >= 0);
2496
2497   if (g_conf->mds_debug_auth_pins) {
2498     // audit
2499     int s = 0;
2500     for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2501          p != dirfrags.end();
2502          ++p) {
2503       CDir *dir = p->second;
2504       if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
2505         s++;
2506     } 
2507     assert(s == nested_auth_pins);
2508   } 
2509
2510   if (parent)
2511     parent->adjust_nested_auth_pins(a, 0, by);
2512 }
2513
2514
2515 // authority
2516
2517 mds_authority_t CInode::authority() const
2518 {
2519   if (inode_auth.first >= 0) 
2520     return inode_auth;
2521
2522   if (parent)
2523     return parent->dir->authority();
2524
2525   // new items that are not yet linked in (in the committed plane) belong
2526   // to their first parent.
2527   if (!projected_parent.empty())
2528     return projected_parent.front()->dir->authority();
2529
2530   return CDIR_AUTH_UNDEF;
2531 }
2532
2533
2534 // SNAP
2535
2536 snapid_t CInode::get_oldest_snap()
2537 {
2538   snapid_t t = first;
2539   if (!old_inodes.empty())
2540     t = old_inodes.begin()->second.first;
2541   return MIN(t, oldest_snap);
2542 }
2543
2544 old_inode_t& CInode::cow_old_inode(snapid_t follows, bool cow_head)
2545 {
2546   assert(follows >= first);
2547
2548   inode_t *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
2549   map<string,bufferptr> *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
2550
2551   old_inode_t &old = old_inodes[follows];
2552   old.first = first;
2553   old.inode = *pi;
2554   old.xattrs = *px;
2555
2556   if (first < oldest_snap)
2557     oldest_snap = first;
2558   
2559   dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
2560
2561   old.inode.trim_client_ranges(follows);
2562
2563   if (g_conf->mds_snap_rstat &&
2564       !(old.inode.rstat == old.inode.accounted_rstat))
2565     dirty_old_rstats.insert(follows);
2566   
2567   first = follows+1;
2568
2569   dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
2570            << " to [" << old.first << "," << follows << "] on "
2571            << *this << dendl;
2572
2573   return old;
2574 }
2575
2576 void CInode::split_old_inode(snapid_t snap)
2577 {
2578   compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap);
2579   assert(p != old_inodes.end() && p->second.first < snap);
2580
2581   old_inode_t &old = old_inodes[snap - 1];
2582   old = p->second;
2583
2584   p->second.first = snap;
2585   dout(10) << "split_old_inode " << "[" << old.first << "," << p->first
2586            << "] to [" << snap << "," << p->first << "] on " << *this << dendl;
2587 }
2588
2589 void CInode::pre_cow_old_inode()
2590 {
2591   snapid_t follows = find_snaprealm()->get_newest_seq();
2592   if (first <= follows)
2593     cow_old_inode(follows, true);
2594 }
2595
2596 void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
2597 {
2598   dout(10) << "purge_stale_snap_data " << snaps << dendl;
2599
2600   if (old_inodes.empty())
2601     return;
2602
2603   compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.begin();
2604   while (p != old_inodes.end()) {
2605     set<snapid_t>::const_iterator q = snaps.lower_bound(p->second.first);
2606     if (q == snaps.end() || *q > p->first) {
2607       dout(10) << " purging old_inode [" << p->second.first << "," << p->first << "]" << dendl;
2608       old_inodes.erase(p++);
2609     } else
2610       ++p;
2611   }
2612 }
2613
2614 /*
2615  * pick/create an old_inode
2616  */
2617 old_inode_t * CInode::pick_old_inode(snapid_t snap)
2618 {
2619   compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap);  // p is first key >= to snap
2620   if (p != old_inodes.end() && p->second.first <= snap) {
2621     dout(10) << "pick_old_inode snap " << snap << " -> [" << p->second.first << "," << p->first << "]" << dendl;
2622     return &p->second;
2623   }
2624   dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
2625   return NULL;
2626 }
2627
2628 void CInode::open_snaprealm(bool nosplit)
2629 {
2630   if (!snaprealm) {
2631     SnapRealm *parent = find_snaprealm();
2632     snaprealm = new SnapRealm(mdcache, this);
2633     if (parent) {
2634       dout(10) << "open_snaprealm " << snaprealm
2635                << " parent is " << parent
2636                << dendl;
2637       dout(30) << " siblings are " << parent->open_children << dendl;
2638       snaprealm->parent = parent;
2639       if (!nosplit)
2640         parent->split_at(snaprealm);
2641       parent->open_children.insert(snaprealm);
2642     }
2643   }
2644 }
2645 void CInode::close_snaprealm(bool nojoin)
2646 {
2647   if (snaprealm) {
2648     dout(15) << "close_snaprealm " << *snaprealm << dendl;
2649     snaprealm->close_parents();
2650     if (snaprealm->parent) {
2651       snaprealm->parent->open_children.erase(snaprealm);
2652       //if (!nojoin)
2653       //snaprealm->parent->join(snaprealm);
2654     }
2655     delete snaprealm;
2656     snaprealm = 0;
2657   }
2658 }
2659
2660 SnapRealm *CInode::find_snaprealm() const
2661 {
2662   const CInode *cur = this;
2663   while (!cur->snaprealm) {
2664     if (cur->get_parent_dn())
2665       cur = cur->get_parent_dn()->get_dir()->get_inode();
2666     else if (get_projected_parent_dn())
2667       cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
2668     else
2669       break;
2670   }
2671   return cur->snaprealm;
2672 }
2673
2674 void CInode::encode_snap_blob(bufferlist &snapbl)
2675 {
2676   if (snaprealm) {
2677     ::encode(snaprealm->srnode, snapbl);
2678     dout(20) << "encode_snap_blob " << *snaprealm << dendl;
2679   }
2680 }
2681 void CInode::decode_snap_blob(bufferlist& snapbl)
2682 {
2683   if (snapbl.length()) {
2684     open_snaprealm();
2685     bufferlist::iterator p = snapbl.begin();
2686     ::decode(snaprealm->srnode, p);
2687     if (is_base()) {
2688       bool ok = snaprealm->_open_parents(NULL);
2689       assert(ok);
2690     }
2691     dout(20) << "decode_snap_blob " << *snaprealm << dendl;
2692   }
2693 }
2694
2695 void CInode::encode_snap(bufferlist& bl)
2696 {
2697   bufferlist snapbl;
2698   encode_snap_blob(snapbl);
2699   ::encode(snapbl, bl);
2700   ::encode(oldest_snap, bl);
2701 }    
2702
2703 void CInode::decode_snap(bufferlist::iterator& p)
2704 {
2705   bufferlist snapbl;
2706   ::decode(snapbl, p);
2707   ::decode(oldest_snap, p);
2708   decode_snap_blob(snapbl);
2709 }
2710
2711 // =============================================
2712
2713 client_t CInode::calc_ideal_loner()
2714 {
2715   if (mdcache->is_readonly())
2716     return -1;
2717   if (!mds_caps_wanted.empty())
2718     return -1;
2719   
2720   int n = 0;
2721   client_t loner = -1;
2722   for (map<client_t,Capability*>::iterator it = client_caps.begin();
2723        it != client_caps.end();
2724        ++it) 
2725     if (!it->second->is_stale() &&
2726         ((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
2727          (inode.is_dir() && !has_subtree_root_dirfrag()))) {
2728       if (n)
2729         return -1;
2730       n++;
2731       loner = it->first;
2732     }
2733   return loner;
2734 }
2735
2736 client_t CInode::choose_ideal_loner()
2737 {
2738   want_loner_cap = calc_ideal_loner();
2739   return want_loner_cap;
2740 }
2741
2742 bool CInode::try_set_loner()
2743 {
2744   assert(want_loner_cap >= 0);
2745   if (loner_cap >= 0 && loner_cap != want_loner_cap)
2746     return false;
2747   set_loner_cap(want_loner_cap);
2748   return true;
2749 }
2750
2751 void CInode::set_loner_cap(client_t l)
2752 {
2753   loner_cap = l;
2754   authlock.set_excl_client(loner_cap);
2755   filelock.set_excl_client(loner_cap);
2756   linklock.set_excl_client(loner_cap);
2757   xattrlock.set_excl_client(loner_cap);
2758 }
2759
2760 bool CInode::try_drop_loner()
2761 {
2762   if (loner_cap < 0)
2763     return true;
2764
2765   int other_allowed = get_caps_allowed_by_type(CAP_ANY);
2766   Capability *cap = get_client_cap(loner_cap);
2767   if (!cap ||
2768       (cap->issued() & ~other_allowed) == 0) {
2769     set_loner_cap(-1);
2770     return true;
2771   }
2772   return false;
2773 }
2774
2775
2776 // choose new lock state during recovery, based on issued caps
2777 void CInode::choose_lock_state(SimpleLock *lock, int allissued)
2778 {
2779   int shift = lock->get_cap_shift();
2780   int issued = (allissued >> shift) & lock->get_cap_mask();
2781   if (is_auth()) {
2782     if (lock->is_xlocked()) {
2783       // do nothing here
2784     } else if (lock->get_state() != LOCK_MIX) {
2785       if (issued & (CEPH_CAP_GEXCL | CEPH_CAP_GBUFFER))
2786         lock->set_state(LOCK_EXCL);
2787       else if (issued & CEPH_CAP_GWR)
2788         lock->set_state(LOCK_MIX);
2789       else if (lock->is_dirty()) {
2790         if (is_replicated())
2791           lock->set_state(LOCK_MIX);
2792         else
2793           lock->set_state(LOCK_LOCK);
2794       } else
2795         lock->set_state(LOCK_SYNC);
2796     }
2797   } else {
2798     // our states have already been chosen during rejoin.
2799     if (lock->is_xlocked())
2800       assert(lock->get_state() == LOCK_LOCK);
2801   }
2802 }
2803  
2804 void CInode::choose_lock_states(int dirty_caps)
2805 {
2806   int issued = get_caps_issued() | dirty_caps;
2807   if (is_auth() && (issued & (CEPH_CAP_ANY_EXCL|CEPH_CAP_ANY_WR)) &&
2808       choose_ideal_loner() >= 0)
2809     try_set_loner();
2810   choose_lock_state(&filelock, issued);
2811   choose_lock_state(&nestlock, issued);
2812   choose_lock_state(&dirfragtreelock, issued);
2813   choose_lock_state(&authlock, issued);
2814   choose_lock_state(&xattrlock, issued);
2815   choose_lock_state(&linklock, issued);
2816 }
2817
2818 Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
2819 {
2820   if (client_caps.empty()) {
2821     get(PIN_CAPS);
2822     if (conrealm)
2823       containing_realm = conrealm;
2824     else
2825       containing_realm = find_snaprealm();
2826     containing_realm->inodes_with_caps.push_back(&item_caps);
2827     dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
2828   }
2829
2830   if (client_caps.empty())
2831     mdcache->num_inodes_with_caps++;
2832   
2833   Capability *cap = new Capability(this, ++mdcache->last_cap_id, client);
2834   assert(client_caps.count(client) == 0);
2835   client_caps[client] = cap;
2836
2837   session->add_cap(cap);
2838   if (session->is_stale())
2839     cap->mark_stale();
2840   
2841   cap->client_follows = first-1;
2842   
2843   containing_realm->add_cap(client, cap);
2844   
2845   return cap;
2846 }
2847
2848 void CInode::remove_client_cap(client_t client)
2849 {
2850   assert(client_caps.count(client) == 1);
2851   Capability *cap = client_caps[client];
2852   
2853   cap->item_session_caps.remove_myself();
2854   cap->item_revoking_caps.remove_myself();
2855   cap->item_client_revoking_caps.remove_myself();
2856   containing_realm->remove_cap(client, cap);
2857   
2858   if (client == loner_cap)
2859     loner_cap = -1;
2860
2861   delete cap;
2862   client_caps.erase(client);
2863   if (client_caps.empty()) {
2864     dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
2865     put(PIN_CAPS);
2866     item_caps.remove_myself();
2867     containing_realm = NULL;
2868     item_open_file.remove_myself();  // unpin logsegment
2869     mdcache->num_inodes_with_caps--;
2870   }
2871
2872   //clean up advisory locks
2873   bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
2874   bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false; 
2875   if (fcntl_removed || flock_removed) {
2876     list<MDSInternalContextBase*> waiters;
2877     take_waiting(CInode::WAIT_FLOCK, waiters);
2878     mdcache->mds->queue_waiters(waiters);
2879   }
2880 }
2881
2882 void CInode::move_to_realm(SnapRealm *realm)
2883 {
2884   dout(10) << "move_to_realm joining realm " << *realm
2885            << ", leaving realm " << *containing_realm << dendl;
2886   for (map<client_t,Capability*>::iterator q = client_caps.begin();
2887        q != client_caps.end();
2888        ++q) {
2889     containing_realm->remove_cap(q->first, q->second);
2890     realm->add_cap(q->first, q->second);
2891   }
2892   item_caps.remove_myself();
2893   realm->inodes_with_caps.push_back(&item_caps);
2894   containing_realm = realm;
2895 }
2896
2897 Capability *CInode::reconnect_cap(client_t client, const cap_reconnect_t& icr, Session *session)
2898 {
2899   Capability *cap = get_client_cap(client);
2900   if (cap) {
2901     // FIXME?
2902     cap->merge(icr.capinfo.wanted, icr.capinfo.issued);
2903   } else {
2904     cap = add_client_cap(client, session);
2905     cap->set_cap_id(icr.capinfo.cap_id);
2906     cap->set_wanted(icr.capinfo.wanted);
2907     cap->issue_norevoke(icr.capinfo.issued);
2908     cap->reset_seq();
2909   }
2910   cap->set_last_issue_stamp(ceph_clock_now());
2911   return cap;
2912 }
2913
2914 void CInode::clear_client_caps_after_export()
2915 {
2916   while (!client_caps.empty())
2917     remove_client_cap(client_caps.begin()->first);
2918   loner_cap = -1;
2919   want_loner_cap = -1;
2920   mds_caps_wanted.clear();
2921 }
2922
2923 void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
2924 {
2925   for (map<client_t,Capability*>::iterator it = client_caps.begin();
2926        it != client_caps.end();
2927        ++it) {
2928     cl[it->first] = it->second->make_export();
2929   }
2930 }
2931
2932   // caps allowed
2933 int CInode::get_caps_liked() const
2934 {
2935   if (is_dir())
2936     return CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;  // but not, say, FILE_RD|WR|WRBUFFER
2937   else
2938     return CEPH_CAP_ANY & ~CEPH_CAP_FILE_LAZYIO;
2939 }
2940
2941 int CInode::get_caps_allowed_ever() const
2942 {
2943   int allowed;
2944   if (is_dir())
2945     allowed = CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;
2946   else
2947     allowed = CEPH_CAP_ANY;
2948   return allowed & 
2949     (CEPH_CAP_PIN |
2950      (filelock.gcaps_allowed_ever() << filelock.get_cap_shift()) |
2951      (authlock.gcaps_allowed_ever() << authlock.get_cap_shift()) |
2952      (xattrlock.gcaps_allowed_ever() << xattrlock.get_cap_shift()) |
2953      (linklock.gcaps_allowed_ever() << linklock.get_cap_shift()));
2954 }
2955
2956 int CInode::get_caps_allowed_by_type(int type) const
2957 {
2958   return 
2959     CEPH_CAP_PIN |
2960     (filelock.gcaps_allowed(type) << filelock.get_cap_shift()) |
2961     (authlock.gcaps_allowed(type) << authlock.get_cap_shift()) |
2962     (xattrlock.gcaps_allowed(type) << xattrlock.get_cap_shift()) |
2963     (linklock.gcaps_allowed(type) << linklock.get_cap_shift());
2964 }
2965
2966 int CInode::get_caps_careful() const
2967 {
2968   return 
2969     (filelock.gcaps_careful() << filelock.get_cap_shift()) |
2970     (authlock.gcaps_careful() << authlock.get_cap_shift()) |
2971     (xattrlock.gcaps_careful() << xattrlock.get_cap_shift()) |
2972     (linklock.gcaps_careful() << linklock.get_cap_shift());
2973 }
2974
2975 int CInode::get_xlocker_mask(client_t client) const
2976 {
2977   return 
2978     (filelock.gcaps_xlocker_mask(client) << filelock.get_cap_shift()) |
2979     (authlock.gcaps_xlocker_mask(client) << authlock.get_cap_shift()) |
2980     (xattrlock.gcaps_xlocker_mask(client) << xattrlock.get_cap_shift()) |
2981     (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
2982 }
2983
2984 int CInode::get_caps_allowed_for_client(Session *session, inode_t *file_i) const
2985 {
2986   client_t client = session->info.inst.name.num();
2987   int allowed;
2988   if (client == get_loner()) {
2989     // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2990     allowed =
2991       get_caps_allowed_by_type(CAP_LONER) |
2992       (get_caps_allowed_by_type(CAP_XLOCKER) & get_xlocker_mask(client));
2993   } else {
2994     allowed = get_caps_allowed_by_type(CAP_ANY);
2995   }
2996
2997   if (!is_dir()) {
2998     if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
2999          !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
3000         (!file_i->layout.pool_ns.empty() &&
3001          !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
3002       allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
3003   }
3004   return allowed;
3005 }
3006
3007 // caps issued, wanted
3008 int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
3009                             int shift, int mask)
3010 {
3011   int c = 0;
3012   int loner = 0, other = 0, xlocker = 0;
3013   if (!is_auth()) {
3014     loner_cap = -1;
3015   }
3016
3017   for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3018        it != client_caps.end();
3019        ++it) {
3020     int i = it->second->issued();
3021     c |= i;
3022     if (it->first == loner_cap)
3023       loner |= i;
3024     else
3025       other |= i;
3026     xlocker |= get_xlocker_mask(it->first) & i;
3027   }
3028   if (ploner) *ploner = (loner >> shift) & mask;
3029   if (pother) *pother = (other >> shift) & mask;
3030   if (pxlocker) *pxlocker = (xlocker >> shift) & mask;
3031   return (c >> shift) & mask;
3032 }
3033
3034 bool CInode::is_any_caps_wanted() const
3035 {
3036   for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3037        it != client_caps.end();
3038        ++it)
3039     if (it->second->wanted())
3040       return true;
3041   return false;
3042 }
3043
3044 int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
3045 {
3046   int w = 0;
3047   int loner = 0, other = 0;
3048   for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3049        it != client_caps.end();
3050        ++it) {
3051     if (!it->second->is_stale()) {
3052       int t = it->second->wanted();
3053       w |= t;
3054       if (it->first == loner_cap)
3055         loner |= t;
3056       else
3057         other |= t;     
3058     }
3059     //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
3060   }
3061   if (is_auth())
3062     for (compact_map<int,int>::const_iterator it = mds_caps_wanted.begin();
3063          it != mds_caps_wanted.end();
3064          ++it) {
3065       w |= it->second;
3066       other |= it->second;
3067       //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3068     }
3069   if (ploner) *ploner = (loner >> shift) & mask;
3070   if (pother) *pother = (other >> shift) & mask;
3071   return (w >> shift) & mask;
3072 }
3073
3074 bool CInode::issued_caps_need_gather(SimpleLock *lock)
3075 {
3076   int loner_issued, other_issued, xlocker_issued;
3077   get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
3078                   lock->get_cap_shift(), lock->get_cap_mask());
3079   if ((loner_issued & ~lock->gcaps_allowed(CAP_LONER)) ||
3080       (other_issued & ~lock->gcaps_allowed(CAP_ANY)) ||
3081       (xlocker_issued & ~lock->gcaps_allowed(CAP_XLOCKER)))
3082     return true;
3083   return false;
3084 }
3085
3086 void CInode::replicate_relax_locks()
3087 {
3088   //dout(10) << " relaxing locks on " << *this << dendl;
3089   assert(is_auth());
3090   assert(!is_replicated());
3091   
3092   authlock.replicate_relax();
3093   linklock.replicate_relax();
3094   dirfragtreelock.replicate_relax();
3095   filelock.replicate_relax();
3096   xattrlock.replicate_relax();
3097   snaplock.replicate_relax();
3098   nestlock.replicate_relax();
3099   flocklock.replicate_relax();
3100   policylock.replicate_relax();
3101 }
3102
3103
3104
3105 // =============================================
3106
3107 int CInode::encode_inodestat(bufferlist& bl, Session *session,
3108                              SnapRealm *dir_realm,
3109                              snapid_t snapid,
3110                              unsigned max_bytes,
3111                              int getattr_caps)
3112 {
3113   client_t client = session->info.inst.name.num();
3114   assert(snapid);
3115   assert(session->connection);
3116   
3117   bool valid = true;
3118
3119   // pick a version!
3120   inode_t *oi = &inode;
3121   inode_t *pi = get_projected_inode();
3122
3123   map<string, bufferptr> *pxattrs = 0;
3124
3125   if (snapid != CEPH_NOSNAP) {
3126
3127     // for now at least, old_inodes is only defined/valid on the auth
3128     if (!is_auth())
3129       valid = false;
3130
3131     if (is_multiversion()) {
3132       compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.lower_bound(snapid);
3133       if (p != old_inodes.end()) {
3134         if (p->second.first > snapid) {
3135           if  (p != old_inodes.begin())
3136             --p;
3137         }
3138         if (p->second.first <= snapid && snapid <= p->first) {
3139           dout(15) << "encode_inodestat snapid " << snapid
3140                    << " to old_inode [" << p->second.first << "," << p->first << "]"
3141                    << " " << p->second.inode.rstat
3142                    << dendl;
3143           pi = oi = &p->second.inode;
3144           pxattrs = &p->second.xattrs;
3145         } else {
3146           // snapshoted remote dentry can result this
3147           dout(0) << "encode_inodestat old_inode for snapid " << snapid
3148                   << " not found" << dendl;
3149         }
3150       }
3151     } else if (snapid < first || snapid > last) {
3152       // snapshoted remote dentry can result this
3153       dout(0) << "encode_inodestat [" << first << "," << last << "]"
3154               << " not match snapid " << snapid << dendl;
3155     }
3156   }
3157
3158   SnapRealm *realm = find_snaprealm();
3159
3160   bool no_caps = !valid ||
3161                  session->is_stale() ||
3162                  (dir_realm && realm != dir_realm) ||
3163                  is_frozen() ||
3164                  state_test(CInode::STATE_EXPORTINGCAPS);
3165   if (no_caps)
3166     dout(20) << "encode_inodestat no caps"
3167              << (!valid?", !valid":"")
3168              << (session->is_stale()?", session stale ":"")
3169              << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
3170              << (is_frozen()?", frozen inode":"")
3171              << (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
3172              << dendl;
3173
3174   
3175   // "fake" a version that is old (stable) version, +1 if projected.
3176   version_t version = (oi->version * 2) + is_projected();
3177
3178   Capability *cap = get_client_cap(client);
3179   bool pfile = filelock.is_xlocked_by_client(client) || get_loner() == client;
3180   //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3181   bool pauth = authlock.is_xlocked_by_client(client) || get_loner() == client;
3182   bool plink = linklock.is_xlocked_by_client(client) || get_loner() == client;
3183   bool pxattr = xattrlock.is_xlocked_by_client(client) || get_loner() == client;
3184
3185   bool plocal = versionlock.get_last_wrlock_client() == client;
3186   bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
3187   
3188   inode_t *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
3189   
3190   dout(20) << " pfile " << pfile << " pauth " << pauth
3191            << " plink " << plink << " pxattr " << pxattr
3192            << " plocal " << plocal
3193            << " ctime " << any_i->ctime
3194            << " valid=" << valid << dendl;
3195
3196   // file
3197   inode_t *file_i = pfile ? pi:oi;
3198   file_layout_t layout;
3199   if (is_dir()) {
3200     layout = (ppolicy ? pi : oi)->layout;
3201   } else {
3202     layout = file_i->layout;
3203   }
3204
3205   // max_size is min of projected, actual
3206   uint64_t max_size =
3207     MIN(oi->client_ranges.count(client) ?
3208         oi->client_ranges[client].range.last : 0,
3209         pi->client_ranges.count(client) ?
3210         pi->client_ranges[client].range.last : 0);
3211
3212   // inline data
3213   version_t inline_version = 0;
3214   bufferlist inline_data;
3215   if (file_i->inline_data.version == CEPH_INLINE_NONE) {
3216     inline_version = CEPH_INLINE_NONE;
3217   } else if ((!cap && !no_caps) ||
3218              (cap && cap->client_inline_version < file_i->inline_data.version) ||
3219              (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
3220     inline_version = file_i->inline_data.version;
3221     if (file_i->inline_data.length() > 0)
3222       inline_data = file_i->inline_data.get_data();
3223   }
3224
3225   // nest (do same as file... :/)
3226   if (cap) {
3227     cap->last_rbytes = file_i->rstat.rbytes;
3228     cap->last_rsize = file_i->rstat.rsize();
3229   }
3230
3231   // auth
3232   inode_t *auth_i = pauth ? pi:oi;
3233
3234   // link
3235   inode_t *link_i = plink ? pi:oi;
3236   
3237   // xattr
3238   inode_t *xattr_i = pxattr ? pi:oi;
3239
3240   // xattr
3241   bufferlist xbl;
3242   version_t xattr_version;
3243   if ((!cap && !no_caps) ||
3244       (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
3245       (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
3246     if (!pxattrs)
3247       pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
3248     ::encode(*pxattrs, xbl);
3249     xattr_version = xattr_i->xattr_version;
3250   } else {
3251     xattr_version = 0;
3252   }
3253   
3254   // do we have room?
3255   if (max_bytes) {
3256     unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
3257       sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
3258       sizeof(struct ceph_timespec) * 3 +
3259       4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3260       8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
3261       4;
3262     bytes += sizeof(__u32);
3263     bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
3264     bytes += sizeof(__u32) + symlink.length();
3265     bytes += sizeof(__u32) + xbl.length();
3266     bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
3267     if (bytes > max_bytes)
3268       return -ENOSPC;
3269   }
3270
3271
3272   // encode caps
3273   struct ceph_mds_reply_cap ecap;
3274   if (snapid != CEPH_NOSNAP) {
3275     /*
3276      * snapped inodes (files or dirs) only get read-only caps.  always
3277      * issue everything possible, since it is read only.
3278      *
3279      * if a snapped inode has caps, limit issued caps based on the
3280      * lock state.
3281      *
3282      * if it is a live inode, limit issued caps based on the lock
3283      * state.
3284      *
3285      * do NOT adjust cap issued state, because the client always
3286      * tracks caps per-snap and the mds does either per-interval or
3287      * multiversion.
3288      */
3289     ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
3290     if (last == CEPH_NOSNAP || is_any_caps())
3291       ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
3292     ecap.seq = 0;
3293     ecap.mseq = 0;
3294     ecap.realm = 0;
3295   } else {
3296     if (!no_caps && !cap) {
3297       // add a new cap
3298       cap = add_client_cap(client, session, realm);
3299       if (is_auth()) {
3300         if (choose_ideal_loner() >= 0)
3301           try_set_loner();
3302         else if (get_wanted_loner() < 0)
3303           try_drop_loner();
3304       }
3305     }
3306
3307     int issue = 0;
3308     if (!no_caps && cap) {
3309       int likes = get_caps_liked();
3310       int allowed = get_caps_allowed_for_client(session, file_i);
3311       issue = (cap->wanted() | likes) & allowed;
3312       cap->issue_norevoke(issue);
3313       issue = cap->pending();
3314       dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3315                << " seq " << cap->get_last_seq() << dendl;
3316     } else if (cap && cap->is_new() && !dir_realm) {
3317       // alway issue new caps to client, otherwise the caps get lost
3318       assert(cap->is_stale());
3319       issue = cap->pending() | CEPH_CAP_PIN;
3320       cap->issue_norevoke(issue);
3321       dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3322                << " seq " << cap->get_last_seq()
3323                << "(stale|new caps)" << dendl;
3324     }
3325
3326     if (issue) {
3327       cap->set_last_issue();
3328       cap->set_last_issue_stamp(ceph_clock_now());
3329       cap->clear_new();
3330       ecap.caps = issue;
3331       ecap.wanted = cap->wanted();
3332       ecap.cap_id = cap->get_cap_id();
3333       ecap.seq = cap->get_last_seq();
3334       ecap.mseq = cap->get_mseq();
3335       ecap.realm = realm->inode->ino();
3336     } else {
3337       ecap.cap_id = 0;
3338       ecap.caps = 0;
3339       ecap.seq = 0;
3340       ecap.mseq = 0;
3341       ecap.realm = 0;
3342       ecap.wanted = 0;
3343     }
3344   }
3345   ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
3346   dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
3347            << " seq " << ecap.seq << " mseq " << ecap.mseq
3348            << " xattrv " << xattr_version << " len " << xbl.length()
3349            << dendl;
3350
3351   if (inline_data.length() && cap) {
3352     if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
3353       dout(10) << "including inline version " << inline_version << dendl;
3354       cap->client_inline_version = inline_version;
3355     } else {
3356       dout(10) << "dropping inline version " << inline_version << dendl;
3357       inline_version = 0;
3358       inline_data.clear();
3359     }
3360   }
3361
3362   // include those xattrs?
3363   if (xbl.length() && cap) {
3364     if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
3365       dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
3366       cap->client_xattr_version = xattr_i->xattr_version;
3367     } else {
3368       dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
3369       xbl.clear(); // no xattrs .. XXX what's this about?!?
3370       xattr_version = 0;
3371     }
3372   }
3373
3374   /*
3375    * note: encoding matches MClientReply::InodeStat
3376    */
3377   ::encode(oi->ino, bl);
3378   ::encode(snapid, bl);
3379   ::encode(oi->rdev, bl);
3380   ::encode(version, bl);
3381
3382   ::encode(xattr_version, bl);
3383
3384   ::encode(ecap, bl);
3385   {
3386     ceph_file_layout legacy_layout;
3387     layout.to_legacy(&legacy_layout);
3388     ::encode(legacy_layout, bl);
3389   }
3390   ::encode(any_i->ctime, bl);
3391   ::encode(file_i->mtime, bl);
3392   ::encode(file_i->atime, bl);
3393   ::encode(file_i->time_warp_seq, bl);
3394   ::encode(file_i->size, bl);
3395   ::encode(max_size, bl);
3396   ::encode(file_i->truncate_size, bl);
3397   ::encode(file_i->truncate_seq, bl);
3398
3399   ::encode(auth_i->mode, bl);
3400   ::encode((uint32_t)auth_i->uid, bl);
3401   ::encode((uint32_t)auth_i->gid, bl);
3402
3403   ::encode(link_i->nlink, bl);
3404
3405   ::encode(file_i->dirstat.nfiles, bl);
3406   ::encode(file_i->dirstat.nsubdirs, bl);
3407   ::encode(file_i->rstat.rbytes, bl);
3408   ::encode(file_i->rstat.rfiles, bl);
3409   ::encode(file_i->rstat.rsubdirs, bl);
3410   ::encode(file_i->rstat.rctime, bl);
3411
3412   dirfragtree.encode(bl);
3413
3414   ::encode(symlink, bl);
3415   if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
3416     ::encode(file_i->dir_layout, bl);
3417   }
3418   ::encode(xbl, bl);
3419   if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
3420     ::encode(inline_version, bl);
3421     ::encode(inline_data, bl);
3422   }
3423   if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
3424     inode_t *policy_i = ppolicy ? pi : oi;
3425     ::encode(policy_i->quota, bl);
3426   }
3427   if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
3428     ::encode(layout.pool_ns, bl);
3429   }
3430   if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
3431     ::encode(any_i->btime, bl);
3432     ::encode(any_i->change_attr, bl);
3433   }
3434
3435   return valid;
3436 }
3437
3438 void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
3439 {
3440   assert(cap);
3441
3442   client_t client = cap->get_client();
3443
3444   bool pfile = filelock.is_xlocked_by_client(client) || (cap->issued() & CEPH_CAP_FILE_EXCL);
3445   bool pauth = authlock.is_xlocked_by_client(client);
3446   bool plink = linklock.is_xlocked_by_client(client);
3447   bool pxattr = xattrlock.is_xlocked_by_client(client);
3448  
3449   inode_t *oi = &inode;
3450   inode_t *pi = get_projected_inode();
3451   inode_t *i = (pfile|pauth|plink|pxattr) ? pi : oi;
3452
3453   dout(20) << "encode_cap_message pfile " << pfile
3454            << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
3455            << " ctime " << i->ctime << dendl;
3456
3457   i = pfile ? pi:oi;
3458   m->set_layout(i->layout);
3459   m->size = i->size;
3460   m->truncate_seq = i->truncate_seq;
3461   m->truncate_size = i->truncate_size;
3462   m->mtime = i->mtime;
3463   m->atime = i->atime;
3464   m->ctime = i->ctime;
3465   m->change_attr = i->change_attr;
3466   m->time_warp_seq = i->time_warp_seq;
3467
3468   if (cap->client_inline_version < i->inline_data.version) {
3469     m->inline_version = cap->client_inline_version = i->inline_data.version;
3470     if (i->inline_data.length() > 0)
3471       m->inline_data = i->inline_data.get_data();
3472   } else {
3473     m->inline_version = 0;
3474   }
3475
3476   // max_size is min of projected, actual.
3477   uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
3478   uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
3479   m->max_size = MIN(oldms, newms);
3480
3481   i = pauth ? pi:oi;
3482   m->head.mode = i->mode;
3483   m->head.uid = i->uid;
3484   m->head.gid = i->gid;
3485
3486   i = plink ? pi:oi;
3487   m->head.nlink = i->nlink;
3488
3489   i = pxattr ? pi:oi;
3490   map<string,bufferptr> *ix = pxattr ? get_projected_xattrs() : &xattrs;
3491   if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
3492       i->xattr_version > cap->client_xattr_version) {
3493     dout(10) << "    including xattrs v " << i->xattr_version << dendl;
3494     ::encode(*ix, m->xattrbl);
3495     m->head.xattr_version = i->xattr_version;
3496     cap->client_xattr_version = i->xattr_version;
3497   }
3498 }
3499
3500
3501
3502 void CInode::_encode_base(bufferlist& bl, uint64_t features)
3503 {
3504   ::encode(first, bl);
3505   ::encode(inode, bl, features);
3506   ::encode(symlink, bl);
3507   ::encode(dirfragtree, bl);
3508   ::encode(xattrs, bl);
3509   ::encode(old_inodes, bl, features);
3510   ::encode(damage_flags, bl);
3511   encode_snap(bl);
3512 }
3513 void CInode::_decode_base(bufferlist::iterator& p)
3514 {
3515   ::decode(first, p);
3516   ::decode(inode, p);
3517   ::decode(symlink, p);
3518   ::decode(dirfragtree, p);
3519   ::decode(xattrs, p);
3520   ::decode(old_inodes, p);
3521   ::decode(damage_flags, p);
3522   decode_snap(p);
3523 }
3524
3525 void CInode::_encode_locks_full(bufferlist& bl)
3526 {
3527   ::encode(authlock, bl);
3528   ::encode(linklock, bl);
3529   ::encode(dirfragtreelock, bl);
3530   ::encode(filelock, bl);
3531   ::encode(xattrlock, bl);
3532   ::encode(snaplock, bl);
3533   ::encode(nestlock, bl);
3534   ::encode(flocklock, bl);
3535   ::encode(policylock, bl);
3536
3537   ::encode(loner_cap, bl);
3538 }
3539 void CInode::_decode_locks_full(bufferlist::iterator& p)
3540 {
3541   ::decode(authlock, p);
3542   ::decode(linklock, p);
3543   ::decode(dirfragtreelock, p);
3544   ::decode(filelock, p);
3545   ::decode(xattrlock, p);
3546   ::decode(snaplock, p);
3547   ::decode(nestlock, p);
3548   ::decode(flocklock, p);
3549   ::decode(policylock, p);
3550
3551   ::decode(loner_cap, p);
3552   set_loner_cap(loner_cap);
3553   want_loner_cap = loner_cap;  // for now, we'll eval() shortly.
3554 }
3555
3556 void CInode::_encode_locks_state_for_replica(bufferlist& bl)
3557 {
3558   authlock.encode_state_for_replica(bl);
3559   linklock.encode_state_for_replica(bl);
3560   dirfragtreelock.encode_state_for_replica(bl);
3561   filelock.encode_state_for_replica(bl);
3562   nestlock.encode_state_for_replica(bl);
3563   xattrlock.encode_state_for_replica(bl);
3564   snaplock.encode_state_for_replica(bl);
3565   flocklock.encode_state_for_replica(bl);
3566   policylock.encode_state_for_replica(bl);
3567 }
3568 void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
3569 {
3570   authlock.encode_state_for_replica(bl);
3571   linklock.encode_state_for_replica(bl);
3572   dirfragtreelock.encode_state_for_rejoin(bl, rep);
3573   filelock.encode_state_for_rejoin(bl, rep);
3574   nestlock.encode_state_for_rejoin(bl, rep);
3575   xattrlock.encode_state_for_replica(bl);
3576   snaplock.encode_state_for_replica(bl);
3577   flocklock.encode_state_for_replica(bl);
3578   policylock.encode_state_for_replica(bl);
3579 }
3580 void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
3581 {
3582   authlock.decode_state(p, is_new);
3583   linklock.decode_state(p, is_new);
3584   dirfragtreelock.decode_state(p, is_new);
3585   filelock.decode_state(p, is_new);
3586   nestlock.decode_state(p, is_new);
3587   xattrlock.decode_state(p, is_new);
3588   snaplock.decode_state(p, is_new);
3589   flocklock.decode_state(p, is_new);
3590   policylock.decode_state(p, is_new);
3591 }
3592 void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
3593                                   list<SimpleLock*>& eval_locks)
3594 {
3595   authlock.decode_state_rejoin(p, waiters);
3596   linklock.decode_state_rejoin(p, waiters);
3597   dirfragtreelock.decode_state_rejoin(p, waiters);
3598   filelock.decode_state_rejoin(p, waiters);
3599   nestlock.decode_state_rejoin(p, waiters);
3600   xattrlock.decode_state_rejoin(p, waiters);
3601   snaplock.decode_state_rejoin(p, waiters);
3602   flocklock.decode_state_rejoin(p, waiters);
3603   policylock.decode_state_rejoin(p, waiters);
3604
3605   if (!dirfragtreelock.is_stable() && !dirfragtreelock.is_wrlocked())
3606     eval_locks.push_back(&dirfragtreelock);
3607   if (!filelock.is_stable() && !filelock.is_wrlocked())
3608     eval_locks.push_back(&filelock);
3609   if (!nestlock.is_stable() && !nestlock.is_wrlocked())
3610     eval_locks.push_back(&nestlock);
3611 }
3612
3613
3614 // IMPORT/EXPORT
3615
3616 void CInode::encode_export(bufferlist& bl)
3617 {
3618   ENCODE_START(5, 4, bl);
3619   _encode_base(bl, mdcache->mds->mdsmap->get_up_features());
3620
3621   ::encode(state, bl);
3622
3623   ::encode(pop, bl);
3624
3625   ::encode(get_replicas(), bl);
3626
3627   // include scatterlock info for any bounding CDirs
3628   bufferlist bounding;
3629   if (inode.is_dir())
3630     for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
3631          p != dirfrags.end();
3632          ++p) {
3633       CDir *dir = p->second;
3634       if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
3635         ::encode(p->first, bounding);
3636         ::encode(dir->fnode.fragstat, bounding);
3637         ::encode(dir->fnode.accounted_fragstat, bounding);
3638         ::encode(dir->fnode.rstat, bounding);
3639         ::encode(dir->fnode.accounted_rstat, bounding);
3640         dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
3641       }
3642     }
3643   ::encode(bounding, bl);
3644
3645   _encode_locks_full(bl);
3646
3647   _encode_file_locks(bl);
3648
3649   ENCODE_FINISH(bl);
3650
3651   get(PIN_TEMPEXPORTING);
3652 }
3653
3654 void CInode::finish_export(utime_t now)
3655 {
3656   state &= MASK_STATE_EXPORT_KEPT;
3657
3658   pop.zero(now);
3659
3660   // just in case!
3661   //dirlock.clear_updated();
3662
3663   loner_cap = -1;
3664
3665   put(PIN_TEMPEXPORTING);
3666 }
3667
3668 void CInode::decode_import(bufferlist::iterator& p,
3669                            LogSegment *ls)
3670 {
3671   DECODE_START(5, p);
3672
3673   _decode_base(p);
3674
3675   unsigned s;
3676   ::decode(s, p);
3677   state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
3678
3679   if (is_dirty()) {
3680     get(PIN_DIRTY);
3681     _mark_dirty(ls);
3682   }
3683   if (is_dirty_parent()) {
3684     get(PIN_DIRTYPARENT);
3685     _mark_dirty_parent(ls);
3686   }
3687
3688   ::decode(pop, ceph_clock_now(), p);
3689
3690   ::decode(get_replicas(), p);
3691   if (is_replicated())
3692     get(PIN_REPLICATED);
3693   replica_nonce = 0;
3694
3695   // decode fragstat info on bounding cdirs
3696   bufferlist bounding;
3697   ::decode(bounding, p);
3698   bufferlist::iterator q = bounding.begin();
3699   while (!q.end()) {
3700     frag_t fg;
3701     ::decode(fg, q);
3702     CDir *dir = get_dirfrag(fg);
3703     assert(dir);  // we should have all bounds open
3704
3705     // Only take the remote's fragstat/rstat if we are non-auth for
3706     // this dirfrag AND the lock is NOT in a scattered (MIX) state.
3707     // We know lock is stable, and MIX is the only state in which
3708     // the inode auth (who sent us this data) may not have the best
3709     // info.
3710
3711     // HMM: Are there cases where dir->is_auth() is an insufficient
3712     // check because the dirfrag is under migration?  That implies
3713     // it is frozen (and in a SYNC or LOCK state).  FIXME.
3714
3715     if (dir->is_auth() ||
3716         filelock.get_state() == LOCK_MIX) {
3717       dout(10) << " skipped fragstat info for " << *dir << dendl;
3718       frag_info_t f;
3719       ::decode(f, q);
3720       ::decode(f, q);
3721     } else {
3722       ::decode(dir->fnode.fragstat, q);
3723       ::decode(dir->fnode.accounted_fragstat, q);
3724       dout(10) << " took fragstat info for " << *dir << dendl;
3725     }
3726     if (dir->is_auth() ||
3727         nestlock.get_state() == LOCK_MIX) {
3728       dout(10) << " skipped rstat info for " << *dir << dendl;
3729       nest_info_t n;
3730       ::decode(n, q);
3731       ::decode(n, q);
3732     } else {
3733       ::decode(dir->fnode.rstat, q);
3734       ::decode(dir->fnode.accounted_rstat, q);
3735       dout(10) << " took rstat info for " << *dir << dendl;
3736     }
3737   }
3738
3739   _decode_locks_full(p);
3740
3741   _decode_file_locks(p);
3742
3743   DECODE_FINISH(p);
3744 }
3745
3746
3747 void InodeStoreBase::dump(Formatter *f) const
3748 {
3749   inode.dump(f);
3750   f->dump_string("symlink", symlink);
3751   f->open_array_section("old_inodes");
3752   for (compact_map<snapid_t, old_inode_t>::const_iterator i = old_inodes.begin();
3753       i != old_inodes.end(); ++i) {
3754     f->open_object_section("old_inode");
3755     {
3756       // The key is the last snapid, the first is in the old_inode_t
3757       f->dump_int("last", i->first);
3758       i->second.dump(f);
3759     }
3760     f->close_section();  // old_inode
3761   }
3762   f->close_section();  // old_inodes
3763
3764   f->open_object_section("dirfragtree");
3765   dirfragtree.dump(f);
3766   f->close_section(); // dirfragtree
3767 }
3768
3769
3770 void InodeStore::generate_test_instances(list<InodeStore*> &ls)
3771 {
3772   InodeStore *populated = new InodeStore;
3773   populated->inode.ino = 0xdeadbeef;
3774   populated->symlink = "rhubarb";
3775   ls.push_back(populated);
3776 }
3777
3778 void CInode::validate_disk_state(CInode::validated_data *results,
3779                                  MDSInternalContext *fin)
3780 {
3781   class ValidationContinuation : public MDSContinuation {
3782   public:
3783     MDSInternalContext *fin;
3784     CInode *in;
3785     CInode::validated_data *results;
3786     bufferlist bl;
3787     CInode *shadow_in;
3788
3789     enum {
3790       START = 0,
3791       BACKTRACE,
3792       INODE,
3793       DIRFRAGS
3794     };
3795
3796     ValidationContinuation(CInode *i,
3797                            CInode::validated_data *data_r,
3798                            MDSInternalContext *fin_) :
3799                              MDSContinuation(i->mdcache->mds->server),
3800                              fin(fin_),
3801                              in(i),
3802                              results(data_r),
3803                              shadow_in(NULL) {
3804       set_callback(START, static_cast<Continuation::stagePtr>(&ValidationContinuation::_start));
3805       set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
3806       set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
3807       set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
3808     }
3809
3810     ~ValidationContinuation() override {
3811       delete shadow_in;
3812     }
3813
3814     /**
3815      * Fetch backtrace and set tag if tag is non-empty
3816      */
3817     void fetch_backtrace_and_tag(CInode *in, std::string tag,
3818                                  Context *fin, int *bt_r, bufferlist *bt)
3819     {
3820       const int64_t pool = in->get_backtrace_pool();
3821       object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");
3822
3823       ObjectOperation fetch;
3824       fetch.getxattr("parent", bt, bt_r);
3825       in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
3826                                        NULL, 0, fin);
3827       if (!tag.empty()) {
3828         ObjectOperation scrub_tag;
3829         bufferlist tag_bl;
3830         ::encode(tag, tag_bl);
3831         scrub_tag.setxattr("scrub_tag", tag_bl);
3832         SnapContext snapc;
3833         in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
3834                                            ceph::real_clock::now(),
3835                                            0, NULL);
3836       }
3837     }
3838
3839     bool _start(int rval) {
3840       if (in->is_dirty()) {
3841         MDCache *mdcache = in->mdcache;
3842         inode_t& inode = in->inode;
3843         dout(20) << "validating a dirty CInode; results will be inconclusive"
3844                  << dendl;
3845       }
3846       if (in->is_symlink()) {
3847         // there's nothing to do for symlinks!
3848         return true;
3849       }
3850
3851       C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
3852                                             in->mdcache->mds->finisher);
3853
3854       // Whether we have a tag to apply depends on ScrubHeader (if one is
3855       // present)
3856       if (in->scrub_infop) {
3857         // I'm a non-orphan, so look up my ScrubHeader via my linkage
3858         const std::string &tag = in->scrub_infop->header->get_tag();
3859         // Rather than using the usual CInode::fetch_backtrace,
3860         // use a special variant that optionally writes a tag in the same
3861         // operation.
3862         fetch_backtrace_and_tag(in, tag, conf,
3863                                 &results->backtrace.ondisk_read_retval, &bl);
3864       } else {
3865         // When we're invoked outside of ScrubStack we might be called
3866         // on an orphaned inode like /
3867         fetch_backtrace_and_tag(in, {}, conf,
3868                                 &results->backtrace.ondisk_read_retval, &bl);
3869       }
3870       return false;
3871     }
3872
3873     bool _backtrace(int rval) {
3874       // set up basic result reporting and make sure we got the data
3875       results->performed_validation = true; // at least, some of it!
3876       results->backtrace.checked = true;
3877
3878       const int64_t pool = in->get_backtrace_pool();
3879       inode_backtrace_t& memory_backtrace = results->backtrace.memory_value;
3880       in->build_backtrace(pool, memory_backtrace);
3881       bool equivalent, divergent;
3882       int memory_newer;
3883
3884       MDCache *mdcache = in->mdcache;  // For the benefit of dout
3885       const inode_t& inode = in->inode;  // For the benefit of dout
3886
3887       // Ignore rval because it's the result of a FAILOK operation
3888       // from fetch_backtrace_and_tag: the real result is in
3889       // backtrace.ondisk_read_retval
3890       dout(20) << "ondisk_read_retval: " << results->backtrace.ondisk_read_retval << dendl;
3891       if (results->backtrace.ondisk_read_retval != 0) {
3892         results->backtrace.error_str << "failed to read off disk; see retval";
3893         goto next;
3894       }
3895
3896       // extract the backtrace, and compare it to a newly-constructed one
3897       try {
3898         bufferlist::iterator p = bl.begin();
3899         ::decode(results->backtrace.ondisk_value, p);
3900         dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
3901       } catch (buffer::error&) {
3902         if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
3903           // Cases where something has clearly gone wrong with the overall
3904           // fetch op, though we didn't get a nonzero rc from the getxattr
3905           // operation.  e.g. object missing.
3906           results->backtrace.ondisk_read_retval = rval;
3907         }
3908         results->backtrace.error_str << "failed to decode on-disk backtrace ("
3909                                      << bl.length() << " bytes)!";
3910         goto next;
3911       }
3912
3913       memory_newer = memory_backtrace.compare(results->backtrace.ondisk_value,
3914                                               &equivalent, &divergent);
3915
3916       if (divergent || memory_newer < 0) {
3917         // we're divergent, or on-disk version is newer
3918         results->backtrace.error_str << "On-disk backtrace is divergent or newer";
3919       } else {
3920         results->backtrace.passed = true;
3921       }
3922 next:
3923
3924       if (!results->backtrace.passed && in->scrub_infop->header->get_repair()) {
3925         std::string path;
3926         in->make_path_string(path);
3927         in->mdcache->mds->clog->warn() << "bad backtrace on inode " << in->ino()
3928                                        << "(" << path << "), rewriting it";
3929         in->_mark_dirty_parent(in->mdcache->mds->mdlog->get_current_segment(),
3930                            false);
3931       }
3932
3933       // If the inode's number was free in the InoTable, fix that
3934       // (#15619)
3935       {
3936         InoTable *inotable = mdcache->mds->inotable;
3937
3938         dout(10) << "scrub: inotable ino = " << inode.ino << dendl;
3939         dout(10) << "scrub: inotable free says "
3940           << inotable->is_marked_free(inode.ino) << dendl;
3941
3942         if (inotable->is_marked_free(inode.ino)) {
3943           LogChannelRef clog = in->mdcache->mds->clog;
3944           clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex
3945             << inode.ino;
3946
3947           if (in->scrub_infop->header->get_repair()) {
3948             bool repaired = inotable->repair(inode.ino);
3949             if (repaired) {
3950               clog->error() << "inode table repaired for inode: 0x" << std::hex
3951                 << inode.ino;
3952
3953               inotable->save();
3954             } else {
3955               clog->error() << "Cannot repair inotable while other operations"
3956                 " are in progress";
3957             }
3958           }
3959         }
3960       }
3961
3962       // quit if we're a file, or kick off directory checks otherwise
3963       // TODO: validate on-disk inode for non-base directories
3964       if (!in->is_dir()) {
3965         return true;
3966       }
3967
3968       return validate_directory_data();
3969     }
3970
3971     bool validate_directory_data() {
3972       assert(in->is_dir());
3973
3974       if (in->is_base()) {
3975         shadow_in = new CInode(in->mdcache);
3976         in->mdcache->create_unlinked_system_inode(shadow_in,
3977                                                   in->inode.ino,
3978                                                   in->inode.mode);
3979         shadow_in->fetch(get_internal_callback(INODE));
3980         return false;
3981       } else {
3982         results->inode.passed = true;
3983         return check_dirfrag_rstats();
3984       }
3985     }
3986
3987     bool _inode_disk(int rval) {
3988       results->inode.checked = true;
3989       results->inode.ondisk_read_retval = rval;
3990       results->inode.ondisk_value = shadow_in->inode;
3991       results->inode.memory_value = in->inode;
3992
3993       inode_t& si = shadow_in->inode;
3994       inode_t& i = in->inode;
3995       if (si.version > i.version) {
3996         // uh, what?
3997         results->inode.error_str << "On-disk inode is newer than in-memory one!";
3998         goto next;
3999       } else {
4000         bool divergent = false;
4001         int r = i.compare(si, &divergent);
4002         results->inode.passed = !divergent && r >= 0;
4003         if (!results->inode.passed) {
4004           results->inode.error_str <<
4005               "On-disk inode is divergent or newer than in-memory one!";
4006           goto next;
4007         }
4008       }
4009 next:
4010       return check_dirfrag_rstats();
4011     }
4012
4013     bool check_dirfrag_rstats() {
4014       MDSGatherBuilder gather(g_ceph_context);
4015       std::list<frag_t> frags;
4016       in->dirfragtree.get_leaves(frags);
4017       for (list<frag_t>::iterator p = frags.begin();
4018           p != frags.end();
4019           ++p) {
4020         CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
4021         dir->scrub_info();
4022         if (!dir->scrub_infop->header)
4023           dir->scrub_infop->header = in->scrub_infop->header;
4024         if (dir->is_complete()) {
4025           dir->scrub_local();
4026         } else {
4027           dir->scrub_infop->need_scrub_local = true;
4028           dir->fetch(gather.new_sub(), false);
4029         }
4030       }
4031       if (gather.has_subs()) {
4032         gather.set_finisher(get_internal_callback(DIRFRAGS));
4033         gather.activate();
4034         return false;
4035       } else {
4036         return immediate(DIRFRAGS, 0);
4037       }
4038     }
4039
4040     bool _dirfrags(int rval) {
4041       int frags_errors = 0;
4042       // basic reporting setup
4043       results->raw_stats.checked = true;
4044       results->raw_stats.ondisk_read_retval = rval;
4045
4046       results->raw_stats.memory_value.dirstat = in->inode.dirstat;
4047       results->raw_stats.memory_value.rstat = in->inode.rstat;
4048       frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
4049       nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;
4050
4051       if (rval != 0) {
4052         results->raw_stats.error_str << "Failed to read dirfrags off disk";
4053         goto next;
4054       }
4055
4056       // check each dirfrag...
4057       for (compact_map<frag_t,CDir*>::iterator p = in->dirfrags.begin();
4058            p != in->dirfrags.end();
4059            ++p) {
4060         CDir *dir = p->second;
4061         assert(dir->get_version() > 0);
4062         nest_info.add(dir->fnode.accounted_rstat);
4063         dir_info.add(dir->fnode.accounted_fragstat);
4064         if (dir->scrub_infop &&
4065             dir->scrub_infop->pending_scrub_error) {
4066           dir->scrub_infop->pending_scrub_error = false;
4067           if (dir->scrub_infop->header->get_repair()) {
4068             results->raw_stats.error_str
4069               << "dirfrag(" << p->first << ") has bad stats (will be fixed); ";
4070           } else {
4071             results->raw_stats.error_str
4072               << "dirfrag(" << p->first << ") has bad stats; ";
4073           }
4074           frags_errors++;
4075         }
4076       }
4077       nest_info.rsubdirs++; // it gets one to account for self
4078       // ...and that their sum matches our inode settings
4079       if (!dir_info.same_sums(in->inode.dirstat) ||
4080           !nest_info.same_sums(in->inode.rstat)) {
4081         if (in->scrub_infop &&
4082             in->scrub_infop->header->get_repair()) {
4083           results->raw_stats.error_str
4084             << "freshly-calculated rstats don't match existing ones (will be fixed)";
4085           in->mdcache->repair_inode_stats(in);
4086         } else {
4087           results->raw_stats.error_str
4088             << "freshly-calculated rstats don't match existing ones";
4089         }
4090         goto next;
4091       }
4092       if (frags_errors > 0)
4093         goto next;
4094
4095       results->raw_stats.passed = true;
4096 next:
4097       return true;
4098     }
4099
4100     void _done() override {
4101       if ((!results->raw_stats.checked || results->raw_stats.passed) &&
4102           (!results->backtrace.checked || results->backtrace.passed) &&
4103           (!results->inode.checked || results->inode.passed))
4104         results->passed_validation = true;
4105       if (fin) {
4106         fin->complete(get_rval());
4107       }
4108     }
4109   };
4110
4111
4112   dout(10) << "scrub starting validate_disk_state on " << *this << dendl;
4113   ValidationContinuation *vc = new ValidationContinuation(this,
4114                                                           results,
4115                                                           fin);
4116   vc->begin();
4117 }
4118
4119 void CInode::validated_data::dump(Formatter *f) const
4120 {
4121   f->open_object_section("results");
4122   {
4123     f->dump_bool("performed_validation", performed_validation);
4124     f->dump_bool("passed_validation", passed_validation);
4125     f->open_object_section("backtrace");
4126     {
4127       f->dump_bool("checked", backtrace.checked);
4128       f->dump_bool("passed", backtrace.passed);
4129       f->dump_int("read_ret_val", backtrace.ondisk_read_retval);
4130       f->dump_stream("ondisk_value") << backtrace.ondisk_value;
4131       f->dump_stream("memoryvalue") << backtrace.memory_value;
4132       f->dump_string("error_str", backtrace.error_str.str());
4133     }
4134     f->close_section(); // backtrace
4135     f->open_object_section("raw_stats");
4136     {
4137       f->dump_bool("checked", raw_stats.checked);
4138       f->dump_bool("passed", raw_stats.passed);
4139       f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
4140       f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
4141       f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
4142       f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
4143       f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
4144       f->dump_string("error_str", raw_stats.error_str.str());
4145     }
4146     f->close_section(); // raw_stats
4147     // dump failure return code
4148     int rc = 0;
4149     if (backtrace.checked && backtrace.ondisk_read_retval)
4150       rc = backtrace.ondisk_read_retval;
4151     if (inode.checked && inode.ondisk_read_retval)
4152       rc = inode.ondisk_read_retval;
4153     if (raw_stats.checked && raw_stats.ondisk_read_retval)
4154       rc = raw_stats.ondisk_read_retval;
4155     f->dump_int("return_code", rc);
4156   }
4157   f->close_section(); // results
4158 }
4159
4160 void CInode::dump(Formatter *f) const
4161 {
4162   InodeStoreBase::dump(f);
4163
4164   MDSCacheObject::dump(f);
4165
4166   f->open_object_section("versionlock");
4167   versionlock.dump(f);
4168   f->close_section();
4169
4170   f->open_object_section("authlock");
4171   authlock.dump(f);
4172   f->close_section();
4173
4174   f->open_object_section("linklock");
4175   linklock.dump(f);
4176   f->close_section();
4177
4178   f->open_object_section("dirfragtreelock");
4179   dirfragtreelock.dump(f);
4180   f->close_section();
4181
4182   f->open_object_section("filelock");
4183   filelock.dump(f);
4184   f->close_section();
4185
4186   f->open_object_section("xattrlock");
4187   xattrlock.dump(f);
4188   f->close_section();
4189
4190   f->open_object_section("snaplock");
4191   snaplock.dump(f);
4192   f->close_section();
4193
4194   f->open_object_section("nestlock");
4195   nestlock.dump(f);
4196   f->close_section();
4197
4198   f->open_object_section("flocklock");
4199   flocklock.dump(f);
4200   f->close_section();
4201
4202   f->open_object_section("policylock");
4203   policylock.dump(f);
4204   f->close_section();
4205
4206   f->open_array_section("states");
4207   MDSCacheObject::dump_states(f);
4208   if (state_test(STATE_EXPORTING))
4209     f->dump_string("state", "exporting");
4210   if (state_test(STATE_OPENINGDIR))
4211     f->dump_string("state", "openingdir");
4212   if (state_test(STATE_FREEZING))
4213     f->dump_string("state", "freezing");
4214   if (state_test(STATE_FROZEN))
4215     f->dump_string("state", "frozen");
4216   if (state_test(STATE_AMBIGUOUSAUTH))
4217     f->dump_string("state", "ambiguousauth");
4218   if (state_test(STATE_EXPORTINGCAPS))
4219     f->dump_string("state", "exportingcaps");
4220   if (state_test(STATE_NEEDSRECOVER))
4221     f->dump_string("state", "needsrecover");
4222   if (state_test(STATE_PURGING))
4223     f->dump_string("state", "purging");
4224   if (state_test(STATE_DIRTYPARENT))
4225     f->dump_string("state", "dirtyparent");
4226   if (state_test(STATE_DIRTYRSTAT))
4227     f->dump_string("state", "dirtyrstat");
4228   if (state_test(STATE_STRAYPINNED))
4229     f->dump_string("state", "straypinned");
4230   if (state_test(STATE_FROZENAUTHPIN))
4231     f->dump_string("state", "frozenauthpin");
4232   if (state_test(STATE_DIRTYPOOL))
4233     f->dump_string("state", "dirtypool");
4234   if (state_test(STATE_ORPHAN))
4235     f->dump_string("state", "orphan");
4236   if (state_test(STATE_MISSINGOBJS))
4237     f->dump_string("state", "missingobjs");
4238   f->close_section();
4239
4240   f->open_array_section("client_caps");
4241   for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
4242        it != client_caps.end(); ++it) {
4243     f->open_object_section("client_cap");
4244     f->dump_int("client_id", it->first.v);
4245     f->dump_string("pending", ccap_string(it->second->pending()));
4246     f->dump_string("issued", ccap_string(it->second->issued()));
4247     f->dump_string("wanted", ccap_string(it->second->wanted()));
4248     f->dump_string("last_sent", ccap_string(it->second->get_last_sent()));
4249     f->close_section();
4250   }
4251   f->close_section();
4252
4253   f->dump_int("loner", loner_cap.v);
4254   f->dump_int("want_loner", want_loner_cap.v);
4255
4256   f->open_array_section("mds_caps_wanted");
4257   for (compact_map<int,int>::const_iterator p = mds_caps_wanted.begin();
4258        p != mds_caps_wanted.end(); ++p) {
4259     f->open_object_section("mds_cap_wanted");
4260     f->dump_int("rank", p->first);
4261     f->dump_string("cap", ccap_string(p->second));
4262     f->close_section();
4263   }
4264   f->close_section();
4265 }
4266
4267 /****** Scrub Stuff *****/
4268 void CInode::scrub_info_create() const
4269 {
4270   dout(25) << __func__ << dendl;
4271   assert(!scrub_infop);
4272
4273   // break out of const-land to set up implicit initial state
4274   CInode *me = const_cast<CInode*>(this);
4275   inode_t *in = me->get_projected_inode();
4276
4277   scrub_info_t *si = new scrub_info_t();
4278   si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
4279   si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
4280
4281   me->scrub_infop = si;
4282 }
4283
4284 void CInode::scrub_maybe_delete_info()
4285 {
4286   if (scrub_infop &&
4287       !scrub_infop->scrub_in_progress &&
4288       !scrub_infop->last_scrub_dirty) {
4289     delete scrub_infop;
4290     scrub_infop = NULL;
4291   }
4292 }
4293
4294 void CInode::scrub_initialize(CDentry *scrub_parent,
4295                               const ScrubHeaderRefConst& header,
4296                               MDSInternalContextBase *f)
4297 {
4298   dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
4299   assert(!scrub_is_in_progress());
4300   scrub_info();
4301   if (!scrub_infop)
4302     scrub_infop = new scrub_info_t();
4303
4304   if (get_projected_inode()->is_dir()) {
4305     // fill in dirfrag_stamps with initial state
4306     std::list<frag_t> frags;
4307     dirfragtree.get_leaves(frags);
4308     for (std::list<frag_t>::iterator i = frags.begin();
4309         i != frags.end();
4310         ++i) {
4311       if (header->get_force())
4312         scrub_infop->dirfrag_stamps[*i].reset();
4313       else
4314         scrub_infop->dirfrag_stamps[*i];
4315     }
4316   }
4317
4318   if (scrub_parent)
4319     scrub_parent->get(CDentry::PIN_SCRUBPARENT);
4320   scrub_infop->scrub_parent = scrub_parent;
4321   scrub_infop->on_finish = f;
4322   scrub_infop->scrub_in_progress = true;
4323   scrub_infop->children_scrubbed = false;
4324   scrub_infop->header = header;
4325
4326   scrub_infop->scrub_start_version = get_version();
4327   scrub_infop->scrub_start_stamp = ceph_clock_now();
4328   // right now we don't handle remote inodes
4329 }
4330
4331 int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
4332 {
4333   dout(20) << __func__ << dendl;
4334   assert(scrub_is_in_progress());
4335
4336   if (!is_dir()) {
4337     return -ENOTDIR;
4338   }
4339
4340   std::map<frag_t, scrub_stamp_info_t>::iterator i =
4341       scrub_infop->dirfrag_stamps.begin();
4342
4343   while (i != scrub_infop->dirfrag_stamps.end()) {
4344     if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
4345       i->second.scrub_start_version = get_projected_version();
4346       i->second.scrub_start_stamp = ceph_clock_now();
4347       *out_dirfrag = i->first;
4348       dout(20) << " return frag " << *out_dirfrag << dendl;
4349       return 0;
4350     }
4351     ++i;
4352   }
4353
4354   dout(20) << " no frags left, ENOENT " << dendl;
4355   return ENOENT;
4356 }
4357
4358 void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
4359 {
4360   assert(out_dirfrags != NULL);
4361   assert(scrub_infop != NULL);
4362
4363   out_dirfrags->clear();
4364   std::map<frag_t, scrub_stamp_info_t>::iterator i =
4365       scrub_infop->dirfrag_stamps.begin();
4366
4367   while (i != scrub_infop->dirfrag_stamps.end()) {
4368     if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
4369       if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
4370         out_dirfrags->push_back(i->first);
4371     } else {
4372       return;
4373     }
4374
4375     ++i;
4376   }
4377 }
4378
4379 void CInode::scrub_dirfrag_finished(frag_t dirfrag)
4380 {
4381   dout(20) << __func__ << " on frag " << dirfrag << dendl;
4382   assert(scrub_is_in_progress());
4383
4384   std::map<frag_t, scrub_stamp_info_t>::iterator i =
4385       scrub_infop->dirfrag_stamps.find(dirfrag);
4386   assert(i != scrub_infop->dirfrag_stamps.end());
4387
4388   scrub_stamp_info_t &si = i->second;
4389   si.last_scrub_stamp = si.scrub_start_stamp;
4390   si.last_scrub_version = si.scrub_start_version;
4391 }
4392
4393 void CInode::scrub_finished(MDSInternalContextBase **c) {
4394   dout(20) << __func__ << dendl;
4395   assert(scrub_is_in_progress());
4396   for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
4397       scrub_infop->dirfrag_stamps.begin();
4398       i != scrub_infop->dirfrag_stamps.end();
4399       ++i) {
4400     if(i->second.last_scrub_version != i->second.scrub_start_version) {
4401       derr << i->second.last_scrub_version << " != "
4402         << i->second.scrub_start_version << dendl;
4403     }
4404     assert(i->second.last_scrub_version == i->second.scrub_start_version);
4405   }
4406
4407   scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
4408   scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
4409   scrub_infop->last_scrub_dirty = true;
4410   scrub_infop->scrub_in_progress = false;
4411
4412   if (scrub_infop->scrub_parent) {
4413     CDentry *dn = scrub_infop->scrub_parent;
4414     scrub_infop->scrub_parent = NULL;
4415     dn->dir->scrub_dentry_finished(dn);
4416     dn->put(CDentry::PIN_SCRUBPARENT);
4417   }
4418
4419   *c = scrub_infop->on_finish;
4420   scrub_infop->on_finish = NULL;
4421
4422   if (scrub_infop->header->get_origin() == this) {
4423     // We are at the point that a tagging scrub was initiated
4424     LogChannelRef clog = mdcache->mds->clog;
4425     clog->info() << "scrub complete with tag '" << scrub_infop->header->get_tag() << "'";
4426   }
4427 }
4428
4429 int64_t CInode::get_backtrace_pool() const
4430 {
4431   if (is_dir()) {
4432     return mdcache->mds->mdsmap->get_metadata_pool();
4433   } else {
4434     // Files are required to have an explicit layout that specifies
4435     // a pool
4436     assert(inode.layout.pool_id != -1);
4437     return inode.layout.pool_id;
4438   }
4439 }
4440
4441 void CInode::maybe_export_pin(bool update)
4442 {
4443   if (!g_conf->mds_bal_export_pin)
4444     return;
4445   if (!is_dir() || !is_normal())
4446     return;
4447
4448   mds_rank_t export_pin = get_export_pin(false);
4449   if (export_pin == MDS_RANK_NONE && !update)
4450     return;
4451
4452   if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
4453     return;
4454
4455   bool queue = false;
4456   for (auto p = dirfrags.begin(); p != dirfrags.end(); p++) {
4457     CDir *dir = p->second;
4458     if (!dir->is_auth())
4459       continue;
4460     if (export_pin != MDS_RANK_NONE) {
4461       if (dir->is_subtree_root()) {
4462         // set auxsubtree bit or export it
4463         if (!dir->state_test(CDir::STATE_AUXSUBTREE) ||
4464             export_pin != dir->get_dir_auth().first)
4465           queue = true;
4466       } else {
4467         // create aux subtree or export it
4468         queue = true;
4469       }
4470     } else {
4471       // clear aux subtrees ?
4472       queue = dir->state_test(CDir::STATE_AUXSUBTREE);
4473     }
4474     if (queue) {
4475       state_set(CInode::STATE_QUEUEDEXPORTPIN);
4476       mdcache->export_pin_queue.insert(this);
4477       break;
4478     }
4479   }
4480 }
4481
4482 void CInode::set_export_pin(mds_rank_t rank)
4483 {
4484   assert(is_dir());
4485   assert(is_projected());
4486   get_projected_inode()->export_pin = rank;
4487   maybe_export_pin(true);
4488 }
4489
4490 mds_rank_t CInode::get_export_pin(bool inherit) const
4491 {
4492   /* An inode that is export pinned may not necessarily be a subtree root, we
4493    * need to traverse the parents. A base or system inode cannot be pinned.
4494    * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
4495    * have a parent yet.
4496    */
4497   for (const CInode *in = this; !in->is_base() && !in->is_system() && in->get_projected_parent_dn(); in = in->get_projected_parent_dn()->dir->inode) {
4498     mds_rank_t pin = in->get_projected_inode()->export_pin;
4499     if (pin >= 0) {
4500       return pin;
4501     }
4502     if (!inherit) break;
4503   }
4504   return MDS_RANK_NONE;
4505 }
4506
4507 bool CInode::is_exportable(mds_rank_t dest) const
4508 {
4509   mds_rank_t pin = get_export_pin();
4510   if (pin == dest) {
4511     return true;
4512   } else if (pin >= 0) {
4513     return false;
4514   } else {
4515     return true;
4516   }
4517 }
4518
4519 MEMPOOL_DEFINE_OBJECT_FACTORY(CInode, co_inode, mds_co);