1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/int_types.h"
16 #include "common/errno.h"
31 #include "events/EUpdate.h"
33 #include "osdc/Objecter.h"
37 #include "LogSegment.h"
39 #include "common/Clock.h"
41 #include "messages/MLock.h"
42 #include "messages/MClientCaps.h"
44 #include "common/config.h"
45 #include "global/global_context.h"
46 #include "include/assert.h"
48 #include "mds/MDSContinuation.h"
49 #include "mds/InoTable.h"
51 #define dout_context g_ceph_context
52 #define dout_subsys ceph_subsys_mds
54 #define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
// I/O completion context tied to a CInode; supplies the owning MDSRank
// (looked up through the inode's MDCache) to the MDSIOContextBase machinery.
57 class CInodeIOContext : public MDSIOContextBase
61   MDSRank *get_mds() override {return in->mdcache->mds;}
63   explicit CInodeIOContext(CInode *in_) : in(in_) {
// Static LockType descriptors, one per per-inode lock.  Each binds a lock
// member of CInode to its wire identifier (CEPH_LOCK_*).
69 LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
70 LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
71 LockType CInode::linklock_type(CEPH_LOCK_ILINK);
72 LockType CInode::dirfragtreelock_type(CEPH_LOCK_IDFT);
73 LockType CInode::filelock_type(CEPH_LOCK_IFILE);
74 LockType CInode::xattrlock_type(CEPH_LOCK_IXATTR);
75 LockType CInode::snaplock_type(CEPH_LOCK_ISNAP);
76 LockType CInode::nestlock_type(CEPH_LOCK_INEST);
77 LockType CInode::flocklock_type(CEPH_LOCK_IFLOCK);
78 LockType CInode::policylock_type(CEPH_LOCK_IPOLICY);
80 //int cinode_pins[CINODE_NUM_PINS]; // counts
// Emit the standard debug-line prefix for this inode:
// timestamp, MDS rank, and inode number.
81 ostream& CInode::print_db_line_prefix(ostream& out)
83   return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
// Table mapping each writeable-cap-bearing lock to the client capability
// bits it governs; num_cinode_locks is its element count.
87 * write caps and lock ids
89 struct cinode_lock_info_t cinode_lock_info[] = {
90   { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
91   { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
92   { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
93   { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
95 int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
// Debug pretty-printer for a CInode.  Dumps, in order: ino/snap range and
// path, replication/authority info, versions, auth-pins, state flags,
// stats (dirstat/rstat, with projected deltas when mds_debug_scatterstat
// is set), non-trivial lock states, client caps, loner, wanted caps from
// other MDSs, pins, and export pin.  For debugging only — format is not
// a stable interface.
99 ostream& operator<<(ostream& out, const CInode& in)
102   in.make_path_string(path, true);
104   out << "[inode " << in.inode.ino;
106       << (in.is_multiversion() ? "...":"")
107       << in.first << "," << in.last << "]";
108   out << " " << path << (in.is_dir() ? "/":"");
112     if (in.is_replicated())
113       out << in.get_replicas();
115     mds_authority_t a = in.authority();
116     out << " rep@" << a.first;
117     if (a.second != CDIR_AUTH_UNKNOWN)
118       out << "," << a.second;
119     out << "." << in.get_replica_nonce();
123     out << " symlink='" << in.symlink << "'";
124   if (in.is_dir() && !in.dirfragtree.empty())
125     out << " " << in.dirfragtree;
127   out << " v" << in.get_version();
128   if (in.get_projected_version() > in.get_version())
129     out << " pv" << in.get_projected_version();
131   if (in.is_auth_pinned()) {
132     out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
133 #ifdef MDS_AUTHPIN_SET
134     out << "(" << in.auth_pin_set << ")";
139     out << " snaprealm=" << in.snaprealm;
141   if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
142   if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
143   if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
144   if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
145   if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
146   if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
147   if (in.is_frozen_inode()) out << " FROZEN";
148   if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
150   const inode_t *pi = in.get_projected_inode();
151   if (pi->is_truncating())
152     out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";
154   if (in.inode.is_dir()) {
155     out << " " << in.inode.dirstat;
156     if (g_conf->mds_debug_scatterstat && in.is_projected()) {
157       const inode_t *pi = in.get_projected_inode();
158       out << "->" << pi->dirstat;
161     out << " s=" << in.inode.size;
162     if (in.inode.nlink != 1)
163       out << " nl=" << in.inode.nlink;
167   out << " " << in.inode.rstat;
// accounted_rstat is shown only when it diverges from rstat.
168   if (!(in.inode.rstat == in.inode.accounted_rstat))
169     out << "/" << in.inode.accounted_rstat;
170   if (g_conf->mds_debug_scatterstat && in.is_projected()) {
171     const inode_t *pi = in.get_projected_inode();
172     out << "->" << pi->rstat;
173     if (!(pi->rstat == pi->accounted_rstat))
174       out << "/" << pi->accounted_rstat;
177   if (!in.client_need_snapflush.empty())
178     out << " need_snapflush=" << in.client_need_snapflush;
// Locks are printed only when not in the quiescent sync-and-unlocked state.
182   if (!in.authlock.is_sync_and_unlocked())
183     out << " " << in.authlock;
184   if (!in.linklock.is_sync_and_unlocked())
185     out << " " << in.linklock;
186   if (in.inode.is_dir()) {
187     if (!in.dirfragtreelock.is_sync_and_unlocked())
188       out << " " << in.dirfragtreelock;
189     if (!in.snaplock.is_sync_and_unlocked())
190       out << " " << in.snaplock;
191     if (!in.nestlock.is_sync_and_unlocked())
192       out << " " << in.nestlock;
193     if (!in.policylock.is_sync_and_unlocked())
194       out << " " << in.policylock;
196     if (!in.flocklock.is_sync_and_unlocked())
197       out << " " << in.flocklock;
199   if (!in.filelock.is_sync_and_unlocked())
200     out << " " << in.filelock;
201   if (!in.xattrlock.is_sync_and_unlocked())
202     out << " " << in.xattrlock;
203   if (!in.versionlock.is_sync_and_unlocked())
204     out << " " << in.versionlock;
206   // hack: spit out crap on which clients have caps
207   if (in.inode.client_ranges.size())
208     out << " cr=" << in.inode.client_ranges;
210   if (!in.get_client_caps().empty()) {
212     for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
213          it != in.get_client_caps().end();
215       if (it != in.get_client_caps().begin()) out << ",";
// Per-client caps: pending[/issued]/wanted@last_sent.
216       out << it->first << "="
217           << ccap_string(it->second->pending());
218       if (it->second->issued() != it->second->pending())
219         out << "/" << ccap_string(it->second->issued());
220       out << "/" << ccap_string(it->second->wanted())
221           << "@" << it->second->get_last_sent();
224     if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
225       out << ",l=" << in.get_loner();
226       if (in.get_loner() != in.get_wanted_loner())
227         out << "(" << in.get_wanted_loner() << ")";
230   if (!in.get_mds_caps_wanted().empty()) {
232     for (compact_map<int,int>::const_iterator p = in.get_mds_caps_wanted().begin();
233          p != in.get_mds_caps_wanted().end();
235       if (p != in.get_mds_caps_wanted().begin())
237       out << p->first << '=' << ccap_string(p->second);
242   if (in.get_num_ref()) {
244     in.print_pin_set(out);
247   if (in.inode.export_pin != MDS_RANK_NONE) {
248     out << " export_pin=" << in.inode.export_pin;
// Debug printer for per-inode scrub bookkeeping: start/last scrub
// versions and timestamps.
256 ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
258   out << "{scrub_start_version: " << si.scrub_start_version
259       << ", scrub_start_stamp: " << si.scrub_start_stamp
260       << ", last_scrub_version: " << si.last_scrub_version
261       << ", last_scrub_stamp: " << si.last_scrub_stamp;
// Print this inode to the given stream (body not visible here —
// presumably delegates to operator<<; confirm in full source).
267 void CInode::print(ostream& out)
// Record that `client` still owes a snap flush for snapshot `snapid` on
// the snapped inode `snapin`.  Pins the head inode (PIN_NEEDSNAPFLUSH +
// auth_pin) on the first entry, and auth-pins the snapped inode, so
// neither can be frozen/migrated until the flush arrives.
274 void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
276   dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
278   if (client_need_snapflush.empty()) {
279     get(CInode::PIN_NEEDSNAPFLUSH);
281     // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
282     // long periods waiting for clients to flush their snaps.
283     auth_pin(this);   // pin head inode...
286   set<client_t>& clients = client_need_snapflush[snapid];
288     snapin->auth_pin(this); // ...and pin snapped/old inode!
290   clients.insert(client);
// Inverse of add_need_snapflush(): drop `client` from the pending-flush
// set for `snapid`, releasing the snapped inode's auth_pin when that
// snapid's set empties and the head pin when no flushes remain at all.
// Unknown snapid/client combinations are logged and ignored.
293 void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
295   dout(10) << "remove_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
296   compact_map<snapid_t, std::set<client_t> >::iterator p = client_need_snapflush.find(snapid);
297   if (p == client_need_snapflush.end()) {
298     dout(10) << " snapid not found" << dendl;
301   if (!p->second.count(client)) {
302     dout(10) << " client not found" << dendl;
305   p->second.erase(client);
306   if (p->second.empty()) {
307     client_need_snapflush.erase(p);
308     snapin->auth_unpin(this);
310     if (client_need_snapflush.empty()) {
311       put(CInode::PIN_NEEDSNAPFLUSH);
// After an inode COW split, move pending-snapflush entries whose snapid
// falls in [cowin->first, in->first) over to the new snapped inode
// `cowin`, transferring the auth_pins accordingly.  Returns whether any
// flush is (still) needed — exact return handling not fully visible here.
317 bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
319   dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
320   bool need_flush = false;
321   for (compact_map<snapid_t, set<client_t> >::iterator p = client_need_snapflush.lower_bound(cowin->first);
322        p != client_need_snapflush.end() && p->first < in->first; ) {
323     compact_map<snapid_t, set<client_t> >::iterator q = p;
325     assert(!q->second.empty());
326     if (cowin->last >= q->first) {
327       cowin->auth_pin(this);
330       client_need_snapflush.erase(q);
331       in->auth_unpin(this);
// Flag this inode's rstat as dirty and queue it on the parent dir's
// dirty_rstat_inodes list so the nestlock scatter-gather will propagate
// it.  If the projected parent dentry is not auth, we must be mid
// cross-MDS rename (asserted via STATE_AMBIGUOUSAUTH); the flag is
// cleared when the rename finishes.
336 void CInode::mark_dirty_rstat()
338   if (!state_test(STATE_DIRTYRSTAT)) {
339     dout(10) << "mark_dirty_rstat" << dendl;
340     state_set(STATE_DIRTYRSTAT);
342     CDentry *pdn = get_projected_parent_dn();
343     if (pdn->is_auth()) {
344       CDir *pdir = pdn->dir;
345       pdir->dirty_rstat_inodes.push_back(&dirty_rstat_item);
346       mdcache->mds->locker->mark_updated_scatterlock(&pdir->inode->nestlock);
348       // under cross-MDS rename.
349       // DIRTYRSTAT flag will get cleared when rename finishes
350       assert(state_test(STATE_AMBIGUOUSAUTH));
// Clear the dirty-rstat flag and unlink this inode from its parent
// directory's dirty_rstat_inodes list.  No-op when not dirty.
354 void CInode::clear_dirty_rstat()
356   if (state_test(STATE_DIRTYRSTAT)) {
357     dout(10) << "clear_dirty_rstat" << dendl;
358     state_clear(STATE_DIRTYRSTAT);
360     dirty_rstat_item.remove_myself();
// Start a new projected (not-yet-journaled) version of this inode's
// metadata and return the writable inode_t.  The new projection is
// seeded from the previous projection (or the stable inode if none).
// If `px` is non-null a projected xattr map is created/copied too.
// Any pending last-scrub stamp/version is folded into the projection.
364 inode_t *CInode::project_inode(map<string,bufferptr> *px)
366   if (projected_nodes.empty()) {
367     projected_nodes.push_back(new projected_inode_t(new inode_t(inode)));
371     projected_nodes.push_back(new projected_inode_t(
372         new inode_t(*projected_nodes.back()->inode)));
374       *px = *get_projected_xattrs();
377   projected_inode_t &pi = *projected_nodes.back();
381     ++num_projected_xattrs;
384   if (scrub_infop && scrub_infop->last_scrub_dirty) {
385     pi.inode->last_scrub_stamp = scrub_infop->last_scrub_stamp;
386     pi.inode->last_scrub_version = scrub_infop->last_scrub_version;
387     scrub_infop->last_scrub_dirty = false;
388     scrub_maybe_delete_info();
390   dout(15) << "project_inode " << pi.inode << dendl;
// Commit the oldest projected inode: mark this inode dirty at the
// projected version, copy the projection into the stable `inode`,
// propagate backtrace dirtiness (noting a possible pool change),
// apply any projected xattrs/snaprealm, then free the projection.
394 void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
396   assert(!projected_nodes.empty());
397   dout(15) << "pop_and_dirty_projected_inode " << projected_nodes.front()->inode
398            << " v" << projected_nodes.front()->inode->version << dendl;
// Remember the old pool so a layout pool change marks the pool dirty too.
399   int64_t old_pool = inode.layout.pool_id;
401   mark_dirty(projected_nodes.front()->inode->version, ls);
402   inode = *projected_nodes.front()->inode;
404   if (inode.is_backtrace_updated())
405     _mark_dirty_parent(ls, old_pool != inode.layout.pool_id);
407   map<string,bufferptr> *px = projected_nodes.front()->xattrs;
409     --num_projected_xattrs;
414   if (projected_nodes.front()->snapnode) {
415     pop_projected_snaprealm(projected_nodes.front()->snapnode);
416     --num_projected_srnodes;
419   delete projected_nodes.front()->inode;
420   delete projected_nodes.front();
422   projected_nodes.pop_front();
// Create a projected snaprealm node for this inode, copied from the
// current projected srnode if one exists, otherwise freshly created
// (recording `snapid` as creation point).  Attached to the newest
// projected inode; caller mutates the returned sr_t.
425 sr_t *CInode::project_snaprealm(snapid_t snapid)
427   sr_t *cur_srnode = get_projected_srnode();
431     new_srnode = new sr_t(*cur_srnode);
433     new_srnode = new sr_t();
434     new_srnode->created = snapid;
435     new_srnode->current_parent_since = get_oldest_snap();
437   dout(10) << "project_snaprealm " << new_srnode << dendl;
438   projected_nodes.back()->snapnode = new_srnode;
439   ++num_projected_srnodes;
443 /* if newparent != parent, add parent to past_parents
444 if parent DNE, we need to find what the parent actually is and fill that in */
// When the realm's parent changes, record the old parent in
// past_parents (keyed by its newest seq) covering the interval since
// current_parent_since, then advance current_parent_since past both the
// old parent's seq and the new parent's last created snap.
445 void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
447   sr_t *new_snap = project_snaprealm();
448   SnapRealm *oldparent;
450     oldparent = find_snaprealm();
451     new_snap->seq = oldparent->get_newest_seq();
454     oldparent = snaprealm->parent;
456   if (newparent != oldparent) {
457     snapid_t oldparentseq = oldparent->get_newest_seq();
458     if (oldparentseq + 1 > new_snap->current_parent_since) {
459       new_snap->past_parents[oldparentseq].ino = oldparent->inode->ino();
460       new_snap->past_parents[oldparentseq].first = new_snap->current_parent_since;
462     new_snap->current_parent_since = MAX(oldparentseq, newparent->get_last_created()) + 1;
// Apply a projected srnode to the live snaprealm and delete it.  If the
// past_parents set changed size, cached snaps are invalidated and past
// parents are re-opened; _open_parents is expected to succeed since the
// parents should already be open (or openable).
466 void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
468   assert(next_snaprealm);
469   dout(10) << "pop_projected_snaprealm " << next_snaprealm
470           << " seq" << next_snaprealm->seq << dendl;
471   bool invalidate_cached_snaps = false;
474   } else if (next_snaprealm->past_parents.size() !=
475              snaprealm->srnode.past_parents.size()) {
476     invalidate_cached_snaps = true;
477     // re-open past parents
478     snaprealm->_close_parents();
480     dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
481              << " -> " << next_snaprealm->past_parents << dendl;
483   snaprealm->srnode = *next_snaprealm;
484   delete next_snaprealm;
486   // we should be able to open these up (or have them already be open).
487   bool ok = snaprealm->_open_parents(NULL);
490   if (invalidate_cached_snaps)
491     snaprealm->invalidate_cached_snaps();
493   if (snaprealm->parent)
494     dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
498 // ====== CInode =======
// Hash a dentry name using this directory's configured hash function
// (inode.dir_layout.dl_dir_hash), defaulting to CEPH_STR_HASH_LINUX.
502 __u32 InodeStoreBase::hash_dentry_name(const string &dn)
504   int which = inode.dir_layout.dl_dir_hash;
506     which = CEPH_STR_HASH_LINUX;
507   assert(ceph_str_hash_valid(which));
508   return ceph_str_hash(which, dn.data(), dn.length());
// Map a dentry name to the dirfrag that holds it: hash the name and look
// it up in the fragment tree.  Unfragmented dirs short-circuit to the
// root frag without hashing.
511 frag_t InodeStoreBase::pick_dirfrag(const string& dn)
513   if (dirfragtree.empty())
514     return frag_t();          // avoid the string hash if we can.
516   __u32 h = hash_dentry_name(dn);
517   return dirfragtree[h];
// Collect into `ls` the open CDirs that fall under fragment `fg`.
// First gathers fragtree leaves under fg; the later section builds a
// temporary fragtree forced to leaf at fg and at each open dirfrag to
// account for dirfrags that disagree with the stored tree.  Return value
// semantics not fully visible in this excerpt.
520 bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
524   dirfragtree.get_leaves_under(fg, fglist);
525   for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
526     if (dirfrags.count(*p))
527       ls.push_back(dirfrags[*p]);
535   tmpdft.force_to_leaf(g_ceph_context, fg);
536   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
537     tmpdft.force_to_leaf(g_ceph_context, p->first);
538     if (fg.contains(p->first) && !dirfragtree.is_leaf(p->first))
539       ls.push_back(p->second);
543   tmpdft.get_leaves_under(fg, fglist);
544   for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
545     if (!dirfrags.count(*p)) {
// Sanity check: every open dirfrag should be a leaf of the fragment
// tree; log any that are not.
553 void CInode::verify_dirfrags()
556   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
557     if (!dirfragtree.is_leaf(p->first)) {
558       dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
559               << ": " << *p->second << dendl;
// Reconcile open dirfrags with the fragment tree: report any open frag
// that is not a tree leaf, then force-open a CDir for every leaf of the
// fragment tree via the cache.
566 void CInode::force_dirfrags()
569   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
570     if (!dirfragtree.is_leaf(p->first)) {
571       dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
572               << ": " << *p->second << dendl;
579   dirfragtree.get_leaves(leaves);
580   for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
581     mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
// Best-effort lookup of a CDir for `fg`: exact match first, then any
// open dirfrag under fg, then walk up to progressively coarser
// ancestors.  May return an inexact frag (hence "approx").
587 CDir *CInode::get_approx_dirfrag(frag_t fg)
589   CDir *dir = get_dirfrag(fg);
594   get_dirfrags_under(fg, ls);
599   while (fg.bits() > 0) {
601     dir = get_dirfrag(fg);
// Append all open dirfrags of this inode to `ls`.
607 void CInode::get_dirfrags(list<CDir*>& ls)
610   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
613     ls.push_back(p->second);
// Append open dirfrags that belong to the same subtree as this inode
// (i.e. are NOT subtree roots) to `ls`.
615 void CInode::get_nested_dirfrags(list<CDir*>& ls)
617   // dirfrags in same subtree
618   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
621     if (!p->second->is_subtree_root())
622       ls.push_back(p->second);
// Append open dirfrags that ARE subtree roots to `ls` — the complement
// of get_nested_dirfrags().
624 void CInode::get_subtree_dirfrags(list<CDir*>& ls)
626   // dirfrags that are roots of new subtrees
627   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
630     if (p->second->is_subtree_root())
631       ls.push_back(p->second);
// Return the CDir for `fg`, creating (opening) it if absent.  Creation
// is only legal on the auth MDS or during replay.
635 CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
640   CDir *dir = get_dirfrag(fg);
643     assert(is_auth() || mdcache->mds->is_any_replay());
644     dir = new CDir(this, fg, mdcache, is_auth());
// Register a freshly created CDir under its frag (must not already
// exist).  If sticky-dir refs are held, the new dir inherits the sticky
// state/pin immediately.
650 CDir *CInode::add_dirfrag(CDir *dir)
652   assert(dirfrags.count(dir->dirfrag().frag) == 0);
653   dirfrags[dir->dirfrag().frag] = dir;
655   if (stickydir_ref > 0) {
656     dir->state_set(CDir::STATE_STICKY);
657     dir->get(CDir::PIN_STICKY);
// Tear down the open dirfrag `fg`: drop null dentries, release any
// sticky pin, dump leftover dentries for debugging, and assert the dir
// is unreferenced before destruction.
665 void CInode::close_dirfrag(frag_t fg)
667   dout(14) << "close_dirfrag " << fg << dendl;
668   assert(dirfrags.count(fg));
670   CDir *dir = dirfrags[fg];
671   dir->remove_null_dentries();
677   if (stickydir_ref > 0) {
678     dir->state_clear(CDir::STATE_STICKY);
679     dir->put(CDir::PIN_STICKY);
682   // dump any remaining dentries, for debugging purposes
683   for (CDir::map_t::iterator p = dir->items.begin();
684        p != dir->items.end();
686     dout(14) << "close_dirfrag LEFTOVER dn " << *p->second << dendl;
688   assert(dir->get_num_ref() == 0);
// Close every open dirfrag of this inode.
693 void CInode::close_dirfrags()
695   while (!dirfrags.empty())
696     close_dirfrag(dirfrags.begin()->first);
// True if any open dirfrag is a subtree root, optionally restricted to
// subtrees owned by MDS rank `auth` (-1 = any rank).
699 bool CInode::has_subtree_root_dirfrag(int auth)
701   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
704     if (p->second->is_subtree_root() &&
705         (auth == -1 || p->second->dir_auth.first == auth))
// True if any open dirfrag is a subtree root or is currently being
// exported to another MDS.
710 bool CInode::has_subtree_or_exporting_dirfrag()
712   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
715     if (p->second->is_subtree_root() ||
716         p->second->state_test(CDir::STATE_EXPORTING))
// Take a sticky-dirs reference: on the first ref, mark and pin every
// open dirfrag STICKY so it stays in cache.
721 void CInode::get_stickydirs()
723   if (stickydir_ref == 0) {
725     for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
728       p->second->state_set(CDir::STATE_STICKY);
729       p->second->get(CDir::PIN_STICKY);
// Drop a sticky-dirs reference: when the last ref goes, clear the
// STICKY state/pin from every open dirfrag.
735 void CInode::put_stickydirs()
737   assert(stickydir_ref > 0);
739   if (stickydir_ref == 0) {
741     for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
744       p->second->state_clear(CDir::STATE_STICKY);
745       p->second->put(CDir::PIN_STICKY);
// Called on the 0->1 reference transition: pin the parent dentry so the
// linkage can't be trimmed while this inode is referenced.
756 void CInode::first_get()
760     parent->get(CDentry::PIN_INODEPIN);
// Called on the 1->0 reference transition: unpin the parent dentry and,
// if only dirty/dirty-parent refs remain, let the cache consider stray
// reintegration/purge.
763 void CInode::last_put()
767     parent->put(CDentry::PIN_INODEPIN);
772   if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
773     mdcache->maybe_eval_stray(this, true);
// Track a remote (hard-link) parent dentry; pin on the first one.
776 void CInode::add_remote_parent(CDentry *p)
778   if (remote_parents.empty())
779     get(PIN_REMOTEPARENT);
780   remote_parents.insert(p);
// Untrack a remote parent dentry; unpin when the last one is removed.
782 void CInode::remove_remote_parent(CDentry *p)
784   remote_parents.erase(p);
785   if (remote_parents.empty())
786     put(PIN_REMOTEPARENT);
// Return the CDir containing this inode's primary dentry (body not
// visible in this excerpt).
792 CDir *CInode::get_parent_dir()
// Like get_parent_dir() but follows the *projected* parent dentry.
798 CDir *CInode::get_projected_parent_dir()
800   CDentry *p = get_projected_parent_dn();
// Return the inode of the directory holding this inode's primary dentry.
805 CInode *CInode::get_parent_inode()
808     return parent->dir->inode;
// Walk `other`'s projected-parent chain upward; true if this inode is
// reached along the way (i.e. is an ancestor in the projected tree).
812 bool CInode::is_projected_ancestor_of(CInode *other)
817     if (!other->get_projected_parent_dn())
819     other = other->get_projected_parent_dn()->get_dir()->get_inode();
825  * Because a non-directory inode may have multiple links, the use_parent
826  * argument allows selecting which parent to use for path construction. This
827  * argument is only meaningful for the final component (i.e. the first of the
828  * nested calls) because directories cannot have multiple hard links. If
829  * use_parent is NULL and projected is true, the primary parent's projected
830  * inode is used all the way up the path chain. Otherwise the primary parent
831  * stable inode is used.
// Builds the path recursively through the chosen parent dentry; roots
// render as "", mdsdirs as "~mdsN", and otherwise-unlinked inodes as a
// "#<hex ino>" placeholder.
833 void CInode::make_path_string(string& s, bool projected, const CDentry *use_parent) const
836     use_parent = projected ? get_projected_parent_dn() : parent;
840     use_parent->make_path_string(s, projected);
841   } else if (is_root()) {
843   } else if (is_mdsdir()) {
845     uint64_t eino(ino());
846     eino -= MDS_INO_MDSDIR_OFFSET;
847     snprintf(t, sizeof(t), "~mds%" PRId64, eino);
851     uint64_t eino(ino());
852     snprintf(n, sizeof(n), "#%" PRIx64, eino);
// filepath analogue of make_path_string(): build through the (projected)
// parent dentry, or fall back to an ino-anchored filepath.
857 void CInode::make_path(filepath& fp, bool projected) const
859   const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
862     use_parent->make_path(fp, projected);
864     fp = filepath(ino());
// Produce the stray-directory dentry name for this inode: its inode
// number in lowercase hex.
868 void CInode::name_stray_dentry(string& dname)
871   snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
// Reserve the next version for a pending change.  With a projected
// parent dentry the dentry allocates it; base inodes just bump their
// projected version.  Also forces a backtrace update on legacy-format
// inodes (backtrace_version == 0).  Returns the projected version.
875 version_t CInode::pre_dirty()
878   CDentry* _cdentry = get_projected_parent_dn();
880     pv = _cdentry->pre_dirty(get_projected_version());
881     dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
884     pv = get_projected_version() + 1;
886   // force update backtrace for old format inode (see inode_t::decode)
887   if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
888     inode_t *pi = projected_nodes.back()->inode;
889     if (pi->backtrace_version == 0)
890       pi->update_backtrace(pv);
// Low-level dirty transition: set STATE_DIRTY and (re)queue this inode
// on the given log segment's dirty list.
895 void CInode::_mark_dirty(LogSegment *ls)
897   if (!state_test(STATE_DIRTY)) {
898     state_set(STATE_DIRTY);
903   // move myself to this segment's dirty list
905     ls->dirty_inodes.push_back(&item_dirty);
// Public dirty entry point: asserts auth and a strictly increasing
// version, applies _mark_dirty, and propagates dirtiness to the parent
// dentry.  Must be called even if already dirty so the parent dir's
// version bookkeeping stays correct.
908 void CInode::mark_dirty(version_t pv, LogSegment *ls) {
910   dout(10) << "mark_dirty " << *this << dendl;
913     NOTE: I may already be dirty, but this fn _still_ needs to be called so that
914     the directory is (perhaps newly) dirtied, and so that parent_dir_version is
918   // only auth can get dirty.  "dirty" async data in replicas is relative to
919   // filelock state, not the dirty flag.
922   // touch my private version
923   assert(inode.version < pv);
929     parent->mark_dirty(pv, ls);
// Clear STATE_DIRTY and remove this inode from its log segment's dirty
// list.  No-op when already clean.
933 void CInode::mark_clean()
935   dout(10) << " mark_clean " << *this << dendl;
936   if (state_test(STATE_DIRTY)) {
937     state_clear(STATE_DIRTY);
940     // remove myself from ls dirty list
941     item_dirty.remove_myself();
948 // (currently for root inode only)
// Completion for CInode::store(): forwards the result, the stored
// version, and the caller's continuation to _stored().
950 struct C_IO_Inode_Stored : public CInodeIOContext {
953   C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
954   void finish(int r) override {
955     in->_stored(r, version, fin);
// Compose the RADOS object name for an inode/frag pair:
// "<ino-hex>.<frag-08x><suffix>" (suffix e.g. ".inode" or "").
959 object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
962   snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
// Persist this inode (used for base inodes such as root) to its
// ".inode" object in the metadata pool: purge stale snap data, encode
// with the cluster's feature bits, and issue an async objecter mutate
// whose completion runs _stored() on the MDS finisher.
966 void CInode::store(MDSInternalContextBase *fin)
968   dout(10) << "store " << get_version() << dendl;
972     purge_stale_snap_data(snaprealm->get_snaps());
976   string magic = CEPH_FS_ONDISK_MAGIC;
978   encode_store(bl, mdcache->mds->mdsmap->get_up_features());
985   object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
986   object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
989     new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
990                      mdcache->mds->finisher);
991   mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
992                                  ceph::real_clock::now(), 0,
// Completion of store(): on error, log to the cluster log and escalate
// through the MDS write-error handler; on success, mark clean if no
// newer projected version has appeared since the write was issued.
996 void CInode::_stored(int r, version_t v, Context *fin)
999     dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
1000     mdcache->mds->clog->error() << "failed to store inode " << ino()
1001                                 << " object: " << cpp_strerror(r);
1002     mdcache->mds->handle_write_error(r);
1007   dout(10) << "_stored " << v << " on " << *this << dendl;
1008   if (v == get_projected_version())
// Flush everything dirty about this inode: backtrace, inode object, and
// the parent dir commit, gathered into one completion `fin`.  Requires
// auth + auth-pinnable.
1014 void CInode::flush(MDSInternalContextBase *fin)
1016   dout(10) << "flush " << *this << dendl;
1017   assert(is_auth() && can_auth_pin());
1019   MDSGatherBuilder gather(g_ceph_context);
1021   if (is_dirty_parent()) {
1022     store_backtrace(gather.new_sub());
1026     store(gather.new_sub());
1028     parent->dir->commit(0, gather.new_sub());
1032   if (gather.has_subs()) {
1033     gather.set_finisher(fin);
// Completion for CInode::fetch(): carries both read buffers (old xattr
// format and new .inode object) into _fetched().  The per-read return
// code is deliberately ignored — one of the two sources is expected to
// be ENOENT.
1040 struct C_IO_Inode_Fetched : public CInodeIOContext {
1043   C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
1044   void finish(int r) override {
1045     // Ignore 'r', because we fetch from two places, so r is usually ENOENT
1046     in->_fetched(bl, bl2, fin);
// Read this inode back from the metadata pool, racing both on-disk
// layouts: the legacy "inode" xattr on the dirfrag object and the
// current ".inode" object.  Results are gathered into _fetched().
1050 void CInode::fetch(MDSInternalContextBase *fin)
1052   dout(10) << "fetch" << dendl;
1054   C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
1055   C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
1057   object_t oid = CInode::get_object_name(ino(), frag_t(), "");
1058   object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1060   // Old on-disk format: inode stored in xattr of a dirfrag
1062   rd.getxattr("inode", &c->bl, NULL);
1063   mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, (bufferlist*)NULL, 0, gather.new_sub());
1065   // Current on-disk format: inode stored in a .inode object
1066   object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode");
1067   mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub());
// Completion of fetch(): pick whichever buffer has data (new-format bl2
// preferred per the branch order — confirm in full source), validate the
// on-disk magic, decode the inode, and complete `fin`.  Fails with
// -ENOENT when both reads were empty and -EINVAL on bad magic or a
// decode exception.
1072 void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
1074   dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
1075   bufferlist::iterator p;
1078   } else if (bl.length()) {
1081     derr << "No data while reading inode " << ino() << dendl;
1082     fin->complete(-ENOENT);
1090   dout(10) << " magic is '" << magic << "' (expecting '"
1091            << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
1092   if (magic != CEPH_FS_ONDISK_MAGIC) {
1093     dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
1095     fin->complete(-EINVAL);
1098     dout(10) << "_fetched " << *this << dendl;
1101   } catch (buffer::error &err) {
1102     derr << "Corrupt inode " << ino() << ": " << err << dendl;
1103     fin->complete(-EINVAL);
// Populate `bt` with this inode's backtrace: the chain of (dirino,
// dname, version) ancestors up the parent-dentry path, plus the set of
// old data pools (excluding the current pool to avoid self-reference).
1108 void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
1111   bt.ancestors.clear();
1115   CDentry *pdn = get_parent_dn();
1117     CInode *diri = pdn->get_dir()->get_inode();
1118     bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
1120     pdn = in->get_parent_dn();
1122   for (compact_set<int64_t>::iterator i = inode.old_pools.begin();
1123        i != inode.old_pools.end();
1125     // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
1127     bt.old_pools.insert(*i);
// Completion for store_backtrace(): routes result + backtrace version
// into _stored_backtrace().
1131 struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
1134   C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
1135   void finish(int r) override {
1136     in->_stored_backtrace(r, version, fin);
// Write this inode's backtrace ("parent" xattr, plus "layout" for
// layout-bearing inodes) to its backtrace pool.  When STATE_DIRTYPOOL
// is set, the same backtrace is also rewritten into every old pool so
// stale readers are redirected to the new pool; all writes are gathered
// into one completion (_stored_backtrace).
1140 void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
1142   dout(10) << "store_backtrace on " << *this << dendl;
1143   assert(is_dirty_parent());
1146     op_prio = CEPH_MSG_PRIO_DEFAULT;
1150   const int64_t pool = get_backtrace_pool();
1151   inode_backtrace_t bt;
1152   build_backtrace(pool, bt);
1153   bufferlist parent_bl;
1154   ::encode(bt, parent_bl);
1157   op.priority = op_prio;
1159   op.setxattr("parent", parent_bl);
1161   bufferlist layout_bl;
1162   ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
1163   op.setxattr("layout", layout_bl);
1166   object_t oid = get_object_name(ino(), frag_t(), "");
1167   object_locator_t oloc(pool);
1168   Context *fin2 = new C_OnFinisher(
1169     new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
1170     mdcache->mds->finisher);
// Fast path: no old pools to update — single mutate completes fin2.
1172   if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
1173     dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
1174     mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1175                                    ceph::real_clock::now(),
1180   C_GatherBuilder gather(g_ceph_context, fin2);
1181   mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1182                                  ceph::real_clock::now(),
1183                                  0, gather.new_sub());
1185   // In the case where DIRTYPOOL is set, we update all old pools backtraces
1186   // such that anyone reading them will see the new pool ID in
1187   // inode_backtrace_t::pool and go read everything else from there.
1188   for (compact_set<int64_t>::iterator p = inode.old_pools.begin();
1189        p != inode.old_pools.end();
1194     dout(20) << __func__ << ": updating old pool " << *p << dendl;
1197     op.priority = op_prio;
1199     op.setxattr("parent", parent_bl);
1201     object_locator_t oloc(*p);
1202     mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1203                                    ceph::real_clock::now(),
1204                                    0, gather.new_sub());
// Completion of store_backtrace().  A -ENOENT caused by the target pool
// having been deleted is downgraded to success (the backtrace can never
// be written; don't block e.g. file deletion).  Real errors go to the
// cluster log and the MDS write-error handler.  On success, the dirty-
// parent flag is cleared iff no newer backtrace version exists.
1209 void CInode::_stored_backtrace(int r, version_t v, Context *fin)
1212     const int64_t pool = get_backtrace_pool();
1213     bool exists = mdcache->mds->objecter->with_osdmap(
1214       [pool](const OSDMap &osd_map) {
1215         return osd_map.have_pg_pool(pool);
1218     // This ENOENT is because the pool doesn't exist (the user deleted it
1219     // out from under us), so the backtrace can never be written, so pretend
1220     // to succeed so that the user can proceed to e.g. delete the file.
1222       dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
1223                  "beneath us!" << dendl;
1229     dout(1) << "store backtrace error " << r << " v " << v << dendl;
1230     mdcache->mds->clog->error() << "failed to store backtrace on ino "
1231                                 << ino() << " object"
1232                                 << ", pool " << get_backtrace_pool()
1234     mdcache->mds->handle_write_error(r);
1240   dout(10) << "_stored_backtrace v " << v << dendl;
1243   if (v == inode.backtrace_version)
1244     clear_dirty_parent();
// Async-read this inode's backtrace from its backtrace pool into
// *backtrace; `fin` fires on completion.
1249 void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
1251   mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
// Mark the backtrace ("parent" xattr) dirty, pinning the inode and
// queueing it on the segment's dirty-parent list; `dirty_pool`
// additionally sets STATE_DIRTYPOOL (layout pool changed).
1254 void CInode::_mark_dirty_parent(LogSegment *ls, bool dirty_pool)
1256   if (!state_test(STATE_DIRTYPARENT)) {
1257     dout(10) << "mark_dirty_parent" << dendl;
1258     state_set(STATE_DIRTYPARENT);
1259     get(PIN_DIRTYPARENT);
1263     state_set(STATE_DIRTYPOOL);
1265     ls->dirty_parent_inodes.push_back(&item_dirty_parent);
// Undo _mark_dirty_parent(): clear both DIRTYPARENT and DIRTYPOOL,
// drop the pin, and dequeue from the dirty-parent list.
1268 void CInode::clear_dirty_parent()
1270   if (state_test(STATE_DIRTYPARENT)) {
1271     dout(10) << "clear_dirty_parent" << dendl;
1272     state_clear(STATE_DIRTYPARENT);
1273     state_clear(STATE_DIRTYPOOL);
1274     put(PIN_DIRTYPARENT);
1275     item_dirty_parent.remove_myself();
// Verify a directory inode's on-disk backtrace against its in-memory
// parent linkage (first ancestor's dname/dirino).  Skipped for base
// inodes, already-dirty backtraces, and non-auth replicas.  A mismatch
// is logged to the cluster log, optionally aborts (mds_verify_backtrace
// > 1), and re-marks the backtrace dirty so it gets rewritten.
1279 void CInode::verify_diri_backtrace(bufferlist &bl, int err)
1281   if (is_base() || is_dirty_parent() || !is_auth())
1284   dout(10) << "verify_diri_backtrace" << dendl;
1287     inode_backtrace_t backtrace;
1288     ::decode(backtrace, bl);
1289     CDentry *pdn = get_parent_dn();
1290     if (backtrace.ancestors.empty() ||
1291         backtrace.ancestors[0].dname != pdn->name ||
1292         backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
1297     MDSRank *mds = mdcache->mds;
1298     mds->clog->error() << "bad backtrace on directory inode " << ino();
1299     assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
1301     _mark_dirty_parent(mds->mdlog->get_current_segment(), false);
1302     mds->mdlog->flush();
1306 // ------------------
// Serialize the InodeStore payload without version framing: inode,
// symlink, fragtree, xattrs, snap blob (empty bufferlist when absent),
// old inodes, oldest snap, damage flags.  Field order is the on-disk
// format — do not reorder.
1310 void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
1311                                  const bufferlist *snap_blob) const
1313   ::encode(inode, bl, features);
1315   ::encode(symlink, bl);
1316   ::encode(dirfragtree, bl);
1317   ::encode(xattrs, bl);
1319     ::encode(*snap_blob, bl);
1321     ::encode(bufferlist(), bl);
1322   ::encode(old_inodes, bl, features);
1323   ::encode(oldest_snap, bl);
1324   ::encode(damage_flags, bl);
// Versioned wrapper around encode_bare(): current struct version 6,
// compat version 4.
1327 void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
1328                             const bufferlist *snap_blob) const
1330   ENCODE_START(6, 4, bl);
1331   encode_bare(bl, features, snap_blob);
// Encode this CInode for storage: snapshot blob first, then the
// versioned InodeStore encoding.  NOTE(review): the `features` parameter
// is shadowed by mdsmap->get_up_features() in the call — visible here as
// written; confirm intent in full source.
1335 void CInode::encode_store(bufferlist& bl, uint64_t features)
1337   bufferlist snap_blob;
1338   encode_snap_blob(snap_blob);
1339   InodeStoreBase::encode(bl, mdcache->mds->mdsmap->get_up_features(),
// Deserialize the InodeStore payload written by encode_bare(), handling
// historical struct versions: v2 embedded an optional default file
// layout for dirs; v5+ added oldest_snap and damage_flags (consumed
// opportunistically since InodeStore is embedded without full framing).
1343 void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
1344                                  bufferlist& snap_blob, __u8 struct_v)
1346   ::decode(inode, bl);
1348   ::decode(symlink, bl);
1349   ::decode(dirfragtree, bl);
1350   ::decode(xattrs, bl);
1351   ::decode(snap_blob, bl);
1353   ::decode(old_inodes, bl);
1354   if (struct_v == 2 && inode.is_dir()) {
1355     bool default_layout_exists;
1356     ::decode(default_layout_exists, bl);
1357     if (default_layout_exists) {
1358       ::decode(struct_v, bl); // this was a default_file_layout
1359       ::decode(inode.layout, bl); // but we only care about the layout portion
1363   if (struct_v >= 5) {
1364     // InodeStore is embedded in dentries without proper versioning, so
1365     // we consume up to the end of the buffer
1367       ::decode(oldest_snap, bl);
1371       ::decode(damage_flags, bl);
1377 void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
1379 DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
1380 decode_bare(bl, snap_blob, struct_v);
1384 void CInode::decode_store(bufferlist::iterator& bl)
1386 bufferlist snap_blob;
1387 InodeStoreBase::decode(bl, snap_blob);
1388 decode_snap_blob(snap_blob);
1391 // ------------------
1394 void CInode::set_object_info(MDSCacheObjectInfo &info)
// Encode the inode state protected by lock 'type' for replication to
// peer MDSs.  'first' is always encoded, followed by a per-lock-type
// payload; for the scatterlocks (IDFT/IFILE/INEST) the encoder also
// includes per-dirfrag stats so the receiver can reconcile.
// NOTE(review): this excerpt is missing braces, break statements and the
// is_auth() branch guards -- confirm against the full source.
void CInode::encode_lock_state(int type, bufferlist& bl)
  ::encode(first, bl);
  case CEPH_LOCK_IAUTH:
    // ownership/permission metadata
    ::encode(inode.version, bl);
    ::encode(inode.ctime, bl);
    ::encode(inode.mode, bl);
    ::encode(inode.uid, bl);
    ::encode(inode.gid, bl);
  case CEPH_LOCK_ILINK:
    ::encode(inode.version, bl);
    ::encode(inode.ctime, bl);
    ::encode(inode.nlink, bl);
  case CEPH_LOCK_IDFT:
    ::encode(inode.version, bl);
    // treat flushing as dirty when rejoining cache
    bool dirty = dirfragtreelock.is_dirty_or_flushing();
    ::encode(dirty, bl);
    // encode the raw tree
    ::encode(dirfragtree, bl);
    // also specify which frags are mine
    set<frag_t> myfrags;
    for (list<CDir*>::iterator p = dfls.begin(); p != dfls.end(); ++p)
      if ((*p)->is_auth()) {
        frag_t fg = (*p)->get_frag();
    ::encode(myfrags, bl);
  case CEPH_LOCK_IFILE:
    // file metadata; size/layout fields only meaningful from the auth
    ::encode(inode.version, bl);
    ::encode(inode.ctime, bl);
    ::encode(inode.mtime, bl);
    ::encode(inode.atime, bl);
    ::encode(inode.time_warp_seq, bl);
    ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
    ::encode(inode.size, bl);
    ::encode(inode.truncate_seq, bl);
    ::encode(inode.truncate_size, bl);
    ::encode(inode.client_ranges, bl);
    ::encode(inode.inline_data, bl);
    // treat flushing as dirty when rejoining cache
    bool dirty = filelock.is_dirty_or_flushing();
    ::encode(dirty, bl);
    dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
    ::encode(inode.dirstat, bl);  // only meaningful if i am auth.
    // per-dirfrag [first, fragstat, accounted_fragstat] for frags either
    // side is auth for
    for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
         p != dirfrags.end();
      frag_t fg = p->first;
      CDir *dir = p->second;
      if (is_auth() || dir->is_auth()) {
        fnode_t *pf = dir->get_projected_fnode();
        dout(15) << fg << " " << *dir << dendl;
        dout(20) << fg << " fragstat " << pf->fragstat << dendl;
        dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
        ::encode(dir->first, tmp);
        ::encode(pf->fragstat, tmp);
        ::encode(pf->accounted_fragstat, tmp);
    bl.claim_append(tmp);
  case CEPH_LOCK_INEST:
    ::encode(inode.version, bl);
    // treat flushing as dirty when rejoining cache
    bool dirty = nestlock.is_dirty_or_flushing();
    ::encode(dirty, bl);
    dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
    ::encode(inode.rstat, bl);  // only meaningful if i am auth.
    // per-dirfrag [first, rstat, accounted_rstat, dirty_old_rstat]
    for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
         p != dirfrags.end();
      frag_t fg = p->first;
      CDir *dir = p->second;
      if (is_auth() || dir->is_auth()) {
        fnode_t *pf = dir->get_projected_fnode();
        dout(10) << fg << " " << *dir << dendl;
        dout(10) << fg << " " << pf->rstat << dendl;
        // NOTE(review): the next line prints pf->rstat again; it likely
        // was meant to print pf->accounted_rstat -- confirm upstream.
        dout(10) << fg << " " << pf->rstat << dendl;
        dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
        ::encode(dir->first, tmp);
        ::encode(pf->rstat, tmp);
        ::encode(pf->accounted_rstat, tmp);
        ::encode(dir->dirty_old_rstat, tmp);
    bl.claim_append(tmp);
  case CEPH_LOCK_IXATTR:
    ::encode(inode.version, bl);
    ::encode(inode.ctime, bl);
    ::encode(xattrs, bl);
  case CEPH_LOCK_ISNAP:
    ::encode(inode.version, bl);
    ::encode(inode.ctime, bl);
  case CEPH_LOCK_IFLOCK:
    // advisory file locks
    ::encode(inode.version, bl);
    _encode_file_locks(bl);
  case CEPH_LOCK_IPOLICY:
    // layout/quota/export-pin policy, directories only
    if (inode.is_dir()) {
      ::encode(inode.version, bl);
      ::encode(inode.ctime, bl);
      ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
      ::encode(inode.quota, bl);
      ::encode(inode.export_pin, bl);
/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */

// Apply replicated lock state received from a peer (the counterpart of
// encode_lock_state()).  Replicas take the sender's values; the auth
// instead uses the per-frag payloads to decide whether the corresponding
// scatterlock must be marked dirty.
// NOTE(review): this excerpt is missing braces, breaks and several
// declarations (e.g. the utime_t 'tm' decode, loop headers, is_auth()
// branch guards) -- confirm against the full source before editing.
void CInode::decode_lock_state(int type, bufferlist& bl)
  bufferlist::iterator p = bl.begin();
  ::decode(newfirst, p);
  if (!is_auth() && newfirst != first) {
    // replica: the [first,last] range may only shrink from the front
    dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
    assert(newfirst > first);
    if (!is_multiversion() && parent) {
      assert(parent->first == first);
      parent->first = newfirst;
  case CEPH_LOCK_IAUTH:
    ::decode(inode.version, p);
    // 'tm' is the peer's ctime (decoded on a line not visible here);
    // ctime only moves forward
    if (inode.ctime < tm) inode.ctime = tm;
    ::decode(inode.mode, p);
    ::decode(inode.uid, p);
    ::decode(inode.gid, p);
  case CEPH_LOCK_ILINK:
    ::decode(inode.version, p);
    if (inode.ctime < tm) inode.ctime = tm;
    ::decode(inode.nlink, p);
  case CEPH_LOCK_IDFT:
    // auth path: remember that some replica had dirty state
    ::decode(replica_dirty, p);
    if (replica_dirty) {
      dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
      dirfragtreelock.mark_dirty();  // ok bc we're auth and caller will handle
    ::decode(inode.version, p);
    set<frag_t> authfrags;
    ::decode(authfrags, p);
    // auth. believe replica's auth frags only.
    for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
      if (!dirfragtree.is_leaf(*p)) {
        dout(10) << " forcing frag " << *p << " to leaf (split|merge)" << dendl;
        dirfragtree.force_to_leaf(g_ceph_context, *p);
        dirfragtreelock.mark_dirty();  // ok bc we're auth and caller will handle
    // replica. take the new tree, BUT make sure any open
    // dirfrags remain leaves (they may have split _after_ this
    // dft was scattered, or we may still be be waiting on the
    // notify from the auth)
    dirfragtree.swap(temp);
    for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
         p != dirfrags.end();
      if (!dirfragtree.is_leaf(p->first)) {
        dout(10) << " forcing open dirfrag " << p->first << " to leaf (racing with split|merge)" << dendl;
        dirfragtree.force_to_leaf(g_ceph_context, p->first);
      if (p->second->is_auth())
        p->second->state_clear(CDir::STATE_DIRTYDFT);
    if (g_conf->mds_debug_frag)
  case CEPH_LOCK_IFILE:
    ::decode(inode.version, p);
    if (inode.ctime < tm) inode.ctime = tm;
    ::decode(inode.mtime, p);
    ::decode(inode.atime, p);
    ::decode(inode.time_warp_seq, p);
    // size/layout fields: presumably only decoded on the replica side --
    // the guard is not visible in this excerpt
    ::decode(inode.layout, p);
    ::decode(inode.size, p);
    ::decode(inode.truncate_seq, p);
    ::decode(inode.truncate_size, p);
    ::decode(inode.client_ranges, p);
    ::decode(inode.inline_data, p);
    ::decode(replica_dirty, p);
    if (replica_dirty) {
      dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
      filelock.mark_dirty();  // ok bc we're auth and caller will handle
    frag_info_t dirstat;
    ::decode(dirstat, p);
    dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
    inode.dirstat = dirstat;  // take inode summation if replica
    dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
    // per-frag payload: [first, fragstat, accounted_fragstat]
    frag_info_t fragstat;
    frag_info_t accounted_fragstat;
    ::decode(fgfirst, p);
    ::decode(fragstat, p);
    ::decode(accounted_fragstat, p);
    dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
    dout(10) << fg << " fragstat " << fragstat << dendl;
    dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
    CDir *dir = get_dirfrag(fg);
    assert(dir); // i am auth; i had better have this dir open
    dout(10) << fg << " first " << dir->first << " -> " << fgfirst
             << " on " << *dir << dendl;
    dir->first = fgfirst;
    dir->fnode.fragstat = fragstat;
    dir->fnode.accounted_fragstat = accounted_fragstat;
    // NOTE(review): redundant -- dir->first was already assigned above
    dir->first = fgfirst;
    if (!(fragstat == accounted_fragstat)) {
      dout(10) << fg << " setting filelock updated flag" << dendl;
      filelock.mark_dirty();  // ok bc we're auth and caller will handle
    if (dir && dir->is_auth()) {
      // replica side: reconcile the frag's accounted stat version
      dout(10) << fg << " first " << dir->first << " -> " << fgfirst
               << " on " << *dir << dendl;
      dir->first = fgfirst;
      fnode_t *pf = dir->get_projected_fnode();
      finish_scatter_update(&filelock, dir,
                            inode.dirstat.version, pf->accounted_fragstat.version);
  case CEPH_LOCK_INEST:
    ::decode(replica_dirty, p);
    if (replica_dirty) {
      dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
      nestlock.mark_dirty();  // ok bc we're auth and caller will handle
    ::decode(inode.version, p);
    dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
    inode.rstat = rstat;  // take inode summation if replica
    // per-frag payload: [first, rstat, accounted_rstat, dirty_old_rstat]
    nest_info_t accounted_rstat;
    compact_map<snapid_t,old_rstat_t> dirty_old_rstat;
    ::decode(fgfirst, p);
    ::decode(accounted_rstat, p);
    ::decode(dirty_old_rstat, p);
    dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
    dout(10) << fg << " rstat " << rstat << dendl;
    dout(10) << fg << " accounted_rstat " << accounted_rstat << dendl;
    dout(10) << fg << " dirty_old_rstat " << dirty_old_rstat << dendl;
    CDir *dir = get_dirfrag(fg);
    assert(dir); // i am auth; i had better have this dir open
    dout(10) << fg << " first " << dir->first << " -> " << fgfirst
             << " on " << *dir << dendl;
    dir->first = fgfirst;
    dir->fnode.rstat = rstat;
    dir->fnode.accounted_rstat = accounted_rstat;
    dir->dirty_old_rstat.swap(dirty_old_rstat);
    if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
      dout(10) << fg << " setting nestlock updated flag" << dendl;
      nestlock.mark_dirty();  // ok bc we're auth and caller will handle
    if (dir && dir->is_auth()) {
      dout(10) << fg << " first " << dir->first << " -> " << fgfirst
               << " on " << *dir << dendl;
      dir->first = fgfirst;
      fnode_t *pf = dir->get_projected_fnode();
      finish_scatter_update(&nestlock, dir,
                            inode.rstat.version, pf->accounted_rstat.version);
  case CEPH_LOCK_IXATTR:
    ::decode(inode.version, p);
    if (inode.ctime < tm) inode.ctime = tm;
    ::decode(xattrs, p);
  case CEPH_LOCK_ISNAP:
    ::decode(inode.version, p);
    if (inode.ctime < tm) inode.ctime = tm;
    // remember the old seq so we can detect a snaprealm change below
    seq = snaprealm->srnode.seq;
    if (snaprealm && snaprealm->srnode.seq != seq)
      mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
  case CEPH_LOCK_IFLOCK:
    ::decode(inode.version, p);
    _decode_file_locks(p);
  case CEPH_LOCK_IPOLICY:
    if (inode.is_dir()) {
      ::decode(inode.version, p);
      if (inode.ctime < tm) inode.ctime = tm;
      ::decode(inode.layout, p);
      ::decode(inode.quota, p);
      // react if our export pin changed
      mds_rank_t old_pin = inode.export_pin;
      ::decode(inode.export_pin, p);
      maybe_export_pin(old_pin != inode.export_pin);
// True if any of the three scatterlocks (file, nest, dirfragtree) has
// dirty or in-flight (flushing) state.
// NOTE(review): the 'return' keyword line is not visible in this excerpt.
bool CInode::is_dirty_scattered()
  filelock.is_dirty_or_flushing() ||
  nestlock.is_dirty_or_flushing() ||
  dirfragtreelock.is_dirty_or_flushing();

// Drop the dirty flag on all three scatterlocks unconditionally.
void CInode::clear_scatter_dirty()
  filelock.remove_dirty();
  nestlock.remove_dirty();
  dirfragtreelock.remove_dirty();

// Remove this inode from the log segment's dirty-dirfrag list for the
// given scatterlock type.
void CInode::clear_dirty_scattered(int type)
  dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
  case CEPH_LOCK_IFILE:
    item_dirty_dirfrag_dir.remove_myself();
  case CEPH_LOCK_INEST:
    item_dirty_dirfrag_nest.remove_myself();
  case CEPH_LOCK_IDFT:
    item_dirty_dirfrag_dirfragtree.remove_myself();
/*
 * when we initially scatter a lock, we need to check if any of the dirfrags
 * have out of date accounted_rstat/fragstat. if so, mark the lock stale.
 */
/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */

// On initial scatter of 'lock', walk our auth dirfrags and reconcile any
// whose accounted stats lag the projected inode's stat version.
// NOTE(review): braces/breaks are missing from this excerpt.
void CInode::start_scatter(ScatterLock *lock)
  dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
  inode_t *pi = get_projected_inode();
  for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
       p != dirfrags.end();
    frag_t fg = p->first;
    CDir *dir = p->second;
    fnode_t *pf = dir->get_projected_fnode();
    dout(20) << fg << " " << *dir << dendl;
    if (!dir->is_auth())
    switch (lock->get_type()) {
    case CEPH_LOCK_IFILE:
      finish_scatter_update(lock, dir, pi->dirstat.version, pf->accounted_fragstat.version);
    case CEPH_LOCK_INEST:
      finish_scatter_update(lock, dir, pi->rstat.version, pf->accounted_rstat.version);
    case CEPH_LOCK_IDFT:
      dir->state_clear(CDir::STATE_DIRTYDFT);

// Journal-completion context for finish_scatter_update(): finishes the
// frag update once the EUpdate has been logged.
// NOTE(review): member declarations are not visible in this excerpt.
class C_Inode_FragUpdate : public MDSLogContextBase {
  MDSRank *get_mds() override {return in->mdcache->mds;}
  void finish(int r) override {
    in->_finish_frag_update(dir, mut);
  C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}

// Bring 'dir's accounted scatter stat up to the inode's stat version,
// journaling the update.  Frozen or unfetched frags are skipped (lock is
// marked stale instead); up-to-date frags are a no-op.
void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
                                   version_t inode_version, version_t dir_accounted_version)
  frag_t fg = dir->get_frag();
  assert(dir->is_auth());
  if (dir->is_frozen()) {
    dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
  } else if (dir->get_version() == 0) {
    dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
    if (dir_accounted_version != inode_version) {
      dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
      MDLog *mdlog = mdcache->mds->mdlog;
      MutationRef mut(new MutationImpl());
      mut->ls = mdlog->get_current_segment();
      inode_t *pi = get_projected_inode();
      fnode_t *pf = dir->project_fnode();
      const char *ename = 0;
      switch (lock->get_type()) {
      case CEPH_LOCK_IFILE:
        pf->fragstat.version = pi->dirstat.version;
        pf->accounted_fragstat = pf->fragstat;
        ename = "lock ifile accounted scatter stat update";
      case CEPH_LOCK_INEST:
        pf->rstat.version = pi->rstat.version;
        pf->accounted_rstat = pf->rstat;
        ename = "lock inest accounted scatter stat update";
        if (!is_auth() && lock->get_state() == LOCK_MIX) {
          dout(10) << "finish_scatter_update try to assimilate dirty rstat on "
          dir->assimilate_dirty_rstat_inodes();
      pf->version = dir->pre_dirty();
      mut->add_projected_fnode(dir);
      EUpdate *le = new EUpdate(mdlog, ename);
      mdlog->start_entry(le);
      le->metablob.add_dir_context(dir);
      le->metablob.add_dir(dir, true);
      assert(!dir->is_frozen());
      if (lock->get_type() == CEPH_LOCK_INEST &&
          !is_auth() && lock->get_state() == LOCK_MIX) {
        dout(10) << "finish_scatter_update finish assimilating dirty rstat on "
        dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
        if (!(pf->rstat == pf->accounted_rstat)) {
          // assimilation left residue: force the nestlock dirty so the
          // auth eventually re-gathers
          if (mut->wrlocks.count(&nestlock) == 0) {
            mdcache->mds->locker->wrlock_force(&nestlock, mut);
          mdcache->mds->locker->mark_updated_scatterlock(&nestlock);
          mut->ls->dirty_dirfrag_nest.push_back(&item_dirty_dirfrag_nest);
      mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
      dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
               << " scatter stat unchanged at v" << dir_accounted_version << dendl;

// Journal-completion callback: drop locks taken for the frag update.
void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
  dout(10) << "_finish_frag_update on " << *dir << dendl;
  mdcache->mds->locker->drop_locks(mut.get());
/*
 * when we gather a lock, we need to assimilate dirfrag changes into the inode
 * state.  it's possible we can't update the dirfrag accounted_rstat/fragstat
 * because the frag is auth and frozen, or that the replica couldn't for the same
 * reason.  hopefully it will get updated the next time the lock cycles.
 *
 * we have two dimensions of behavior:
 *  - we may be (auth and !frozen), and able to update, or not.
 *  - the frag may be stale, or not.
 *
 * if the frag is non-stale, we want to assimilate the diff into the
 * inode, regardless of whether it's auth or updateable.
 *
 * if we update the frag, we want to set accounted_fragstat = frag,
 * both if we took the diff or it was stale and we are making it
 */
/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */

// Gather per-dirfrag stat deltas into the inode when a scatterlock is
// gathered.  Cross-checks the summed dirfrag stats against the inode's and
// either repairs (STATE_REPAIRSTATS) or logs/asserts on mismatch.
// NOTE(review): braces, breaks and some declarations (touched_mtime guard,
// nest_info_t rstat, list<frag_t> ls) are missing from this excerpt.
void CInode::finish_scatter_gather_update(int type)
  LogChannelRef clog = mdcache->mds->clog;
  dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
  case CEPH_LOCK_IFILE:
    // accumulate fragstat deltas into inode.dirstat
    fragtree_t tmpdft = dirfragtree;
    struct frag_info_t dirstat;
    bool dirstat_valid = true;
    inode_t *pi = get_projected_inode();
    bool touched_mtime = false, touched_chattr = false;
    dout(20) << "  orig dirstat " << pi->dirstat << dendl;
    pi->dirstat.version++;
    for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
         p != dirfrags.end();
      frag_t fg = p->first;
      CDir *dir = p->second;
      dout(20) << fg << " " << *dir << dendl;
      if (dir->get_version() != 0) {
        update = dir->is_auth() && !dir->is_frozen();
        // unfetched frag: the sum over frags can't be trusted
        dirstat_valid = false;
      fnode_t *pf = dir->get_projected_fnode();
      pf = dir->project_fnode();
      if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
        dout(20) << fg << "           fragstat " << pf->fragstat << dendl;
        dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
        pi->dirstat.add_delta(pf->fragstat, pf->accounted_fragstat, &touched_mtime, &touched_chattr);
        dout(20) << fg << " skipping STALE accounted_fragstat " << pf->accounted_fragstat << dendl;
      if (pf->fragstat.nfiles < 0 ||
          pf->fragstat.nsubdirs < 0) {
        clog->error() << "bad/negative dir size on "
            << dir->dirfrag() << " " << pf->fragstat;
        assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
        // clamp rather than propagate the negative counts
        if (pf->fragstat.nfiles < 0)
          pf->fragstat.nfiles = 0;
        if (pf->fragstat.nsubdirs < 0)
          pf->fragstat.nsubdirs = 0;
      pf->accounted_fragstat = pf->fragstat;
      pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
      dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
      tmpdft.force_to_leaf(g_ceph_context, fg);
      dirstat.add(pf->fragstat);
    // presumably guarded by touched_mtime/touched_chattr in the full source
    pi->mtime = pi->ctime = pi->dirstat.mtime;
    pi->change_attr = pi->dirstat.change_attr;
    dout(20) << " final dirstat " << pi->dirstat << dendl;
    if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
      // ensure every leaf under the tree is actually open before trusting
      // the sum
      tmpdft.get_leaves_under(frag_t(), ls);
      for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
        if (!dirfrags.count(*p)) {
          dirstat_valid = false;
      if (dirstat_valid) {
        if (state_test(CInode::STATE_REPAIRSTATS)) {
          dout(20) << " dirstat mismatch, fixing" << dendl;
          clog->error() << "unmatched fragstat on " << ino() << ", inode has "
              << pi->dirstat << ", dirfrags have " << dirstat;
          assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
        // trust the dirfrags for now
        version_t v = pi->dirstat.version;
        if (pi->dirstat.mtime > dirstat.mtime)
          dirstat.mtime = pi->dirstat.mtime;
        if (pi->dirstat.change_attr > dirstat.change_attr)
          dirstat.change_attr = pi->dirstat.change_attr;
        pi->dirstat = dirstat;
        pi->dirstat.version = v;
    if (pi->dirstat.nfiles < 0 || pi->dirstat.nsubdirs < 0)
      make_path_string(path);
      clog->error() << "Inconsistent statistics detected: fragstat on inode "
          << ino() << " (" << path << "), inode has " << pi->dirstat;
      assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
      if (pi->dirstat.nfiles < 0)
        pi->dirstat.nfiles = 0;
      if (pi->dirstat.nsubdirs < 0)
        pi->dirstat.nsubdirs = 0;
  case CEPH_LOCK_INEST:
    // accumulate rstat deltas into inode.rstat; parallels the IFILE case
    fragtree_t tmpdft = dirfragtree;
    bool rstat_valid = true;
    inode_t *pi = get_projected_inode();
    dout(20) << "  orig rstat " << pi->rstat << dendl;
    pi->rstat.version++;
    for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
         p != dirfrags.end();
      frag_t fg = p->first;
      CDir *dir = p->second;
      dout(20) << fg << " " << *dir << dendl;
      if (dir->get_version() != 0) {
        update = dir->is_auth() && !dir->is_frozen();
        rstat_valid = false;
      fnode_t *pf = dir->get_projected_fnode();
      pf = dir->project_fnode();
      if (pf->accounted_rstat.version == pi->rstat.version-1) {
        // only pull this frag's dirty rstat inodes into the frag if
        // the frag is non-stale and updateable.  if it's stale,
        // that info will just get thrown out!
        dir->assimilate_dirty_rstat_inodes();
        dout(20) << fg << "           rstat " << pf->rstat << dendl;
        dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
        dout(20) << fg << " dirty_old_rstat " << dir->dirty_old_rstat << dendl;
        mdcache->project_rstat_frag_to_inode(pf->rstat, pf->accounted_rstat,
                                             dir->first, CEPH_NOSNAP, this, true);
        for (compact_map<snapid_t,old_rstat_t>::iterator q = dir->dirty_old_rstat.begin();
             q != dir->dirty_old_rstat.end();
          mdcache->project_rstat_frag_to_inode(q->second.rstat, q->second.accounted_rstat,
                                               q->second.first, q->first, this, true);
        if (update)  // dir contents not valid if frozen or non-auth
          dir->check_rstats();
        dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
      pf->accounted_rstat = pf->rstat;
      dir->dirty_old_rstat.clear();
      pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
      dir->check_rstats();
      dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
      tmpdft.force_to_leaf(g_ceph_context, fg);
      rstat.add(pf->rstat);
    dout(20) << " final rstat " << pi->rstat << dendl;
    if (rstat_valid && !rstat.same_sums(pi->rstat)) {
      tmpdft.get_leaves_under(frag_t(), ls);
      for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
        if (!dirfrags.count(*p)) {
          rstat_valid = false;
      if (state_test(CInode::STATE_REPAIRSTATS)) {
        dout(20) << " rstat mismatch, fixing" << dendl;
        clog->error() << "inconsistent rstat on inode " << ino()
            << ", inode has " << pi->rstat
            << ", directory fragments have " << rstat;
        assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
        // trust the dirfrag for now
        version_t v = pi->rstat.version;
        if (pi->rstat.rctime > rstat.rctime)
          rstat.rctime = pi->rstat.rctime;
        pi->rstat.version = v;
    // rsize feeds quota enforcement; tell clients about the new totals
    mdcache->broadcast_quota_to_client(this);
  case CEPH_LOCK_IDFT:

// Journal the updated accounted_ stats on all updateable auth dirfrags
// after a scatter-gather, adding them to 'metablob'.  IDFT has no
// per-frag accounting, so it's skipped.
void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
  dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
  for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
       p != dirfrags.end();
    CDir *dir = p->second;
    if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
    if (type == CEPH_LOCK_IDFT)
      continue;  // nothing to do.
    dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
    assert(dir->is_projected());
    fnode_t *pf = dir->get_projected_fnode();
    pf->version = dir->pre_dirty();
    mut->add_projected_fnode(dir);
    metablob->add_dir(dir, true);
    if (type == CEPH_LOCK_INEST)
      dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
// Frozen if this inode itself is frozen, or any ancestor dirfrag is.
// NOTE(review): the trailing 'return false;' lines of these predicates
// are not visible in this excerpt.
bool CInode::is_frozen() const
  if (is_frozen_inode()) return true;
  if (parent && parent->dir->is_frozen()) return true;

// Frozen-dir state is inherited from the parent dirfrag only.
bool CInode::is_frozen_dir() const
  if (parent && parent->dir->is_frozen_dir()) return true;

// Freezing if this inode is freezing, or any ancestor dirfrag is.
bool CInode::is_freezing() const
  if (is_freezing_inode()) return true;
  if (parent && parent->dir->is_freezing()) return true;
// Queue a context to run once dirfrag 'fg' becomes available.
// NOTE(review): the pin/unpin bookkeeping on the empty<->non-empty
// transitions is not visible in this excerpt.
void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
  if (waiting_on_dir.empty())
  waiting_on_dir[fg].push_back(c);
  dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;

// Move all waiters for dirfrag 'fg' into 'ls' (they are not run here).
void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
  if (waiting_on_dir.empty())
  compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.find(fg);
  if (p != waiting_on_dir.end()) {
    dout(10) << "take_dir_waiting frag " << fg << " on " << *this << dendl;
    ls.splice(ls.end(), p->second);
    waiting_on_dir.erase(p);
  if (waiting_on_dir.empty())

// Register a generic waiter.  SINGLEAUTH/UNFREEZE waits are forwarded up
// the tree unless this inode is itself the ambiguous/freezing/frozen
// object; everything else is queued locally.
void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
  dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
           << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
           << " !frozen " << !is_frozen_inode()
           << " !freezing " << !is_freezing_inode()
  // wait on the directory?
  //  make sure its not the inode that is explicitly ambiguous|freezing|frozen
  if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
      ((tag & WAIT_UNFREEZE) &&
       !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
    dout(15) << "passing waiter up tree" << dendl;
    parent->dir->add_waiter(tag, c);
  dout(15) << "taking waiter here" << dendl;
  MDSCacheObject::add_waiter(tag, c);

// Collect waiters matching 'mask' into 'ls'.  WAIT_DIR drains all
// per-dirfrag waiters; the rest is delegated to MDSCacheObject.
void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
  if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
    // take all dentry waiters
    while (!waiting_on_dir.empty()) {
      compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.begin();
      dout(10) << "take_waiting dirfrag " << p->first << " on " << *this << dendl;
      ls.splice(ls.end(), p->second);
      waiting_on_dir.erase(p);
  MDSCacheObject::take_waiting(mask, ls);
// Try to freeze this inode.  If auth_pins exceed the allowance we enter
// FREEZING and wait for them to drain (auth_unpin() completes the
// freeze); otherwise we go straight to FROZEN.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably false while waiting, true once frozen -- confirm.
bool CInode::freeze_inode(int auth_pin_allowance)
  assert(auth_pin_allowance > 0);  // otherwise we need to adjust parent's nested_auth_pins
  assert(auth_pins >= auth_pin_allowance);
  if (auth_pins > auth_pin_allowance) {
    dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
    auth_pin_freeze_allowance = auth_pin_allowance;
    state_set(STATE_FREEZING);
  dout(10) << "freeze_inode - frozen" << dendl;
  assert(auth_pins == auth_pin_allowance);
  if (!state_test(STATE_FROZEN)) {
    state_set(STATE_FROZEN);

// Leave FREEZING/FROZEN and collect the unfreeze waiters into 'finished'
// (caller queues them).
void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
  dout(10) << "unfreeze_inode" << dendl;
  if (state_test(STATE_FREEZING)) {
    state_clear(STATE_FREEZING);
  } else if (state_test(STATE_FROZEN)) {
    state_clear(STATE_FROZEN);
  take_waiting(WAIT_UNFREEZE, finished);

// Convenience overload: unfreeze and immediately queue the waiters.
void CInode::unfreeze_inode()
  list<MDSInternalContextBase*> finished;
  unfreeze_inode(finished);
  mdcache->mds->queue_waiters(finished);

// Pin auth-pinning closed while frozen (e.g. across an export).
void CInode::freeze_auth_pin()
  assert(state_test(CInode::STATE_FROZEN));
  state_set(CInode::STATE_FROZENAUTHPIN);

// Re-allow auth pins; if no freeze is otherwise pending, wake unfreeze
// waiters now.
void CInode::unfreeze_auth_pin()
  assert(state_test(CInode::STATE_FROZENAUTHPIN));
  state_clear(CInode::STATE_FROZENAUTHPIN);
  if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
    list<MDSInternalContextBase*> finished;
    take_waiting(WAIT_UNFREEZE, finished);
    mdcache->mds->queue_waiters(finished);

// Resolve ambiguous authority and collect SINGLEAUTH waiters.
void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
  assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
  state_clear(CInode::STATE_AMBIGUOUSAUTH);
  take_waiting(CInode::WAIT_SINGLEAUTH, finished);

// Convenience overload: resolve and immediately queue the waiters.
void CInode::clear_ambiguous_auth()
  list<MDSInternalContextBase*> finished;
  clear_ambiguous_auth(finished);
  mdcache->mds->queue_waiters(finished);
// Auth-pinnable only if we are auth and not (freezing|frozen|pin-frozen);
// the parent dirfrag must also allow it.
bool CInode::can_auth_pin() const {
  if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
  return parent->can_auth_pin();

// Take an auth pin on behalf of 'by'; propagates a nested pin up the
// parent chain.  NOTE(review): the auth_pins increment itself is on a
// line not visible in this excerpt.
void CInode::auth_pin(void *by)
#ifdef MDS_AUTHPIN_SET
  auth_pin_set.insert(by);
  dout(10) << "auth_pin by " << by << " on " << *this
           << " now " << auth_pins << "+" << nested_auth_pins
  parent->adjust_nested_auth_pins(1, 1, this);

// Release an auth pin; if we are FREEZING and the count has drained to
// the allowance recorded by freeze_inode(), complete the freeze and wake
// WAIT_FROZEN waiters.
void CInode::auth_unpin(void *by)
#ifdef MDS_AUTHPIN_SET
  assert(auth_pin_set.count(by));
  auth_pin_set.erase(auth_pin_set.find(by));
  dout(10) << "auth_unpin by " << by << " on " << *this
           << " now " << auth_pins << "+" << nested_auth_pins
  assert(auth_pins >= 0);
  parent->adjust_nested_auth_pins(-1, -1, by);
  if (is_freezing_inode() &&
      auth_pins == auth_pin_freeze_allowance) {
    dout(10) << "auth_unpin freezing!" << dendl;
    state_clear(STATE_FREEZING);
    state_set(STATE_FROZEN);
    finish_waiting(WAIT_FROZEN);

// Adjust the count of auth pins held by descendants; optionally verified
// against the dirfrags' own counts when mds_debug_auth_pins is set.
void CInode::adjust_nested_auth_pins(int a, void *by)
  nested_auth_pins += a;
  dout(35) << "adjust_nested_auth_pins by " << by
           << " change " << a << " yields "
           << auth_pins << "+" << nested_auth_pins << dendl;
  assert(nested_auth_pins >= 0);
  if (g_conf->mds_debug_auth_pins) {
    // cross-check: sum cumulative pins over non-subtree-root dirfrags
    for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
         p != dirfrags.end();
      CDir *dir = p->second;
      if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
    assert(s == nested_auth_pins);
  parent->adjust_nested_auth_pins(a, 0, by);
// Who is authoritative for this inode: explicit inode_auth if set,
// otherwise the (projected) parent dirfrag's authority, else undefined.
mds_authority_t CInode::authority() const
  if (inode_auth.first >= 0)
  return parent->dir->authority();
  // new items that are not yet linked in (in the committed plane) belong
  // to their first parent.
  if (!projected_parent.empty())
    return projected_parent.front()->dir->authority();
  return CDIR_AUTH_UNDEF;

// Oldest snapid relevant to this inode: min of the earliest old_inode's
// first and the recorded oldest_snap.
// NOTE(review): the declaration of 't' is not visible in this excerpt.
snapid_t CInode::get_oldest_snap()
  if (!old_inodes.empty())
    t = old_inodes.begin()->second.first;
  return MIN(t, oldest_snap);
// Copy-on-write the (projected) inode + xattrs into old_inodes[follows]
// so snapshot reads up to 'follows' see the pre-change state.
// 'cow_head' selects the head projection vs the previous one.
// NOTE(review): the assignments into 'old' (inode/xattrs/first) are on
// lines not visible in this excerpt.
old_inode_t& CInode::cow_old_inode(snapid_t follows, bool cow_head)
  assert(follows >= first);
  inode_t *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
  map<string,bufferptr> *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
  old_inode_t &old = old_inodes[follows];
  if (first < oldest_snap)
    oldest_snap = first;
  dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
  old.inode.trim_client_ranges(follows);
  if (g_conf->mds_snap_rstat &&
      !(old.inode.rstat == old.inode.accounted_rstat))
    dirty_old_rstats.insert(follows);
  dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
           << " to [" << old.first << "," << follows << "] on "

// Split the old_inode covering 'snap' so that [.., snap-1] and [snap, ..]
// become separate entries.
void CInode::split_old_inode(snapid_t snap)
  compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap);
  assert(p != old_inodes.end() && p->second.first < snap);
  old_inode_t &old = old_inodes[snap - 1];
  p->second.first = snap;
  dout(10) << "split_old_inode " << "[" << old.first << "," << p->first
           << "] to [" << snap << "," << p->first << "] on " << *this << dendl;

// COW ahead of a change if the newest snap in our realm covers 'first'.
void CInode::pre_cow_old_inode()
  snapid_t follows = find_snaprealm()->get_newest_seq();
  if (first <= follows)
    cow_old_inode(follows, true);

// Drop old_inodes whose [first,last] range intersects none of 'snaps'.
void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
  dout(10) << "purge_stale_snap_data " << snaps << dendl;
  if (old_inodes.empty())
  compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.begin();
  while (p != old_inodes.end()) {
    set<snapid_t>::const_iterator q = snaps.lower_bound(p->second.first);
    if (q == snaps.end() || *q > p->first) {
      dout(10) << " purging old_inode [" << p->second.first << "," << p->first << "]" << dendl;
      old_inodes.erase(p++);

/*
 * pick/create an old_inode
 */
// Return the old_inode whose [first,last] range contains 'snap', or
// NULL-equivalent when none covers it (return lines not visible here).
old_inode_t * CInode::pick_old_inode(snapid_t snap)
  compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap);  // p is first key >= to snap
  if (p != old_inodes.end() && p->second.first <= snap) {
    dout(10) << "pick_old_inode snap " << snap << " -> [" << p->second.first << "," << p->first << "]" << dendl;
  dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
// Create a SnapRealm rooted at this inode and hook it under the nearest
// ancestor realm.  Unless `nosplit` is set, the parent realm's inode/cap
// membership is split so entries below us move into the new realm.
2628 void CInode::open_snaprealm(bool nosplit)
2631     SnapRealm *parent = find_snaprealm();
2632     snaprealm = new SnapRealm(mdcache, this);
2634     dout(10) << "open_snaprealm " << snaprealm
2635 	     << " parent is " << parent
2637     dout(30) << " siblings are " << parent->open_children << dendl;
2638     snaprealm->parent = parent;
2640       parent->split_at(snaprealm);
2641     parent->open_children.insert(snaprealm);
// Tear down this inode's SnapRealm: detach from parent realm's child set.
// NOTE(review): `nojoin` appears unused in the visible lines; the
// commented-out join() hints at its original purpose — confirm upstream.
2645 void CInode::close_snaprealm(bool nojoin)
2648     dout(15) << "close_snaprealm " << *snaprealm << dendl;
2649     snaprealm->close_parents();
2650     if (snaprealm->parent) {
2651       snaprealm->parent->open_children.erase(snaprealm);
2653       //snaprealm->parent->join(snaprealm);
2660 SnapRealm *CInode::find_snaprealm() const
2662 const CInode *cur = this;
2663 while (!cur->snaprealm) {
2664 if (cur->get_parent_dn())
2665 cur = cur->get_parent_dn()->get_dir()->get_inode();
2666 else if (get_projected_parent_dn())
2667 cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
2671 return cur->snaprealm;
// Serialize this inode's snaprealm node (srnode) into `snapbl`.
2674 void CInode::encode_snap_blob(bufferlist &snapbl)
2677     ::encode(snaprealm->srnode, snapbl);
2678     dout(20) << "encode_snap_blob " << *snaprealm << dendl;
// Deserialize a snaprealm blob (if non-empty) into this inode's realm
// and re-open its parent chain.
2681 void CInode::decode_snap_blob(bufferlist& snapbl)
2683   if (snapbl.length()) {
2685     bufferlist::iterator p = snapbl.begin();
2686     ::decode(snaprealm->srnode, p);
       // result ignored apart from the (elided) handling; presumably
       // parents resolve from cache here — TODO confirm against full file
2688       bool ok = snaprealm->_open_parents(NULL);
2691     dout(20) << "decode_snap_blob " << *snaprealm << dendl;
// Encode the snaprealm blob plus oldest_snap; counterpart of decode_snap().
2695 void CInode::encode_snap(bufferlist& bl)
2698   encode_snap_blob(snapbl);
2699   ::encode(snapbl, bl);
2700   ::encode(oldest_snap, bl);
// Decode the snaprealm blob plus oldest_snap; counterpart of encode_snap().
2703 void CInode::decode_snap(bufferlist::iterator& p)
2706   ::decode(snapbl, p);
2707   ::decode(oldest_snap, p);
2708   decode_snap_blob(snapbl);
2711 // =============================================
// Compute the client that should hold exclusive ("loner") caps, or -1 if
// none: not possible when the MDS is readonly or other MDSs want caps.
// A client qualifies if its cap is not stale and it wants write/read caps
// (or the inode is a dir without subtree-root dirfrags).
2713 client_t CInode::calc_ideal_loner()
2715   if (mdcache->is_readonly())
2717   if (!mds_caps_wanted.empty())
2721   client_t loner = -1;
2722   for (map<client_t,Capability*>::iterator it = client_caps.begin();
2723        it != client_caps.end();
2725     if (!it->second->is_stale() &&
2726 	((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
2727 	 (inode.is_dir() && !has_subtree_root_dirfrag()))) {
// Recompute and cache the desired loner client in want_loner_cap.
2736 client_t CInode::choose_ideal_loner()
2738   want_loner_cap = calc_ideal_loner();
2739   return want_loner_cap;
// Promote want_loner_cap to the active loner, unless a *different*
// client already holds the loner role.
2742 bool CInode::try_set_loner()
2744   assert(want_loner_cap >= 0);
2745   if (loner_cap >= 0 && loner_cap != want_loner_cap)
2747   set_loner_cap(want_loner_cap);
// Record `l` as the loner client and propagate it as the exclusive-cap
// client to every cap-bearing lock.
2751 void CInode::set_loner_cap(client_t l)
2754   authlock.set_excl_client(loner_cap);
2755   filelock.set_excl_client(loner_cap);
2756   linklock.set_excl_client(loner_cap);
2757   xattrlock.set_excl_client(loner_cap);
// Drop the loner role if the loner's issued caps include nothing beyond
// what any client would be allowed anyway.
2760 bool CInode::try_drop_loner()
2765   int other_allowed = get_caps_allowed_by_type(CAP_ANY);
2766   Capability *cap = get_client_cap(loner_cap);
2768       (cap->issued() & ~other_allowed) == 0) {
2776 // choose new lock state during recovery, based on issued caps
// Map the caps currently issued for this lock's cap-bit range onto a
// lock state: EXCL for excl/buffer caps, MIX for write caps, LOCK/MIX
// for dirty locks depending on replication, SYNC otherwise.  Replicas
// keep the states chosen during rejoin.
2777 void CInode::choose_lock_state(SimpleLock *lock, int allissued)
2779   int shift = lock->get_cap_shift();
2780   int issued = (allissued >> shift) & lock->get_cap_mask();
2782     if (lock->is_xlocked()) {
2784     } else if (lock->get_state() != LOCK_MIX) {
2785       if (issued & (CEPH_CAP_GEXCL | CEPH_CAP_GBUFFER))
2786 	lock->set_state(LOCK_EXCL);
2787       else if (issued & CEPH_CAP_GWR)
2788 	lock->set_state(LOCK_MIX);
2789       else if (lock->is_dirty()) {
2790 	if (is_replicated())
2791 	  lock->set_state(LOCK_MIX);
2793 	  lock->set_state(LOCK_LOCK);
2795 	lock->set_state(LOCK_SYNC);
2798     // our states have already been chosen during rejoin.
2799     if (lock->is_xlocked())
2800       assert(lock->get_state() == LOCK_LOCK);
// Choose recovery lock states for all cap-related locks based on caps
// issued plus `dirty_caps`; pick a loner first if excl/write caps are out.
2804 void CInode::choose_lock_states(int dirty_caps)
2806   int issued = get_caps_issued() | dirty_caps;
2807   if (is_auth() && (issued & (CEPH_CAP_ANY_EXCL|CEPH_CAP_ANY_WR)) &&
2808       choose_ideal_loner() >= 0)
2810   choose_lock_state(&filelock, issued);
2811   choose_lock_state(&nestlock, issued);
2812   choose_lock_state(&dirfragtreelock, issued);
2813   choose_lock_state(&authlock, issued);
2814   choose_lock_state(&xattrlock, issued);
2815   choose_lock_state(&linklock, issued);
// Create a new Capability for `client`.  The first cap also joins the
// inode to a snaprealm (the given `conrealm` or the nearest one found)
// and bumps the cached inode-with-caps counter.
2818 Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
2820   if (client_caps.empty()) {
2823       containing_realm = conrealm;
2825       containing_realm = find_snaprealm();
2826     containing_realm->inodes_with_caps.push_back(&item_caps);
2827     dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
2830   if (client_caps.empty())
2831     mdcache->num_inodes_with_caps++;
2833   Capability *cap = new Capability(this, ++mdcache->last_cap_id, client);
2834   assert(client_caps.count(client) == 0);
2835   client_caps[client] = cap;
2837   session->add_cap(cap);
2838   if (session->is_stale())
     // new caps should cover snapshotted state back to just before `first`
2841   cap->client_follows = first-1;
2843   containing_realm->add_cap(client, cap);
// Remove `client`'s Capability: unhook it from session/revoking lists
// and the realm, clear loner if it was the loner, leave the realm when
// the last cap goes away, and wake any advisory-lock waiters freed by
// dropping the client's fcntl/flock locks.
2848 void CInode::remove_client_cap(client_t client)
2850   assert(client_caps.count(client) == 1);
2851   Capability *cap = client_caps[client];
2853   cap->item_session_caps.remove_myself();
2854   cap->item_revoking_caps.remove_myself();
2855   cap->item_client_revoking_caps.remove_myself();
2856   containing_realm->remove_cap(client, cap);
2858   if (client == loner_cap)
2862   client_caps.erase(client);
2863   if (client_caps.empty()) {
2864     dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
2866     item_caps.remove_myself();
2867     containing_realm = NULL;
2868     item_open_file.remove_myself();  // unpin logsegment
2869     mdcache->num_inodes_with_caps--;
2872   //clean up advisory locks
2873   bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
2874   bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false;
2875   if (fcntl_removed || flock_removed) {
2876     list<MDSInternalContextBase*> waiters;
2877     take_waiting(CInode::WAIT_FLOCK, waiters);
2878     mdcache->mds->queue_waiters(waiters);
// Migrate every client cap (and this inode's realm membership) from
// containing_realm to `realm`.
2882 void CInode::move_to_realm(SnapRealm *realm)
2884   dout(10) << "move_to_realm joining realm " << *realm
2885 	   << ", leaving realm " << *containing_realm << dendl;
2886   for (map<client_t,Capability*>::iterator q = client_caps.begin();
2887        q != client_caps.end();
2889     containing_realm->remove_cap(q->first, q->second);
2890     realm->add_cap(q->first, q->second);
2892   item_caps.remove_myself();
2893   realm->inodes_with_caps.push_back(&item_caps);
2894   containing_realm = realm;
// Rebuild a client cap from reconnect info: merge wanted/issued into an
// existing cap, or create a fresh cap carrying the client's cap_id.
2897 Capability *CInode::reconnect_cap(client_t client, const cap_reconnect_t& icr, Session *session)
2899   Capability *cap = get_client_cap(client);
2902     cap->merge(icr.capinfo.wanted, icr.capinfo.issued);
2904     cap = add_client_cap(client, session);
2905     cap->set_cap_id(icr.capinfo.cap_id);
2906     cap->set_wanted(icr.capinfo.wanted);
2907     cap->issue_norevoke(icr.capinfo.issued);
2910   cap->set_last_issue_stamp(ceph_clock_now());
// After exporting this inode to another MDS, drop every client cap and
// reset local loner/wanted state.
2914 void CInode::clear_client_caps_after_export()
2916   while (!client_caps.empty())
2917     remove_client_cap(client_caps.begin()->first);
2919   want_loner_cap = -1;
2920   mds_caps_wanted.clear();
// Fill `cl` with an exportable snapshot of every client cap.
2923 void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
2925   for (map<client_t,Capability*>::iterator it = client_caps.begin();
2926        it != client_caps.end();
2928     cl[it->first] = it->second->make_export();
// Caps we are willing to issue proactively: dirs get pin/excl/shared
// only; files get everything except LAZYIO.
2933 int CInode::get_caps_liked() const
2936     return CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;  // but not, say, FILE_RD|WR|WRBUFFER
2938     return CEPH_CAP_ANY & ~CEPH_CAP_FILE_LAZYIO;
// Caps that could ever be issued for this inode: the type-based ceiling
// intersected with each cap-bearing lock's "allowed ever" bits, shifted
// into that lock's cap-bit range.
2941 int CInode::get_caps_allowed_ever() const
2945     allowed = CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;
2947     allowed = CEPH_CAP_ANY;
2950 	  (filelock.gcaps_allowed_ever() << filelock.get_cap_shift()) |
2951 	  (authlock.gcaps_allowed_ever() << authlock.get_cap_shift()) |
2952 	  (xattrlock.gcaps_allowed_ever() << xattrlock.get_cap_shift()) |
2953 	  (linklock.gcaps_allowed_ever() << linklock.get_cap_shift()));
// Caps currently allowed for a given client class (CAP_LONER / CAP_ANY /
// CAP_XLOCKER), assembled from the four cap-bearing locks.
2956 int CInode::get_caps_allowed_by_type(int type) const
2960     (filelock.gcaps_allowed(type) << filelock.get_cap_shift()) |
2961     (authlock.gcaps_allowed(type) << authlock.get_cap_shift()) |
2962     (xattrlock.gcaps_allowed(type) << xattrlock.get_cap_shift()) |
2963     (linklock.gcaps_allowed(type) << linklock.get_cap_shift());
// Caps whose revocation needs care, assembled per-lock like the other
// get_caps_* helpers.
2966 int CInode::get_caps_careful() const
2969     (filelock.gcaps_careful() << filelock.get_cap_shift()) |
2970     (authlock.gcaps_careful() << authlock.get_cap_shift()) |
2971     (xattrlock.gcaps_careful() << xattrlock.get_cap_shift()) |
2972     (linklock.gcaps_careful() << linklock.get_cap_shift());
// Cap bits a given client may hold by virtue of xlocking the
// corresponding locks.
2975 int CInode::get_xlocker_mask(client_t client) const
2978     (filelock.gcaps_xlocker_mask(client) << filelock.get_cap_shift()) |
2979     (authlock.gcaps_xlocker_mask(client) << authlock.get_cap_shift()) |
2980     (xattrlock.gcaps_xlocker_mask(client) << xattrlock.get_cap_shift()) |
2981     (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
// Per-session allowed caps: loners get loner caps plus xlocker caps for
// locks they hold; everyone else gets the CAP_ANY set.  FILE_RD/WR are
// stripped if the client lacks the feature bits needed to understand
// inline data or namespaced file layouts.
2984 int CInode::get_caps_allowed_for_client(Session *session, inode_t *file_i) const
2986   client_t client = session->info.inst.name.num();
2988   if (client == get_loner()) {
2989     // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2991       get_caps_allowed_by_type(CAP_LONER) |
2992       (get_caps_allowed_by_type(CAP_XLOCKER) & get_xlocker_mask(client));
2994     allowed = get_caps_allowed_by_type(CAP_ANY);
2998   if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
2999        !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
3000       (!file_i->layout.pool_ns.empty() &&
3001        !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
3002     allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
3007 // caps issued, wanted
// Aggregate issued caps over all clients, optionally splitting the
// result into loner / other / xlocker buckets; everything is shifted and
// masked into one lock's cap-bit range.
3008 int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
3009 			    int shift, int mask)
3012   int loner = 0, other = 0, xlocker = 0;
3017   for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3018        it != client_caps.end();
3020     int i = it->second->issued();
3022     if (it->first == loner_cap)
3026     xlocker |= get_xlocker_mask(it->first) & i;
3028   if (ploner) *ploner = (loner >> shift) & mask;
3029   if (pother) *pother = (other >> shift) & mask;
3030   if (pxlocker) *pxlocker = (xlocker >> shift) & mask;
3031   return (c >> shift) & mask;
// True if any client cap currently wants something.
3034 bool CInode::is_any_caps_wanted() const
3036   for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3037        it != client_caps.end();
3039     if (it->second->wanted())
// Aggregate wanted caps across non-stale client caps and remote MDSs,
// split into loner / other like get_caps_issued().
3044 int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
3047   int loner = 0, other = 0;
3048   for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3049        it != client_caps.end();
3051     if (!it->second->is_stale()) {
3052       int t = it->second->wanted();
3054       if (it->first == loner_cap)
3059     //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
     // caps wanted by other MDS ranks count toward "other"
3062     for (compact_map<int,int>::const_iterator it = mds_caps_wanted.begin();
3063 	 it != mds_caps_wanted.end();
3066       other |= it->second;
3067       //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3069   if (ploner) *ploner = (loner >> shift) & mask;
3070   if (pother) *pother = (other >> shift) & mask;
3071   return (w >> shift) & mask;
// True if any issued cap bit for `lock` exceeds what its current state
// allows — i.e. caps must be revoked (gathered) before the lock can move.
3074 bool CInode::issued_caps_need_gather(SimpleLock *lock)
3076   int loner_issued, other_issued, xlocker_issued;
3077   get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
3078 		  lock->get_cap_shift(), lock->get_cap_mask());
3079   if ((loner_issued & ~lock->gcaps_allowed(CAP_LONER)) ||
3080       (other_issued & ~lock->gcaps_allowed(CAP_ANY)) ||
3081       (xlocker_issued & ~lock->gcaps_allowed(CAP_XLOCKER)))
// Relax all lock states before creating the first replica; only valid on
// the auth copy while no replicas exist yet.
3086 void CInode::replicate_relax_locks()
3088   //dout(10) << " relaxing locks on " << *this << dendl;
3090   assert(!is_replicated());
3092   authlock.replicate_relax();
3093   linklock.replicate_relax();
3094   dirfragtreelock.replicate_relax();
3095   filelock.replicate_relax();
3096   xattrlock.replicate_relax();
3097   snaplock.replicate_relax();
3098   nestlock.replicate_relax();
3099   flocklock.replicate_relax();
3100   policylock.replicate_relax();
3105 // =============================================
// Encode this inode's stat + cap grant for a client reply
// (MClientReply::InodeStat wire format).  Chooses projected vs. stable
// inode values per-field based on who holds xlocks / the loner role,
// resolves snapshotted values from old_inodes, decides which caps to
// issue, and conditionally includes xattrs and inline data.
3107 int CInode::encode_inodestat(bufferlist& bl, Session *session,
3108 			     SnapRealm *dir_realm,
3113   client_t client = session->info.inst.name.num();
3115   assert(session->connection);
3120   inode_t *oi = &inode;
3121   inode_t *pi = get_projected_inode();
3123   map<string, bufferptr> *pxattrs = 0;
     // --- snapshot resolution: redirect oi/pi to the old_inode covering snapid
3125   if (snapid != CEPH_NOSNAP) {
3127     // for now at least, old_inodes is only defined/valid on the auth
3131     if (is_multiversion()) {
3132       compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.lower_bound(snapid);
3133       if (p != old_inodes.end()) {
3134 	if (p->second.first > snapid) {
3135 	  if  (p != old_inodes.begin())
3138 	if (p != old_inodes.end() && p->second.first <= snapid && snapid <= p->first) {
3139 	  dout(15) << "encode_inodestat snapid " << snapid
3140 		   << " to old_inode [" << p->second.first << "," << p->first << "]"
3141 		   << " " << p->second.inode.rstat
3143 	  pi = oi = &p->second.inode;
3144 	  pxattrs = &p->second.xattrs;
3146 	  // snapshoted remote dentry can result this
3147 	  dout(0) << "encode_inodestat old_inode for snapid " << snapid
3148 		  << " not found" << dendl;
3151     } else if (snapid < first || snapid > last) {
3152       // snapshoted remote dentry can result this
3153       dout(0) << "encode_inodestat [" << first << "," << last << "]"
3154 	      << " not match snapid " << snapid << dendl;
3158   SnapRealm *realm = find_snaprealm();
     // --- decide whether any caps may be issued on this reply
3160   bool no_caps = !valid ||
3161 		 session->is_stale() ||
3162 		 (dir_realm && realm != dir_realm) ||
3164 		 state_test(CInode::STATE_EXPORTINGCAPS);
3166     dout(20) << "encode_inodestat no caps"
3167 	     << (!valid?", !valid":"")
3168 	     << (session->is_stale()?", session stale ":"")
3169 	     << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
3170 	     << (is_frozen()?", frozen inode":"")
3171 	     << (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
3175   // "fake" a version that is old (stable) version, +1 if projected.
3176   version_t version = (oi->version * 2) + is_projected();
     // --- per-field projected-vs-stable selection: a client that xlocks a
     // lock (or is the loner) sees the projected value for that field group
3178   Capability *cap = get_client_cap(client);
3179   bool pfile = filelock.is_xlocked_by_client(client) || get_loner() == client;
3180   //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3181   bool pauth = authlock.is_xlocked_by_client(client) || get_loner() == client;
3182   bool plink = linklock.is_xlocked_by_client(client) || get_loner() == client;
3183   bool pxattr = xattrlock.is_xlocked_by_client(client) || get_loner() == client;
3185   bool plocal = versionlock.get_last_wrlock_client() == client;
3186   bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
3188   inode_t *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
3190   dout(20) << " pfile " << pfile << " pauth " << pauth
3191 	   << " plink " << plink << " pxattr " << pxattr
3192 	   << " plocal " << plocal
3193 	   << " ctime " << any_i->ctime
3194 	   << " valid=" << valid << dendl;
3197   inode_t *file_i = pfile ? pi:oi;
3198   file_layout_t layout;
3200     layout = (ppolicy ? pi : oi)->layout;
3202     layout = file_i->layout;
3205   // max_size is min of projected, actual
3207     MIN(oi->client_ranges.count(client) ?
3208 	oi->client_ranges[client].range.last : 0,
3209 	pi->client_ranges.count(client) ?
3210 	pi->client_ranges[client].range.last : 0);
     // --- inline data is sent only when the client is behind or asked for it
3213   version_t inline_version = 0;
3214   bufferlist inline_data;
3215   if (file_i->inline_data.version == CEPH_INLINE_NONE) {
3216     inline_version = CEPH_INLINE_NONE;
3217   } else if ((!cap && !no_caps) ||
3218 	     (cap && cap->client_inline_version < file_i->inline_data.version) ||
3219 	     (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
3220     inline_version = file_i->inline_data.version;
3221     if (file_i->inline_data.length() > 0)
3222       inline_data = file_i->inline_data.get_data();
3225   // nest (do same as file... :/)
3227     cap->last_rbytes = file_i->rstat.rbytes;
3228     cap->last_rsize = file_i->rstat.rsize();
3232   inode_t *auth_i = pauth ? pi:oi;
3235   inode_t *link_i = plink ? pi:oi;
3238   inode_t *xattr_i = pxattr ? pi:oi;
     // --- xattrs are included only when the client's copy is out of date
     // or it explicitly asked (XATTR_SHARED in getattr_caps)
3242   version_t xattr_version;
3243   if ((!cap && !no_caps) ||
3244       (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
3245       (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
3247       pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
3248     ::encode(*pxattrs, xbl);
3249     xattr_version = xattr_i->xattr_version;
     // --- rough reply-size estimate, checked against the caller's budget
3256     unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
3257       sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
3258       sizeof(struct ceph_timespec) * 3 +
3259       4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3260       8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
3262     bytes += sizeof(__u32);
3263     bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
3264     bytes += sizeof(__u32) + symlink.length();
3265     bytes += sizeof(__u32) + xbl.length();
3266     bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
3267     if (bytes > max_bytes)
     // --- build the cap grant
3273   struct ceph_mds_reply_cap ecap;
3274   if (snapid != CEPH_NOSNAP) {
3276      * snapped inodes (files or dirs) only get read-only caps.  always
3277      * issue everything possible, since it is read only.
3279      * if a snapped inode has caps, limit issued caps based on the
3282      * if it is a live inode, limit issued caps based on the lock
3285      * do NOT adjust cap issued state, because the client always
3286      * tracks caps per-snap and the mds does either per-interval or
3289     ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
3290     if (last == CEPH_NOSNAP || is_any_caps())
3291       ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
3296     if (!no_caps && !cap) {
3298       cap = add_client_cap(client, session, realm);
3300       if (choose_ideal_loner() >= 0)
3302       else if (get_wanted_loner() < 0)
3308     if (!no_caps && cap) {
3309       int likes = get_caps_liked();
3310       int allowed = get_caps_allowed_for_client(session, file_i);
3311       issue = (cap->wanted() | likes) & allowed;
3312       cap->issue_norevoke(issue);
3313       issue = cap->pending();
3314       dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3315 	       << " seq " << cap->get_last_seq() << dendl;
3316     } else if (cap && cap->is_new() && !dir_realm) {
3317       // alway issue new caps to client, otherwise the caps get lost
3318       assert(cap->is_stale());
3319       issue = cap->pending() | CEPH_CAP_PIN;
3320       cap->issue_norevoke(issue);
3321       dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3322 	       << " seq " << cap->get_last_seq()
3323 	       << "(stale|new caps)" << dendl;
3327       cap->set_last_issue();
3328       cap->set_last_issue_stamp(ceph_clock_now());
3331       ecap.wanted = cap->wanted();
3332       ecap.cap_id = cap->get_cap_id();
3333       ecap.seq = cap->get_last_seq();
3334       ecap.mseq = cap->get_mseq();
3335       ecap.realm = realm->inode->ino();
3345   ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
3346   dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
3347 	   << " seq " << ecap.seq << " mseq " << ecap.mseq
3348 	   << " xattrv " << xattr_version << " len " << xbl.length()
     // --- only send inline data / xattrs the client is entitled to cache
3351   if (inline_data.length() && cap) {
3352     if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
3353       dout(10) << "including inline version " << inline_version << dendl;
3354       cap->client_inline_version = inline_version;
3356       dout(10) << "dropping inline version " << inline_version << dendl;
3358       inline_data.clear();
3362   // include those xattrs?
3363   if (xbl.length() && cap) {
3364     if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
3365       dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
3366       cap->client_xattr_version = xattr_i->xattr_version;
3368       dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
3369       xbl.clear(); // no xattrs .. XXX what's this about?!?
3375    * note: encoding matches MClientReply::InodeStat
3377   ::encode(oi->ino, bl);
3378   ::encode(snapid, bl);
3379   ::encode(oi->rdev, bl);
3380   ::encode(version, bl);
3382   ::encode(xattr_version, bl);
3386   ceph_file_layout legacy_layout;
3387   layout.to_legacy(&legacy_layout);
3388   ::encode(legacy_layout, bl);
3390   ::encode(any_i->ctime, bl);
3391   ::encode(file_i->mtime, bl);
3392   ::encode(file_i->atime, bl);
3393   ::encode(file_i->time_warp_seq, bl);
3394   ::encode(file_i->size, bl);
3395   ::encode(max_size, bl);
3396   ::encode(file_i->truncate_size, bl);
3397   ::encode(file_i->truncate_seq, bl);
3399   ::encode(auth_i->mode, bl);
3400   ::encode((uint32_t)auth_i->uid, bl);
3401   ::encode((uint32_t)auth_i->gid, bl);
3403   ::encode(link_i->nlink, bl);
3405   ::encode(file_i->dirstat.nfiles, bl);
3406   ::encode(file_i->dirstat.nsubdirs, bl);
3407   ::encode(file_i->rstat.rbytes, bl);
3408   ::encode(file_i->rstat.rfiles, bl);
3409   ::encode(file_i->rstat.rsubdirs, bl);
3410   ::encode(file_i->rstat.rctime, bl);
3412   dirfragtree.encode(bl);
3414   ::encode(symlink, bl);
     // feature-gated trailing fields: only encode what the client can decode
3415   if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
3416     ::encode(file_i->dir_layout, bl);
3419   if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
3420     ::encode(inline_version, bl);
3421     ::encode(inline_data, bl);
3423   if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
3424     inode_t *policy_i = ppolicy ? pi : oi;
3425     ::encode(policy_i->quota, bl);
3427   if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
3428     ::encode(layout.pool_ns, bl);
3430   if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
3431     ::encode(any_i->btime, bl);
3432     ::encode(any_i->change_attr, bl);
// Fill an MClientCaps message for `cap`'s client, choosing projected vs.
// stable inode values per field group (same p* scheme as
// encode_inodestat) and attaching inline data / xattrs the client's
// cached versions are behind on.
3438 void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
3442   client_t client = cap->get_client();
3444   bool pfile = filelock.is_xlocked_by_client(client) || (cap->issued() & CEPH_CAP_FILE_EXCL);
3445   bool pauth = authlock.is_xlocked_by_client(client);
3446   bool plink = linklock.is_xlocked_by_client(client);
3447   bool pxattr = xattrlock.is_xlocked_by_client(client);
3449   inode_t *oi = &inode;
3450   inode_t *pi = get_projected_inode();
3451   inode_t *i = (pfile|pauth|plink|pxattr) ? pi : oi;
3453   dout(20) << "encode_cap_message pfile " << pfile
3454 	   << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
3455 	   << " ctime " << i->ctime << dendl;
3458   m->set_layout(i->layout);
3460   m->truncate_seq = i->truncate_seq;
3461   m->truncate_size = i->truncate_size;
3462   m->mtime = i->mtime;
3463   m->atime = i->atime;
3464   m->ctime = i->ctime;
3465   m->change_attr = i->change_attr;
3466   m->time_warp_seq = i->time_warp_seq;
     // send inline data only if the client's copy is stale
3468   if (cap->client_inline_version < i->inline_data.version) {
3469     m->inline_version = cap->client_inline_version = i->inline_data.version;
3470     if (i->inline_data.length() > 0)
3471       m->inline_data = i->inline_data.get_data();
3473     m->inline_version = 0;
3476   // max_size is min of projected, actual.
3477   uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
3478   uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
3479   m->max_size = MIN(oldms, newms);
3482   m->head.mode = i->mode;
3483   m->head.uid = i->uid;
3484   m->head.gid = i->gid;
3487   m->head.nlink = i->nlink;
     // xattrs go along only when the client holds XATTR_SHARED and is behind
3490   map<string,bufferptr> *ix = pxattr ? get_projected_xattrs() : &xattrs;
3491   if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
3492       i->xattr_version > cap->client_xattr_version) {
3493     dout(10) << "    including xattrs v " << i->xattr_version << dendl;
3494     ::encode(*ix, m->xattrbl);
3495     m->head.xattr_version = i->xattr_version;
3496     cap->client_xattr_version = i->xattr_version;
// Serialize the core inode payload (first/inode/symlink/dirfragtree/
// xattrs/old_inodes/damage flags); counterpart of _decode_base().
3502 void CInode::_encode_base(bufferlist& bl, uint64_t features)
3504   ::encode(first, bl);
3505   ::encode(inode, bl, features);
3506   ::encode(symlink, bl);
3507   ::encode(dirfragtree, bl);
3508   ::encode(xattrs, bl);
3509   ::encode(old_inodes, bl, features);
3510   ::encode(damage_flags, bl);
// Deserialize the core inode payload; field order must mirror
// _encode_base().
3513 void CInode::_decode_base(bufferlist::iterator& p)
3517   ::decode(symlink, p);
3518   ::decode(dirfragtree, p);
3519   ::decode(xattrs, p);
3520   ::decode(old_inodes, p);
3521   ::decode(damage_flags, p);
// Serialize full state of all nine inode locks plus the loner client;
// counterpart of _decode_locks_full().
3525 void CInode::_encode_locks_full(bufferlist& bl)
3527   ::encode(authlock, bl);
3528   ::encode(linklock, bl);
3529   ::encode(dirfragtreelock, bl);
3530   ::encode(filelock, bl);
3531   ::encode(xattrlock, bl);
3532   ::encode(snaplock, bl);
3533   ::encode(nestlock, bl);
3534   ::encode(flocklock, bl);
3535   ::encode(policylock, bl);
3537   ::encode(loner_cap, bl);
// Deserialize full lock state (mirror of _encode_locks_full) and restore
// the loner, pushing it into the locks via set_loner_cap().
3539 void CInode::_decode_locks_full(bufferlist::iterator& p)
3541   ::decode(authlock, p);
3542   ::decode(linklock, p);
3543   ::decode(dirfragtreelock, p);
3544   ::decode(filelock, p);
3545   ::decode(xattrlock, p);
3546   ::decode(snaplock, p);
3547   ::decode(nestlock, p);
3548   ::decode(flocklock, p);
3549   ::decode(policylock, p);
3551   ::decode(loner_cap, p);
3552   set_loner_cap(loner_cap);
3553   want_loner_cap = loner_cap;  // for now, we'll eval() shortly.
// Serialize replica-visible lock state for a new replica; order must
// match _decode_locks_state().
3556 void CInode::_encode_locks_state_for_replica(bufferlist& bl)
3558   authlock.encode_state_for_replica(bl);
3559   linklock.encode_state_for_replica(bl);
3560   dirfragtreelock.encode_state_for_replica(bl);
3561   filelock.encode_state_for_replica(bl);
3562   nestlock.encode_state_for_replica(bl);
3563   xattrlock.encode_state_for_replica(bl);
3564   snaplock.encode_state_for_replica(bl);
3565   flocklock.encode_state_for_replica(bl);
3566   policylock.encode_state_for_replica(bl);
// Like _encode_locks_state_for_replica, but the three scatterlocks
// (dirfragtree/file/nest) use the rejoin-specific encoding for `rep`.
3568 void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
3570   authlock.encode_state_for_replica(bl);
3571   linklock.encode_state_for_replica(bl);
3572   dirfragtreelock.encode_state_for_rejoin(bl, rep);
3573   filelock.encode_state_for_rejoin(bl, rep);
3574   nestlock.encode_state_for_rejoin(bl, rep);
3575   xattrlock.encode_state_for_replica(bl);
3576   snaplock.encode_state_for_replica(bl);
3577   flocklock.encode_state_for_replica(bl);
3578   policylock.encode_state_for_replica(bl);
// Decode replica lock state; order must mirror the replica/rejoin
// encoders above.
3580 void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
3582   authlock.decode_state(p, is_new);
3583   linklock.decode_state(p, is_new);
3584   dirfragtreelock.decode_state(p, is_new);
3585   filelock.decode_state(p, is_new);
3586   nestlock.decode_state(p, is_new);
3587   xattrlock.decode_state(p, is_new);
3588   snaplock.decode_state(p, is_new);
3589   flocklock.decode_state(p, is_new);
3590   policylock.decode_state(p, is_new);
// Decode rejoin lock state, collecting waiters to requeue; unstable,
// non-wrlocked scatterlocks are flagged for re-evaluation.
3592 void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
3593 				  list<SimpleLock*>& eval_locks)
3595   authlock.decode_state_rejoin(p, waiters);
3596   linklock.decode_state_rejoin(p, waiters);
3597   dirfragtreelock.decode_state_rejoin(p, waiters);
3598   filelock.decode_state_rejoin(p, waiters);
3599   nestlock.decode_state_rejoin(p, waiters);
3600   xattrlock.decode_state_rejoin(p, waiters);
3601   snaplock.decode_state_rejoin(p, waiters);
3602   flocklock.decode_state_rejoin(p, waiters);
3603   policylock.decode_state_rejoin(p, waiters);
3605   if (!dirfragtreelock.is_stable() && !dirfragtreelock.is_wrlocked())
3606     eval_locks.push_back(&dirfragtreelock);
3607   if (!filelock.is_stable() && !filelock.is_wrlocked())
3608     eval_locks.push_back(&filelock);
3609   if (!nestlock.is_stable() && !nestlock.is_wrlocked())
3610     eval_locks.push_back(&nestlock);
// Serialize this inode for migration to another MDS: base payload,
// state, replica map, fragstat/rstat for export-bound dirfrags, full
// lock state and file locks.  Pins the inode until finish_export().
3616 void CInode::encode_export(bufferlist& bl)
3618   ENCODE_START(5, 4, bl);
3619   _encode_base(bl, mdcache->mds->mdsmap->get_up_features());
3621   ::encode(state, bl);
3625   ::encode(get_replicas(), bl);
3627   // include scatterlock info for any bounding CDirs
3628   bufferlist bounding;
3630   for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
3631        p != dirfrags.end();
3633     CDir *dir = p->second;
3634     if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
3635       ::encode(p->first, bounding);
3636       ::encode(dir->fnode.fragstat, bounding);
3637       ::encode(dir->fnode.accounted_fragstat, bounding);
3638       ::encode(dir->fnode.rstat, bounding);
3639       ::encode(dir->fnode.accounted_rstat, bounding);
3640       dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
3643   ::encode(bounding, bl);
3645   _encode_locks_full(bl);
3647   _encode_file_locks(bl);
     // keep the inode alive until the export completes (see finish_export)
3651   get(PIN_TEMPEXPORTING);
// Finalize a completed export: keep only the persistent state bits and
// drop the temporary export pin taken in encode_export().
3654 void CInode::finish_export(utime_t now)
3656   state &= MASK_STATE_EXPORT_KEPT;
3661   //dirlock.clear_updated();
3665   put(PIN_TEMPEXPORTING);
// Inverse of encode_export(): absorb an inode migrated from another MDS.
// Takes over auth, re-pins as needed, and selectively applies the
// remote's fragstat/rstat for bounding dirfrags.
3668 void CInode::decode_import(bufferlist::iterator& p,
3677   state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
3683   if (is_dirty_parent()) {
3684     get(PIN_DIRTYPARENT);
3685     _mark_dirty_parent(ls);
3688   ::decode(pop, ceph_clock_now(), p);
3690   ::decode(get_replicas(), p);
3691   if (is_replicated())
3692     get(PIN_REPLICATED);
3695   // decode fragstat info on bounding cdirs
3696   bufferlist bounding;
3697   ::decode(bounding, p);
3698   bufferlist::iterator q = bounding.begin();
3702     CDir *dir = get_dirfrag(fg);
3703     assert(dir);  // we should have all bounds open
3705     // Only take the remote's fragstat/rstat if we are non-auth for
3706     // this dirfrag AND the lock is NOT in a scattered (MIX) state.
3707     // We know lock is stable, and MIX is the only state in which
3708     // the inode auth (who sent us this data) may not have the best
3711     // HMM: Are there cases where dir->is_auth() is an insufficient
3712     // check because the dirfrag is under migration?  That implies
3713     // it is frozen (and in a SYNC or LOCK state).  FIXME.
3715     if (dir->is_auth() ||
3716         filelock.get_state() == LOCK_MIX) {
3717       dout(10) << " skipped fragstat info for " << *dir << dendl;
3722       ::decode(dir->fnode.fragstat, q);
3723       ::decode(dir->fnode.accounted_fragstat, q);
3724       dout(10) << " took fragstat info for " << *dir << dendl;
3726     if (dir->is_auth() ||
3727         nestlock.get_state() == LOCK_MIX) {
3728       dout(10) << " skipped rstat info for " << *dir << dendl;
3733       ::decode(dir->fnode.rstat, q);
3734       ::decode(dir->fnode.accounted_rstat, q);
3735       dout(10) << " took rstat info for " << *dir << dendl;
3739   _decode_locks_full(p);
3741   _decode_file_locks(p);
// Emit this inode store as structured output (symlink, old_inodes with
// their last-snap keys, dirfragtree) via a ceph Formatter.
3747 void InodeStoreBase::dump(Formatter *f) const
3750   f->dump_string("symlink", symlink);
3751   f->open_array_section("old_inodes");
3752   for (compact_map<snapid_t, old_inode_t>::const_iterator i = old_inodes.begin();
3753       i != old_inodes.end(); ++i) {
3754     f->open_object_section("old_inode");
3756     // The key is the last snapid, the first is in the old_inode_t
3757     f->dump_int("last", i->first);
3760     f->close_section();  // old_inode
3762   f->close_section();  // old_inodes
3764   f->open_object_section("dirfragtree");
3765   dirfragtree.dump(f);
3766   f->close_section(); // dirfragtree
// Produce sample InodeStore objects for encode/decode round-trip tests
// (ceph-dencoder); caller owns the heap allocations.
3770 void InodeStore::generate_test_instances(list<InodeStore*> &ls)
3772   InodeStore *populated = new InodeStore;
3773   populated->inode.ino = 0xdeadbeef;
3774   populated->symlink = "rhubarb";
3775   ls.push_back(populated);
void CInode::validate_disk_state(CInode::validated_data *results,
                                 MDSInternalContext *fin)
  // Asynchronously cross-check this inode's in-memory state against what is
  // stored in RADOS: the backtrace, (for base dirs) the stored inode itself,
  // and the dirfrag stats.  Each stage records its verdict in *results; the
  // caller's *fin is completed when the continuation is done.
  class ValidationContinuation : public MDSContinuation {
    MDSInternalContext *fin;          // caller's completion, fired from _done()
    CInode::validated_data *results;  // per-stage verdicts accumulate here
    ValidationContinuation(CInode *i,
                           CInode::validated_data *data_r,
                           MDSInternalContext *fin_) :
      MDSContinuation(i->mdcache->mds->server),
      // Wire one handler per scrub stage; the Continuation machinery drives
      // the transitions between them (some via async I/O callbacks).
      set_callback(START, static_cast<Continuation::stagePtr>(&ValidationContinuation::_start));
      set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
      set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
      set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
    ~ValidationContinuation() override {
    /*
     * Fetch backtrace and set tag if tag is non-empty
     */
    void fetch_backtrace_and_tag(CInode *in, std::string tag,
                                 Context *fin, int *bt_r, bufferlist *bt)
      const int64_t pool = in->get_backtrace_pool();
      object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");

      // Read the "parent" xattr (the encoded backtrace) off the object;
      // the read retval lands in *bt_r, the raw bytes in *bt.
      ObjectOperation fetch;
      fetch.getxattr("parent", bt, bt_r);
      in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
      // Separately stamp the object with the scrub tag.
      ObjectOperation scrub_tag;
      ::encode(tag, tag_bl);
      scrub_tag.setxattr("scrub_tag", tag_bl);
      in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
                                         ceph::real_clock::now(),

    // Stage 1: kick off the backtrace fetch (tagging the object when we are
    // running under a scrub header).
    bool _start(int rval) {
      if (in->is_dirty()) {
        MDCache *mdcache = in->mdcache;
        inode_t& inode = in->inode;
        // dirty in-memory state may legitimately disagree with disk
        dout(20) << "validating a dirty CInode; results will be inconclusive"
      if (in->is_symlink()) {
        // there's nothing to do for symlinks!

      // Bounce the RADOS completion through the MDS finisher thread.
      C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
                                            in->mdcache->mds->finisher);

      // Whether we have a tag to apply depends on ScrubHeader (if one is
      if (in->scrub_infop) {
        // I'm a non-orphan, so look up my ScrubHeader via my linkage
        const std::string &tag = in->scrub_infop->header->get_tag();
        // Rather than using the usual CInode::fetch_backtrace,
        // use a special variant that optionally writes a tag in the same
        fetch_backtrace_and_tag(in, tag, conf,
                                &results->backtrace.ondisk_read_retval, &bl);
        // When we're invoked outside of ScrubStack we might be called
        // on an orphaned inode like /
        fetch_backtrace_and_tag(in, {}, conf,
                                &results->backtrace.ondisk_read_retval, &bl);

    // Stage 2: decode the fetched backtrace and compare it with one freshly
    // built from the in-memory cache; optionally repair, and also verify the
    // inode number is not marked free in the InoTable.
    bool _backtrace(int rval) {
      // set up basic result reporting and make sure we got the data
      results->performed_validation = true; // at least, some of it!
      results->backtrace.checked = true;

      const int64_t pool = in->get_backtrace_pool();
      inode_backtrace_t& memory_backtrace = results->backtrace.memory_value;
      in->build_backtrace(pool, memory_backtrace);
      bool equivalent, divergent;

      MDCache *mdcache = in->mdcache;  // For the benefit of dout
      const inode_t& inode = in->inode;  // For the benefit of dout

      // Ignore rval because it's the result of a FAILOK operation
      // from fetch_backtrace_and_tag: the real result is in
      // backtrace.ondisk_read_retval
      dout(20) << "ondisk_read_retval: " << results->backtrace.ondisk_read_retval << dendl;
      if (results->backtrace.ondisk_read_retval != 0) {
        results->backtrace.error_str << "failed to read off disk; see retval";

      // extract the backtrace, and compare it to a newly-constructed one
        bufferlist::iterator p = bl.begin();
        ::decode(results->backtrace.ondisk_value, p);
        dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
      } catch (buffer::error&) {
        if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
          // Cases where something has clearly gone wrong with the overall
          // fetch op, though we didn't get a nonzero rc from the getxattr
          // operation. e.g. object missing.
          results->backtrace.ondisk_read_retval = rval;
        results->backtrace.error_str << "failed to decode on-disk backtrace ("
                                     << bl.length() << " bytes)!";

      // negative compare result means the in-memory copy is older than disk
      memory_newer = memory_backtrace.compare(results->backtrace.ondisk_value,
                                              &equivalent, &divergent);

      if (divergent || memory_newer < 0) {
        // we're divergent, or on-disk version is newer
        results->backtrace.error_str << "On-disk backtrace is divergent or newer";
        results->backtrace.passed = true;

      // Repair path: rewrite the backtrace when the scrub requested repair.
      if (!results->backtrace.passed && in->scrub_infop->header->get_repair()) {
        in->make_path_string(path);
        in->mdcache->mds->clog->warn() << "bad backtrace on inode " << in->ino()
                                       << "(" << path << "), rewriting it";
        in->_mark_dirty_parent(in->mdcache->mds->mdlog->get_current_segment(),

      // If the inode's number was free in the InoTable, fix that
      InoTable *inotable = mdcache->mds->inotable;

      dout(10) << "scrub: inotable ino = " << inode.ino << dendl;
      dout(10) << "scrub: inotable free says "
               << inotable->is_marked_free(inode.ino) << dendl;

      if (inotable->is_marked_free(inode.ino)) {
        LogChannelRef clog = in->mdcache->mds->clog;
        clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex

        if (in->scrub_infop->header->get_repair()) {
          bool repaired = inotable->repair(inode.ino);
            clog->error() << "inode table repaired for inode: 0x" << std::hex
            clog->error() << "Cannot repair inotable while other operations"

      // quit if we're a file, or kick off directory checks otherwise
      // TODO: validate on-disk inode for non-base directories
      if (!in->is_dir()) {
      return validate_directory_data();

    // For base dirs, fetch a shadow copy of the on-disk inode (stage INODE);
    // other dirs skip straight to the dirfrag rstat checks.
    bool validate_directory_data() {
      assert(in->is_dir());

      if (in->is_base()) {
        shadow_in = new CInode(in->mdcache);
        in->mdcache->create_unlinked_system_inode(shadow_in,
        shadow_in->fetch(get_internal_callback(INODE));
        results->inode.passed = true;
        return check_dirfrag_rstats();

    // Stage 3 (base dirs only): compare the shadow (on-disk) inode against
    // the in-memory one.
    bool _inode_disk(int rval) {
      results->inode.checked = true;
      results->inode.ondisk_read_retval = rval;
      results->inode.ondisk_value = shadow_in->inode;
      results->inode.memory_value = in->inode;

      inode_t& si = shadow_in->inode;
      inode_t& i = in->inode;
      if (si.version > i.version) {
        // the in-memory inode should never lag behind what is on disk
        results->inode.error_str << "On-disk inode is newer than in-memory one!";
        bool divergent = false;
        int r = i.compare(si, &divergent);
        results->inode.passed = !divergent && r >= 0;
        if (!results->inode.passed) {
          results->inode.error_str <<
            "On-disk inode is divergent or newer than in-memory one!";

      return check_dirfrag_rstats();

    // Gather-fetch any incomplete dirfrags so their stats can be summed,
    // then continue at the DIRFRAGS stage.
    bool check_dirfrag_rstats() {
      MDSGatherBuilder gather(g_ceph_context);
      std::list<frag_t> frags;
      in->dirfragtree.get_leaves(frags);
      for (list<frag_t>::iterator p = frags.begin();
        CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
        if (!dir->scrub_infop->header)
          dir->scrub_infop->header = in->scrub_infop->header;
        if (dir->is_complete()) {
          dir->scrub_infop->need_scrub_local = true;
          dir->fetch(gather.new_sub(), false);
      if (gather.has_subs()) {
        gather.set_finisher(get_internal_callback(DIRFRAGS));
      return immediate(DIRFRAGS, 0);

    // Stage 4: sum each dirfrag's accounted stats and compare the totals
    // against the inode's dirstat/rstat; optionally schedule repair.
    bool _dirfrags(int rval) {
      int frags_errors = 0;
      // basic reporting setup
      results->raw_stats.checked = true;
      results->raw_stats.ondisk_read_retval = rval;

      results->raw_stats.memory_value.dirstat = in->inode.dirstat;
      results->raw_stats.memory_value.rstat = in->inode.rstat;
      frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
      nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;

        results->raw_stats.error_str << "Failed to read dirfrags off disk";

      // check each dirfrag...
      for (compact_map<frag_t,CDir*>::iterator p = in->dirfrags.begin();
           p != in->dirfrags.end();
        CDir *dir = p->second;
        assert(dir->get_version() > 0);
        nest_info.add(dir->fnode.accounted_rstat);
        dir_info.add(dir->fnode.accounted_fragstat);
        if (dir->scrub_infop &&
            dir->scrub_infop->pending_scrub_error) {
          dir->scrub_infop->pending_scrub_error = false;
          if (dir->scrub_infop->header->get_repair()) {
            results->raw_stats.error_str
              << "dirfrag(" << p->first << ") has bad stats (will be fixed); ";
            results->raw_stats.error_str
              << "dirfrag(" << p->first << ") has bad stats; ";
      nest_info.rsubdirs++; // it gets one to account for self
      // ...and that their sum matches our inode settings
      if (!dir_info.same_sums(in->inode.dirstat) ||
          !nest_info.same_sums(in->inode.rstat)) {
        if (in->scrub_infop &&
            in->scrub_infop->header->get_repair()) {
          results->raw_stats.error_str
            << "freshly-calculated rstats don't match existing ones (will be fixed)";
          in->mdcache->repair_inode_stats(in);
          results->raw_stats.error_str
            << "freshly-calculated rstats don't match existing ones";
      if (frags_errors > 0)

      results->raw_stats.passed = true;

    // Overall verdict: pass only if every stage that actually ran passed;
    // then complete the caller's context.
    void _done() override {
      if ((!results->raw_stats.checked || results->raw_stats.passed) &&
          (!results->backtrace.checked || results->backtrace.passed) &&
          (!results->inode.checked || results->inode.passed))
        results->passed_validation = true;
      fin->complete(get_rval());

  dout(10) << "scrub starting validate_disk_state on " << *this << dendl;
  ValidationContinuation *vc = new ValidationContinuation(this,
void CInode::validated_data::dump(Formatter *f) const
  // Emit the scrub/validation verdicts (backtrace and raw-stats sections)
  // plus an overall return code, via the supplied Formatter.
  f->open_object_section("results");
  f->dump_bool("performed_validation", performed_validation);
  f->dump_bool("passed_validation", passed_validation);
  f->open_object_section("backtrace");
  f->dump_bool("checked", backtrace.checked);
  f->dump_bool("passed", backtrace.passed);
  f->dump_int("read_ret_val", backtrace.ondisk_read_retval);
  f->dump_stream("ondisk_value") << backtrace.ondisk_value;
  f->dump_stream("memoryvalue") << backtrace.memory_value;
  f->dump_string("error_str", backtrace.error_str.str());
  f->close_section(); // backtrace
  f->open_object_section("raw_stats");
  f->dump_bool("checked", raw_stats.checked);
  f->dump_bool("passed", raw_stats.passed);
  f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
  f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
  f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
  f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
  f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
  f->dump_string("error_str", raw_stats.error_str.str());
  f->close_section(); // raw_stats
  // dump failure return code
  // (the last section checked with a nonzero read retval wins)
  if (backtrace.checked && backtrace.ondisk_read_retval)
    rc = backtrace.ondisk_read_retval;
  if (inode.checked && inode.ondisk_read_retval)
    rc = inode.ondisk_read_retval;
  if (raw_stats.checked && raw_stats.ondisk_read_retval)
    rc = raw_stats.ondisk_read_retval;
  f->dump_int("return_code", rc);
  f->close_section(); // results
void CInode::dump(Formatter *f) const
  // Formatter dump of the whole in-memory inode: stored fields, generic
  // cache-object info, every lock, CInode-specific state bits, per-client
  // caps, loner info, and caps wanted by other MDS ranks.
  InodeStoreBase::dump(f);
  MDSCacheObject::dump(f);

  // one section per lock
  f->open_object_section("versionlock");
  versionlock.dump(f);
  f->open_object_section("authlock");
  f->open_object_section("linklock");
  f->open_object_section("dirfragtreelock");
  dirfragtreelock.dump(f);
  f->open_object_section("filelock");
  f->open_object_section("xattrlock");
  f->open_object_section("snaplock");
  f->open_object_section("nestlock");
  f->open_object_section("flocklock");
  f->open_object_section("policylock");

  // CInode-specific state flags, on top of the generic ones
  f->open_array_section("states");
  MDSCacheObject::dump_states(f);
  if (state_test(STATE_EXPORTING))
    f->dump_string("state", "exporting");
  if (state_test(STATE_OPENINGDIR))
    f->dump_string("state", "openingdir");
  if (state_test(STATE_FREEZING))
    f->dump_string("state", "freezing");
  if (state_test(STATE_FROZEN))
    f->dump_string("state", "frozen");
  if (state_test(STATE_AMBIGUOUSAUTH))
    f->dump_string("state", "ambiguousauth");
  if (state_test(STATE_EXPORTINGCAPS))
    f->dump_string("state", "exportingcaps");
  if (state_test(STATE_NEEDSRECOVER))
    f->dump_string("state", "needsrecover");
  if (state_test(STATE_PURGING))
    f->dump_string("state", "purging");
  if (state_test(STATE_DIRTYPARENT))
    f->dump_string("state", "dirtyparent");
  if (state_test(STATE_DIRTYRSTAT))
    f->dump_string("state", "dirtyrstat");
  if (state_test(STATE_STRAYPINNED))
    f->dump_string("state", "straypinned");
  if (state_test(STATE_FROZENAUTHPIN))
    f->dump_string("state", "frozenauthpin");
  if (state_test(STATE_DIRTYPOOL))
    f->dump_string("state", "dirtypool");
  if (state_test(STATE_ORPHAN))
    f->dump_string("state", "orphan");
  if (state_test(STATE_MISSINGOBJS))
    f->dump_string("state", "missingobjs");

  // one entry per client currently holding caps on this inode
  f->open_array_section("client_caps");
  for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
       it != client_caps.end(); ++it) {
    f->open_object_section("client_cap");
    f->dump_int("client_id", it->first.v);
    f->dump_string("pending", ccap_string(it->second->pending()));
    f->dump_string("issued", ccap_string(it->second->issued()));
    f->dump_string("wanted", ccap_string(it->second->wanted()));
    f->dump_string("last_sent", ccap_string(it->second->get_last_sent()));

  f->dump_int("loner", loner_cap.v);
  f->dump_int("want_loner", want_loner_cap.v);

  // caps wanted by other MDS ranks
  f->open_array_section("mds_caps_wanted");
  for (compact_map<int,int>::const_iterator p = mds_caps_wanted.begin();
       p != mds_caps_wanted.end(); ++p) {
    f->open_object_section("mds_cap_wanted");
    f->dump_int("rank", p->first);
    f->dump_string("cap", ccap_string(p->second));
4267 /****** Scrub Stuff *****/
4268 void CInode::scrub_info_create() const
4270 dout(25) << __func__ << dendl;
4271 assert(!scrub_infop);
4273 // break out of const-land to set up implicit initial state
4274 CInode *me = const_cast<CInode*>(this);
4275 inode_t *in = me->get_projected_inode();
4277 scrub_info_t *si = new scrub_info_t();
4278 si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
4279 si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
4281 me->scrub_infop = si;
void CInode::scrub_maybe_delete_info()
  // Free the scrub bookkeeping once nothing needs it any more: no scrub
  // is running and no last-scrub stamps remain to be journaled.
      !scrub_infop->scrub_in_progress &&
      !scrub_infop->last_scrub_dirty) {
void CInode::scrub_initialize(CDentry *scrub_parent,
                              const ScrubHeaderRefConst& header,
                              MDSInternalContextBase *f)
  // Begin a scrub pass on this inode: set up dirfrag stamps (for dirs),
  // pin the linking dentry, and record the header/completion context.
  dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
  assert(!scrub_is_in_progress());
  scrub_infop = new scrub_info_t();

  if (get_projected_inode()->is_dir()) {
    // fill in dirfrag_stamps with initial state
    std::list<frag_t> frags;
    dirfragtree.get_leaves(frags);
    for (std::list<frag_t>::iterator i = frags.begin();
      if (header->get_force())
        scrub_infop->dirfrag_stamps[*i].reset();
        // bare map access default-constructs the entry if absent
        scrub_infop->dirfrag_stamps[*i];

    // pin the dentry that links us so it can't go away mid-scrub
    scrub_parent->get(CDentry::PIN_SCRUBPARENT);
  scrub_infop->scrub_parent = scrub_parent;
  scrub_infop->on_finish = f;
  scrub_infop->scrub_in_progress = true;
  scrub_infop->children_scrubbed = false;
  scrub_infop->header = header;

  scrub_infop->scrub_start_version = get_version();
  scrub_infop->scrub_start_stamp = ceph_clock_now();
  // right now we don't handle remote inodes
int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
  // Hand out the next dirfrag not yet visited by the current scrub pass,
  // marking it as started; reports when none remain.
  dout(20) << __func__ << dendl;
  assert(scrub_is_in_progress());

  std::map<frag_t, scrub_stamp_info_t>::iterator i =
      scrub_infop->dirfrag_stamps.begin();

  while (i != scrub_infop->dirfrag_stamps.end()) {
    // a stamp older than the pass's start version means "not yet scrubbed"
    if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
      i->second.scrub_start_version = get_projected_version();
      i->second.scrub_start_stamp = ceph_clock_now();
      *out_dirfrag = i->first;
      dout(20) << " return frag " << *out_dirfrag << dendl;

  dout(20) << " no frags left, ENOENT " << dendl;
void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
  // Collect the dirfrags that this scrub pass has started but not finished.
  assert(out_dirfrags != NULL);
  assert(scrub_infop != NULL);

  out_dirfrags->clear();
  std::map<frag_t, scrub_stamp_info_t>::iterator i =
      scrub_infop->dirfrag_stamps.begin();

  while (i != scrub_infop->dirfrag_stamps.end()) {
    // started in this pass...
    if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
      // ...but not yet completed
      if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
        out_dirfrags->push_back(i->first);
void CInode::scrub_dirfrag_finished(frag_t dirfrag)
  // Record completion of one dirfrag in the current scrub pass.
  dout(20) << __func__ << " on frag " << dirfrag << dendl;
  assert(scrub_is_in_progress());

  std::map<frag_t, scrub_stamp_info_t>::iterator i =
      scrub_infop->dirfrag_stamps.find(dirfrag);
  assert(i != scrub_infop->dirfrag_stamps.end());

  // promote the in-progress stamps to "last completed" for this frag
  scrub_stamp_info_t &si = i->second;
  si.last_scrub_stamp = si.scrub_start_stamp;
  si.last_scrub_version = si.scrub_start_version;
void CInode::scrub_finished(MDSInternalContextBase **c) {
  // Wrap up a scrub pass: sanity-check that every dirfrag finished, promote
  // the start stamps to "last scrub", release the parent-dentry pin, and
  // hand the on_finish context back to the caller via *c.
  dout(20) << __func__ << dendl;
  assert(scrub_is_in_progress());
  for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
      scrub_infop->dirfrag_stamps.begin();
      i != scrub_infop->dirfrag_stamps.end();
    // every dirfrag must have completed the pass it started
    if(i->second.last_scrub_version != i->second.scrub_start_version) {
      derr << i->second.last_scrub_version << " != "
           << i->second.scrub_start_version << dendl;
    assert(i->second.last_scrub_version == i->second.scrub_start_version);

  scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
  scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
  scrub_infop->last_scrub_dirty = true;
  scrub_infop->scrub_in_progress = false;

  // release the pin taken in scrub_initialize and notify the parent dir
  if (scrub_infop->scrub_parent) {
    CDentry *dn = scrub_infop->scrub_parent;
    scrub_infop->scrub_parent = NULL;
    dn->dir->scrub_dentry_finished(dn);
    dn->put(CDentry::PIN_SCRUBPARENT);

  // caller completes the context; we merely surrender ownership of it
  *c = scrub_infop->on_finish;
  scrub_infop->on_finish = NULL;

  if (scrub_infop->header->get_origin() == this) {
    // We are at the point that a tagging scrub was initiated
    LogChannelRef clog = mdcache->mds->clog;
    clog->info() << "scrub complete with tag '" << scrub_infop->header->get_tag() << "'";
int64_t CInode::get_backtrace_pool() const
  // Directories keep their backtrace in the metadata pool.
    return mdcache->mds->mdsmap->get_metadata_pool();
  // Files are required to have an explicit layout that specifies
  // which pool their objects (and hence their backtrace) live in.
  assert(inode.layout.pool_id != -1);
  return inode.layout.pool_id;
void CInode::maybe_export_pin(bool update)
  // Decide whether this inode needs export-pin processing and, if so,
  // queue it on mdcache->export_pin_queue (guarded against double-queue
  // by STATE_QUEUEDEXPORTPIN).  @param update: re-evaluate even if the
  // effective pin is MDS_RANK_NONE (used after the pin changed).
  if (!g_conf->mds_bal_export_pin)
  if (!is_dir() || !is_normal())

  mds_rank_t export_pin = get_export_pin(false);
  if (export_pin == MDS_RANK_NONE && !update)

  if (state_test(CInode::STATE_QUEUEDEXPORTPIN))

  // inspect each auth dirfrag to decide whether queueing is needed
  for (auto p = dirfrags.begin(); p != dirfrags.end(); p++) {
    CDir *dir = p->second;
    if (!dir->is_auth())
    if (export_pin != MDS_RANK_NONE) {
      if (dir->is_subtree_root()) {
        // set auxsubtree bit or export it
        if (!dir->state_test(CDir::STATE_AUXSUBTREE) ||
            export_pin != dir->get_dir_auth().first)
        // create aux subtree or export it
      // clear aux subtrees ?
      queue = dir->state_test(CDir::STATE_AUXSUBTREE);

    state_set(CInode::STATE_QUEUEDEXPORTPIN);
    mdcache->export_pin_queue.insert(this);
void CInode::set_export_pin(mds_rank_t rank)
  // Record the pin on the projected inode (caller must have projected)
  // and immediately re-evaluate whether we need to queue for export.
  assert(is_projected());
  get_projected_inode()->export_pin = rank;
  maybe_export_pin(true);
mds_rank_t CInode::get_export_pin(bool inherit) const
  /* An inode that is export pinned may not necessarily be a subtree root, we
   * need to traverse the parents. A base or system inode cannot be pinned.
   * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
   * have a parent yet.
   */
  for (const CInode *in = this; !in->is_base() && !in->is_system() && in->get_projected_parent_dn(); in = in->get_projected_parent_dn()->dir->inode) {
    mds_rank_t pin = in->get_projected_inode()->export_pin;
    // without inheritance only this inode's own pin is considered
    if (!inherit) break;
  return MDS_RANK_NONE;
bool CInode::is_exportable(mds_rank_t dest) const
  // Whether this inode may be migrated to MDS rank `dest`, judged against
  // its effective export pin.  NOTE(review): the branch comparing `pin`
  // with `dest` is not fully visible here — presumably a pin to a
  // different specific rank forbids the export; confirm against callers.
  mds_rank_t pin = get_export_pin();
  } else if (pin >= 0) {
4519 MEMPOOL_DEFINE_OBJECT_FACTORY(CInode, co_inode, mds_co);