1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "MDSContext.h"
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
41 #include "messages/MMDSSlaveRequest.h"
45 #include "common/config.h"
48 #define dout_subsys ceph_subsys_mds
50 #define dout_context g_ceph_context
51 #define dout_prefix _prefix(_dout, mds)
// Per-line log prefix used by the dout() machinery: tags every Locker log
// line with "mds.<rank>.locker" so interleaved MDS logs can be attributed.
// NOTE(review): this extraction is missing lines (embedded numbering jumps);
// the function's closing brace is not visible here.
52 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
53 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
// Internal completion context bound to a Locker instance; get_mds() lets the
// base MDSInternalContextBase machinery reach the owning MDSRank.
// NOTE(review): the class body is incomplete in this extraction (member
// declarations, get_mds() body, and closing braces are missing).
57 class LockerContext : public MDSInternalContextBase {
60 MDSRank *get_mds() override
// ctor requires a non-null Locker; contexts are never constructed unbound.
66 explicit LockerContext(Locker *locker_) : locker(locker_) {
67 assert(locker != NULL);
// Same idea as LockerContext but derived from MDSLogContextBase, i.e. for
// completions that fire after a journal (MDLog) write.
// NOTE(review): class body incomplete in this extraction.
71 class LockerLogContext : public MDSLogContextBase {
74 MDSRank *get_mds() override
// ctor requires a non-null Locker.
80 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
81 assert(locker != NULL);
85 /* This function DOES put the passed message before returning */
// Top-level message demultiplexer for the Locker subsystem: routes inter-MDS
// lock messages and client cap/lease messages to their specific handlers.
// An unrecognized type is a protocol bug and aborts.
// NOTE(review): this extraction is missing lines (the `case` label for
// MSG_MDS_LOCK, the `break;` statements, and closing braces are not visible).
86 void Locker::dispatch(Message *m)
89 switch (m->get_type()) {
// inter-mds lock protocol traffic
93 handle_lock(static_cast<MLock*>(m));
96 case MSG_MDS_INODEFILECAPS:
97 handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
// client capability traffic
101 case CEPH_MSG_CLIENT_CAPS:
102 handle_client_caps(static_cast<MClientCaps*>(m));
105 case CEPH_MSG_CLIENT_CAPRELEASE:
106 handle_client_cap_release(static_cast<MClientCapRelease*>(m));
108 case CEPH_MSG_CLIENT_LEASE:
109 handle_client_lease(static_cast<MClientLease*>(m));
// unknown type: abort — indicates a dispatcher wiring bug
113 derr << "locker unknown message " << m->get_type() << dendl;
114 assert(0 == "locker unknown message");
// Broadcast a lock-protocol message (no payload) to every replica of the
// lock's parent object, skipping replicas on MDSs that have not yet reached
// REJOIN while the cluster is degraded (they will resync state on rejoin).
// NOTE(review): the `continue;` for the skip branch and the closing braces
// are missing from this extraction.
131 void Locker::send_lock_message(SimpleLock *lock, int msg)
133 for (const auto &it : lock->get_parent()->get_replicas()) {
134 if (mds->is_cluster_degraded() &&
135 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
137 MLock *m = new MLock(lock, msg, mds->get_nodeid());
138 mds->send_message_mds(m, it.first);
// Overload of the above that attaches a data payload (e.g. encoded locked
// state) to each replica message. Same degraded-cluster skip rule.
// NOTE(review): the line that copies `data` into the MLock (original line
// 149) and the closing braces are missing from this extraction.
142 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
144 for (const auto &it : lock->get_parent()->get_replicas()) {
145 if (mds->is_cluster_degraded() &&
146 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
148 MLock *m = new MLock(lock, msg, mds->get_nodeid());
150 mds->send_message_mds(m, it.first);
// Add rdlocks on the snaplock of `in` and of every ancestor inode (walking
// projected parent dentries to the root) so a request sees a stable snap
// context along the whole path.
// NOTE(review): the declaration of cursor variable `t` (presumably
// `CInode *t = in;` — confirm against upstream) is missing here.
157 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
159 // rdlock ancestor snaps
161 rdlocks.insert(&in->snaplock);
162 while (t->get_projected_parent_dn()) {
163 t = t->get_projected_parent_dn()->get_dir()->get_inode();
164 rdlocks.insert(&t->snaplock);
// Like include_snap_rdlocks, but also rdlocks policylocks up the ancestry and
// reports (via *layout) the nearest ancestor's file layout policy, i.e. the
// layout that would apply to a new file under `in`.
// NOTE(review): extraction is incomplete — the loop header, the assignment of
// `found_layout`, and the loop/function closing braces are missing; the
// `found_layout` guard presumably ensures only the closest layout wins.
168 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
169 file_layout_t **layout)
171 //rdlock ancestor snaps
173 rdlocks.insert(&in->snaplock);
174 rdlocks.insert(&in->policylock);
175 bool found_layout = false;
177 rdlocks.insert(&t->snaplock);
179 rdlocks.insert(&t->policylock);
180 if (t->get_projected_inode()->has_layout()) {
181 *layout = &t->get_projected_inode()->layout;
// walk to the projected parent inode, if any
185 if (t->get_projected_parent_dn() &&
186 t->get_projected_parent_dn()->get_dir())
187 t = t->get_projected_parent_dn()->get_dir()->get_inode();
// RAII helper: on destruction, records `message` as an event on the request.
// acquire_locks() updates `message` as it progresses so that whatever point
// it returns (or early-returns) from is captured in the op tracker.
// NOTE(review): member declarations and the `whether-to-mark` flag logic are
// missing from this extraction.
192 struct MarkEventOnDestruct {
196 MarkEventOnDestruct(MDRequestRef& _mdr,
197 const char *_message) : mdr(_mdr),
200 ~MarkEventOnDestruct() {
202 mdr->mark_event(message);
206 /* If this function returns false, the mdr has been placed
207 * on the appropriate wait list */
// Acquire, in global lock order, all rd/wr/x/remote-wr locks a request needs.
// Overall phases visible below:
//   1. collect the set of objects that must be auth-pinned (mustpin);
//   2. auth-pin locally, or request remote auth-pins from other MDSs;
//   3. walk the sorted lock list, dropping out-of-order locks already held
//      and taking missing ones; any blocking step returns false after
//      parking the mdr on a waiter list (marker records why).
// NOTE(review): many lines are missing from this extraction (loop headers,
// `continue`/`return false` statements, closing braces — the embedded
// original numbering jumps repeatedly). Code below is left byte-identical.
208 bool Locker::acquire_locks(MDRequestRef& mdr,
209 set<SimpleLock*> &rdlocks,
210 set<SimpleLock*> &wrlocks,
211 set<SimpleLock*> &xlocks,
212 map<SimpleLock*,mds_rank_t> *remote_wrlocks,
213 CInode *auth_pin_freeze,
214 bool auth_pin_nonblock)
// fast path: a master request that already finished locking is done
216 if (mdr->done_locking &&
217 !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
218 dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
219 return true; // at least we had better be!
221 dout(10) << "acquire_locks " << *mdr << dendl;
// RAII: records the last value of marker.message on every exit path
223 MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
225 client_t client = mdr->get_client();
227 set<SimpleLock*, SimpleLock::ptr_lt> sorted; // sort everything we will lock
228 set<MDSCacheObject*> mustpin; // items to authpin
// --- phase 1a: xlocks (plus implied versionlocks) ---
231 for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
232 dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
234 mustpin.insert((*p)->get_parent());
236 // augment xlock with a versionlock?
237 if ((*p)->get_type() == CEPH_LOCK_DN) {
238 CDentry *dn = (CDentry*)(*p)->get_parent();
242 if (xlocks.count(&dn->versionlock))
243 continue; // we're xlocking the versionlock too; don't wrlock it!
245 if (mdr->is_master()) {
246 // master. wrlock versionlock so we can pipeline dentry updates to journal.
247 wrlocks.insert(&dn->versionlock);
249 // slave. exclusively lock the dentry version (i.e. block other journal updates).
250 // this makes rollback safe.
251 xlocks.insert(&dn->versionlock);
252 sorted.insert(&dn->versionlock);
// same versionlock treatment for inode locks
255 if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
256 // inode version lock?
257 CInode *in = (CInode*)(*p)->get_parent();
260 if (mdr->is_master()) {
261 // master. wrlock versionlock so we can pipeline inode updates to journal.
262 wrlocks.insert(&in->versionlock);
264 // slave. exclusively lock the inode version (i.e. block other journal updates).
265 // this makes rollback safe.
266 xlocks.insert(&in->versionlock);
267 sorted.insert(&in->versionlock);
// --- phase 1b: wrlocks ---
273 for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
274 MDSCacheObject *object = (*p)->get_parent();
275 dout(20) << " must wrlock " << **p << " " << *object << dendl;
277 if (object->is_auth())
278 mustpin.insert(object);
279 else if (!object->is_auth() &&
280 !(*p)->can_wrlock(client) && // we might have to request a scatter
281 !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
282 dout(15) << " will also auth_pin " << *object
283 << " in case we need to request a scatter" << dendl;
284 mustpin.insert(object);
// --- phase 1c: remote wrlocks ---
289 if (remote_wrlocks) {
290 for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
291 MDSCacheObject *object = p->first->get_parent();
292 dout(20) << " must remote_wrlock on mds." << p->second << " "
293 << *p->first << " " << *object << dendl;
294 sorted.insert(p->first);
295 mustpin.insert(object);
// --- phase 1d: rdlocks ---
300 for (set<SimpleLock*>::iterator p = rdlocks.begin();
303 MDSCacheObject *object = (*p)->get_parent();
304 dout(20) << " must rdlock " << **p << " " << *object << dendl;
306 if (object->is_auth())
307 mustpin.insert(object);
308 else if (!object->is_auth() &&
309 !(*p)->can_rdlock(client)) { // we might have to request an rdlock
310 dout(15) << " will also auth_pin " << *object
311 << " in case we need to request a rdlock" << dendl;
312 mustpin.insert(object);
318 map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)
320 // can i auth pin them all now?
321 marker.message = "failed to authpin local pins";
// --- phase 2a: check each object can be auth-pinned (or queue it remotely) ---
322 for (set<MDSCacheObject*>::iterator p = mustpin.begin();
325 MDSCacheObject *object = *p;
327 dout(10) << " must authpin " << *object << dendl;
329 if (mdr->is_auth_pinned(object)) {
330 if (object != (MDSCacheObject*)auth_pin_freeze)
332 if (mdr->more()->is_remote_frozen_authpin) {
333 if (mdr->more()->rename_inode == auth_pin_freeze)
335 // unfreeze auth pin for the wrong inode
// NOTE(review): .size() on operator[] both creates the bucket and is
// otherwise a no-op — it only ensures a (possibly empty) request is sent
// to that mds so the stale frozen authpin gets dropped.
336 mustpin_remote[mdr->more()->rename_inode->authority().first].size();
340 if (!object->is_auth()) {
// about to wait or go remote: release anything we hold to avoid deadlock
341 if (!mdr->locks.empty())
342 drop_locks(mdr.get());
343 if (object->is_ambiguous_auth()) {
345 dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
346 object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
347 mdr->drop_local_auth_pins();
350 mustpin_remote[object->authority().first].insert(object);
353 if (!object->can_auth_pin()) {
// object is freezing/frozen: drop everything and either fail (nonblock)
// or wait for the unfreeze
355 drop_locks(mdr.get());
356 mdr->drop_local_auth_pins();
357 if (auth_pin_nonblock) {
358 dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
362 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
363 object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
365 if (!mdr->remote_auth_pins.empty())
366 notify_freeze_waiter(object);
372 // ok, grab local auth pins
373 for (set<MDSCacheObject*>::iterator p = mustpin.begin();
376 MDSCacheObject *object = *p;
377 if (mdr->is_auth_pinned(object)) {
378 dout(10) << " already auth_pinned " << *object << dendl;
379 } else if (object->is_auth()) {
380 dout(10) << " auth_pinning " << *object << dendl;
381 mdr->auth_pin(object);
385 // request remote auth_pins
386 if (!mustpin_remote.empty()) {
387 marker.message = "requesting remote authpins";
// include already-held remote pins so the slave re-confirms the whole set
388 for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
389 p != mdr->remote_auth_pins.end();
391 if (mustpin.count(p->first)) {
392 assert(p->second == p->first->authority().first);
393 map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
394 if (q != mustpin_remote.end())
395 q->second.insert(p->first);
398 for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
399 p != mustpin_remote.end();
401 dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
403 // wait for active auth
404 if (mds->is_cluster_degraded() &&
405 !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
406 dout(10) << " mds." << p->first << " is not active" << dendl;
407 if (mdr->more()->waiting_on_slave.empty())
408 mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
412 MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
413 MMDSSlaveRequest::OP_AUTHPIN);
414 for (set<MDSCacheObject*>::iterator q = p->second.begin();
415 q != p->second.end();
417 dout(10) << " req remote auth_pin of " << **q << dendl;
418 MDSCacheObjectInfo info;
419 (*q)->set_object_info(info);
420 req->get_authpins().push_back(info);
421 if (*q == auth_pin_freeze)
422 (*q)->set_object_info(req->get_authpin_freeze());
425 if (auth_pin_nonblock)
426 req->mark_nonblock();
427 mds->send_message_mds(req, p->first);
429 // put in waiting list
430 assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
431 mdr->more()->waiting_on_slave.insert(p->first);
436 // caps i'll need to issue
437 set<CInode*> issue_set;
// --- phase 3: acquire locks in sorted order, reconciling with locks the
// mdr already holds (unlocking any held out of order) ---
441 // make sure they match currently acquired locks.
442 set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
443 for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
446 bool need_wrlock = !!wrlocks.count(*p);
447 bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
// already hold this lock in some mode?
450 if (existing != mdr->locks.end() && *existing == *p) {
452 SimpleLock *have = *existing;
454 if (xlocks.count(have) && mdr->xlocks.count(have)) {
455 dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
// remote wrlock held on the wrong mds must be released first
458 if (mdr->remote_wrlocks.count(have)) {
459 if (!need_remote_wrlock ||
460 mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
461 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
462 << " " << *have << " " << *have->get_parent() << dendl;
463 remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
466 if (need_wrlock || need_remote_wrlock) {
467 if (need_wrlock == !!mdr->wrlocks.count(have) &&
468 need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
470 dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
471 if (need_remote_wrlock)
472 dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
476 if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
477 dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
482 // hose any stray locks
483 if (existing != mdr->locks.end() && *existing == *p) {
484 assert(need_wrlock || need_remote_wrlock);
485 SimpleLock *lock = *existing;
486 if (mdr->wrlocks.count(lock)) {
488 dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
489 else if (need_remote_wrlock) // acquire remote_wrlock first
490 dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
491 bool need_issue = false;
492 wrlock_finish(lock, mdr.get(), &need_issue);
494 issue_set.insert(static_cast<CInode*>(lock->get_parent()));
// release any held locks that sort before the one we need next
498 while (existing != mdr->locks.end()) {
499 SimpleLock *stray = *existing;
501 dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
502 bool need_issue = false;
503 if (mdr->xlocks.count(stray)) {
504 xlock_finish(stray, mdr.get(), &need_issue);
505 } else if (mdr->rdlocks.count(stray)) {
506 rdlock_finish(stray, mdr.get(), &need_issue);
508 // may have acquired both wrlock and remore wrlock
509 if (mdr->wrlocks.count(stray))
510 wrlock_finish(stray, mdr.get(), &need_issue);
511 if (mdr->remote_wrlocks.count(stray))
512 remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
515 issue_set.insert(static_cast<CInode*>(stray->get_parent()));
// lock it. cancel any pending half-started lock attempt on a different lock.
519 if (mdr->locking && *p != mdr->locking) {
520 cancel_locking(mdr.get(), &issue_set);
522 if (xlocks.count(*p)) {
523 marker.message = "failed to xlock, waiting";
524 if (!xlock_start(*p, mdr))
526 dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
527 } else if (need_wrlock || need_remote_wrlock) {
528 if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
529 marker.message = "waiting for remote wrlocks";
530 remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
533 if (need_wrlock && !mdr->wrlocks.count(*p)) {
534 marker.message = "failed to wrlock, waiting";
535 if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
536 marker.message = "failed to wrlock, dropping remote wrlock and waiting";
537 // can't take the wrlock because the scatter lock is gathering. need to
538 // release the remote wrlock, so that the gathering process can finish.
539 remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
540 remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
543 // nowait if we have already gotten remote wrlock
544 if (!wrlock_start(*p, mdr, need_remote_wrlock))
546 dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
// rdlock path (slave requests never reach here)
549 assert(mdr->is_master());
550 if ((*p)->is_scatterlock()) {
551 ScatterLock *slock = static_cast<ScatterLock *>(*p);
552 if (slock->is_rejoin_mix()) {
553 // If there is a recovering mds who replcated an object when it failed
554 // and scatterlock in the object was in MIX state, It's possible that
555 // the recovering mds needs to take wrlock on the scatterlock when it
556 // replays unsafe requests. So this mds should delay taking rdlock on
557 // the scatterlock until the recovering mds finishes replaying unsafe.
558 // Otherwise unsafe requests may get replayed after current request.
561 // The recovering mds is auth mds of a dirfrag, this mds is auth mds
562 // of correspinding inode. when 'rm -rf' the direcotry, this mds should
563 // delay the rmdir request until the recovering mds has replayed unlink
565 if (mds->is_cluster_degraded()) {
566 if (!mdr->is_replay()) {
567 drop_locks(mdr.get());
568 mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
569 dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
570 << ", waiting for cluster recovered" << dendl;
571 marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
575 slock->clear_rejoin_mix();
580 marker.message = "failed to rdlock, waiting";
581 if (!rdlock_start(*p, mdr))
583 dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
587 // any extra unneeded locks?
588 while (existing != mdr->locks.end()) {
589 SimpleLock *stray = *existing;
591 dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
592 bool need_issue = false;
593 if (mdr->xlocks.count(stray)) {
594 xlock_finish(stray, mdr.get(), &need_issue);
595 } else if (mdr->rdlocks.count(stray)) {
596 rdlock_finish(stray, mdr.get(), &need_issue);
598 // may have acquired both wrlock and remore wrlock
599 if (mdr->wrlocks.count(stray))
600 wrlock_finish(stray, mdr.get(), &need_issue);
601 if (mdr->remote_wrlocks.count(stray))
602 remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
605 issue_set.insert(static_cast<CInode*>(stray->get_parent()));
// success: remember we are done and re-issue caps affected by unlocks above
608 mdr->done_locking = true;
609 mdr->set_mds_stamp(ceph_clock_now());
611 marker.message = "acquired locks";
614 issue_caps_set(issue_set);
// We just bailed out of auth-pinning because `o` (inode/dentry/dir) is
// freezing; bump the waiter counters on the relevant freezing dirfrag /
// freezing subtree root so the freeze logic knows someone is blocked on it.
// NOTE(review): extraction incomplete — `CDir *dir` declaration, the dentry
// branch body, and a null/containing-dir guard are missing here.
618 void Locker::notify_freeze_waiter(MDSCacheObject *o)
621 if (CInode *in = dynamic_cast<CInode*>(o)) {
623 dir = in->get_parent_dir();
624 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
// otherwise o is itself a CDir
627 dir = dynamic_cast<CDir*>(o);
631 if (dir->is_freezing_dir())
632 mdcache->fragment_freeze_inc_num_waiters(dir);
633 if (dir->is_freezing_tree()) {
// the counter lives on the subtree freeze root, not this dirfrag
634 while (!dir->is_freezing_tree_root())
635 dir = dir->get_parent_dir();
636 mdcache->migrator->export_freeze_inc_num_waiters(dir);
// Move every xlock held by `mut` into XLOCKDONE (optionally skipping dentry
// locks), which relaxes what other operations may do while the xlock is
// still nominally held. Only valid on auth objects.
// NOTE(review): the `skip_dentry &&` half of the condition at original line
// 648 and the `continue;` it guards are missing from this extraction.
641 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
643 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
644 p != mut->xlocks.end();
646 MDSCacheObject *object = (*p)->get_parent();
647 assert(object->is_auth());
649 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
651 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
652 (*p)->set_xlock_done();
// Release all rdlocks held by `mut`; parents whose caps may now change are
// collected into *pneed_issue for the caller to re-issue.
// NOTE(review): declaration of `ni` and the `if (ni)` guard around the
// insert are missing from this extraction.
656 void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
658 while (!mut->rdlocks.empty()) {
660 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
661 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
663 pneed_issue->insert(static_cast<CInode*>(p));
// Release all xlocks, remote wrlocks, and wrlocks held by `mut`. Remote
// holders are accumulated in `slaves` and sent a single OP_DROPLOCKS each
// at the end (skipped for MDSs below REJOIN in a degraded cluster — they
// will clean up during rejoin). Affected inodes go into *pneed_issue.
// NOTE(review): extraction incomplete — `ni` declarations, the
// remote-xlock branch structure, and several closing braces are missing.
667 void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
669 set<mds_rank_t> slaves;
671 while (!mut->xlocks.empty()) {
672 SimpleLock *lock = *mut->xlocks.begin();
673 MDSCacheObject *p = lock->get_parent();
// non-auth xlock: remember the auth mds; it does the real unlock via
// the OP_DROPLOCKS message below
675 assert(lock->get_sm()->can_remote_xlock);
676 slaves.insert(p->authority().first);
678 mut->locks.erase(lock);
679 mut->xlocks.erase(lock);
683 xlock_finish(lock, mut, &ni);
685 pneed_issue->insert(static_cast<CInode*>(p));
688 while (!mut->remote_wrlocks.empty()) {
689 map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
690 slaves.insert(p->second);
// keep the lock in mut->locks if a local wrlock on it is still held
691 if (mut->wrlocks.count(p->first) == 0)
692 mut->locks.erase(p->first);
693 mut->remote_wrlocks.erase(p);
696 while (!mut->wrlocks.empty()) {
698 MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
699 wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
701 pneed_issue->insert(static_cast<CInode*>(p));
704 for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
705 if (!mds->is_cluster_degraded() ||
706 mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
707 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
708 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
709 MMDSSlaveRequest::OP_DROPLOCKS);
710 mds->send_message_mds(slavereq, *p);
// Abort the lock acquisition currently in flight on `mut` (mut->locking).
// On the auth side, unwind the intermediate PREXLOCK / LOCK_XLOCK states so
// the lock can stabilize, and note the parent inode for cap re-issue.
// NOTE(review): an assert on `lock` and some closing braces appear to be
// missing from this extraction.
715 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
717 SimpleLock *lock = mut->locking;
719 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
721 if (lock->get_parent()->is_auth()) {
722 bool need_issue = false;
723 if (lock->get_state() == LOCK_PREXLOCK) {
724 _finish_xlock(lock, -1, &need_issue);
725 } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
726 lock->get_num_xlocks() == 0) {
727 lock->set_state(LOCK_XLOCKDONE);
728 eval_gather(lock, true, &need_issue);
731 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
733 mut->finish_locking(lock);
// Drop every lock (and any in-flight acquisition) held by `mut`. If the
// caller did not supply a pneed_issue set, collect affected inodes locally
// and re-issue their caps here; otherwise leave that to the caller.
// NOTE(review): the `if (!pneed_issue)` guard line and the `if (mut->locking)`
// guard before cancel_locking are missing from this extraction.
736 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
739 set<CInode*> my_need_issue;
741 pneed_issue = &my_need_issue;
744 cancel_locking(mut, pneed_issue);
745 _drop_non_rdlocks(mut, pneed_issue);
746 _drop_rdlocks(mut, pneed_issue);
// only issue caps ourselves when we own the set
748 if (pneed_issue == &my_need_issue)
749 issue_caps_set(*pneed_issue);
750 mut->done_locking = false;
// Drop only the write/exclusive locks held by `mut`, keeping rdlocks.
// Same caller-vs-local pneed_issue convention as drop_locks().
// NOTE(review): the `if (!pneed_issue)` guard line is missing from this
// extraction.
753 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
755 set<CInode*> my_need_issue;
757 pneed_issue = &my_need_issue;
759 _drop_non_rdlocks(mut, pneed_issue);
761 if (pneed_issue == &my_need_issue)
762 issue_caps_set(*pneed_issue);
// Drop only the rdlocks held by `mut`. Same caller-vs-local pneed_issue
// convention as drop_locks().
// NOTE(review): the `if (!pneed_issue)` guard line is missing from this
// extraction.
765 void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
767 set<CInode*> my_need_issue;
769 pneed_issue = &my_need_issue;
771 _drop_rdlocks(mut, pneed_issue);
773 if (pneed_issue == &my_need_issue)
774 issue_caps_set(*pneed_issue);
// Core lock state-machine step for an unstable lock: decide whether the
// "gather" (waiting for rdlocks/wrlocks/xlocks/leases/caps to drain) has
// completed, and if so transition the lock to its next state, notifying the
// auth mds (if we are a replica) or the replicas (if we are auth), waking
// waiters and re-issuing caps as needed.
// NOTE(review): this extraction is missing many lines (several case labels,
// `break`/`return` statements, `CInode *in` initialization, and closing
// braces — embedded numbering jumps throughout). Code left byte-identical.
780 void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
782 dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
783 assert(!lock->is_stable());
785 int next = lock->get_next_state();
// caps only matter for inode locks that carry a cap shift
788 bool caps = lock->get_cap_shift();
789 if (lock->get_type() != CEPH_LOCK_DN)
790 in = static_cast<CInode *>(lock->get_parent());
792 bool need_issue = false;
794 int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
795 assert(!caps || in != NULL);
796 if (caps && in->is_head()) {
797 in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
798 lock->get_cap_shift(), lock->get_cap_mask());
799 dout(10) << " next state is " << lock->get_state_name(next)
800 << " issued/allows loner " << gcap_string(loner_issued)
801 << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
802 << " xlocker " << gcap_string(xlocker_issued)
803 << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
804 << " other " << gcap_string(other_issued)
805 << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
// on the first pass, if any client holds caps the next state disallows,
// trigger cap recall (issue) before the gather can complete
808 if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
809 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
810 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
// helper: state capability x exists and is below AUTH scope (<= for auth)
814 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
815 bool auth = lock->get_parent()->is_auth();
// the big "is the gather finished?" predicate: nothing left to drain and
// no disallowed caps outstanding, and not a state awaiting an auth trigger
816 if (!lock->is_gathering() &&
817 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
818 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
819 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
820 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
821 !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind!
822 (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
823 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
824 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
825 lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds
826 lock->get_state() != LOCK_MIX_SYNC2
828 dout(7) << "eval_gather finished gather on " << *lock
829 << " on " << *lock->get_parent() << dendl;
// file locks: can't finish while file recovery is pending/in progress
831 if (lock->get_sm() == &sm_filelock) {
833 if (in->state_test(CInode::STATE_RECOVERING)) {
834 dout(7) << "eval_gather finished gather, but still recovering" << dendl;
836 } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
837 dout(7) << "eval_gather finished gather, but need to recover" << dendl;
838 mds->mdcache->queue_file_recover(in);
839 mds->mdcache->do_file_recover();
844 if (!lock->get_parent()->is_auth()) {
845 // replica: tell auth
846 mds_rank_t auth = lock->get_parent()->authority().first;
// hold off acks while both sides are mid-rejoin
848 if (lock->get_parent()->is_rejoining() &&
849 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
850 dout(7) << "eval_gather finished gather, but still rejoining "
851 << *lock->get_parent() << dendl;
855 if (!mds->is_cluster_degraded() ||
856 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
// ack to the auth mds appropriate to the transition we just finished
857 switch (lock->get_state()) {
859 mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
// MIX->SYNC: send our locked state and start flushing dirty scatter data
865 MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
866 lock->encode_locked_state(reply->get_data());
867 mds->send_message_mds(reply, auth);
868 next = LOCK_MIX_SYNC2;
869 (static_cast<ScatterLock *>(lock))->start_flush();
874 (static_cast<ScatterLock *>(lock))->finish_flush();
875 (static_cast<ScatterLock *>(lock))->clear_flushed();
878 // do nothing, we already acked
// SYNC->MIX: ack; auth will trigger the final step
883 MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
884 mds->send_message_mds(reply, auth);
885 next = LOCK_SYNC_MIX2;
// (scatter) MIX->LOCK: ship locked state with the ack, then flush
892 lock->encode_locked_state(data);
893 mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
894 (static_cast<ScatterLock *>(lock))->start_flush();
895 // we'll get an AC_LOCKFLUSHED to complete
906 // once the first (local) stage of mix->lock gather complete we can
907 // gather from replicas
908 if (lock->get_state() == LOCK_MIX_LOCK &&
909 lock->get_parent()->is_replicated()) {
910 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
911 send_lock_message(lock, LOCK_AC_LOCK);
913 lock->set_state(LOCK_MIX_LOCK2);
// dirty scatterlock data must hit the journal before the transition
917 if (lock->is_dirty() && !lock->is_flushed()) {
918 scatter_writebehind(static_cast<ScatterLock *>(lock));
922 lock->clear_flushed();
// auth: tell replicas about the new state where required
924 switch (lock->get_state()) {
929 in->start_scatter(static_cast<ScatterLock *>(lock));
930 if (lock->get_parent()->is_replicated()) {
932 lock->encode_locked_state(softdata);
933 send_lock_message(lock, LOCK_AC_MIX, softdata);
935 (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
940 if (next != LOCK_SYNC)
949 if (lock->get_parent()->is_replicated()) {
951 lock->encode_locked_state(softdata);
952 send_lock_message(lock, LOCK_AC_SYNC, softdata);
// commit the transition
959 lock->set_state(next);
961 if (lock->get_parent()->is_auth() &&
963 lock->get_parent()->auth_unpin(lock);
965 // drop loner before doing waiters
969 in->get_wanted_loner() != in->get_loner()) {
970 dout(10) << " trying to drop loner" << dendl;
971 if (in->try_drop_loner()) {
972 dout(10) << " dropped loner" << dendl;
// wake whoever was blocked on this lock (via pfinishers or directly)
978 lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
981 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
983 if (caps && in->is_head())
// if now stable, keep evaluating; propagate need_issue to caller/inode
986 if (lock->get_parent()->is_auth() &&
988 try_eval(lock, &need_issue);
994 else if (in->is_head())
// Evaluate all of an inode's locks selected by `mask`, managing the loner
// (exclusive-caps client) before and after: try to set a wanted loner first,
// evaluate each lock, then drop/retry the loner if wants changed. Returns
// whether anything changed (per the visible `need_issue` accumulation).
// NOTE(review): extraction incomplete — `goto retry`/`retry:` label,
// `mask = -1` after a loner change, issue_caps() call, the return statement,
// and several braces are missing.
1000 bool Locker::eval(CInode *in, int mask, bool caps_imported)
1002 bool need_issue = caps_imported;
1003 list<MDSInternalContextBase*> finishers;
1005 dout(10) << "eval " << mask << " " << *in << dendl;
// loner can only exist on the auth, head (non-snap) inode
1008 if (in->is_auth() && in->is_head()) {
1009 if (in->choose_ideal_loner() >= 0) {
1010 if (in->try_set_loner()) {
1011 dout(10) << "eval set loner to client." << in->get_loner() << dendl;
1015 dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
1017 dout(10) << "eval doesn't want loner" << dendl;
// evaluate each lock family requested by the mask
1021 if (mask & CEPH_LOCK_IFILE)
1022 eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
1023 if (mask & CEPH_LOCK_IAUTH)
1024 eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
1025 if (mask & CEPH_LOCK_ILINK)
1026 eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
1027 if (mask & CEPH_LOCK_IXATTR)
1028 eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
1029 if (mask & CEPH_LOCK_INEST)
1030 eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
1031 if (mask & CEPH_LOCK_IFLOCK)
1032 eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
1033 if (mask & CEPH_LOCK_IPOLICY)
1034 eval_any(&in->policylock, &need_issue, &finishers, caps_imported);
1037 if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
1038 dout(10) << " trying to drop loner" << dendl;
1039 if (in->try_drop_loner()) {
1040 dout(10) << " dropped loner" << dendl;
1043 if (in->get_wanted_loner() >= 0) {
1044 if (in->try_set_loner()) {
1045 dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
1049 dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
// run completions queued by the individual lock evals
1055 finish_contexts(g_ceph_context, finishers);
1057 if (need_issue && in->is_head())
1060 dout(10) << "eval done" << dendl;
// Waiter context that re-runs Locker::try_eval(p, mask) once a blocking
// condition (ambiguous auth, freeze) clears. Pins the target object for the
// lifetime of the waiter so the pointer stays valid.
// NOTE(review): member declarations and closing braces are missing from
// this extraction.
1064 class C_Locker_Eval : public LockerContext {
1068 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1069 // We are used as an MDSCacheObject waiter, so should
1070 // only be invoked by someone already holding the big lock.
1071 assert(locker->mds->mds_lock.is_locked_by_me());
1072 p->get(MDSCacheObject::PIN_PTRWAITER);
1074 void finish(int r) override {
1075 locker->try_eval(p, mask);
1076 p->put(MDSCacheObject::PIN_PTRWAITER);
// Object-level try_eval: defer (re-queue via C_Locker_Eval) while auth is
// ambiguous or the object is frozen, then evaluate either the dentry lock
// (mask == CEPH_LOCK_DN) or the inode's locks per the mask.
// NOTE(review): the `return;` statements after each add_waiter and the
// eval(in, mask) call in the inode branch are missing from this extraction.
1080 void Locker::try_eval(MDSCacheObject *p, int mask)
1082 // unstable and ambiguous auth?
1083 if (p->is_ambiguous_auth()) {
1084 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1085 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1089 if (p->is_auth() && p->is_frozen()) {
1090 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1091 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1095 if (mask & CEPH_LOCK_DN) {
1096 assert(mask == CEPH_LOCK_DN);
1097 bool need_issue = false; // ignore this, no caps on dentries
1098 CDentry *dn = static_cast<CDentry *>(p);
1099 eval_any(&dn->lock, &need_issue);
1101 CInode *in = static_cast<CInode *>(p);
// Lock-level try_eval: only meaningful on the auth; defers on ambiguous
// auth or frozen parent, honors pending scatter/unscatter wants on
// scatterlocks even while the parent is freezing (see comment block below),
// and otherwise falls through to eval().
// NOTE(review): `return;` statements after the waiter registrations and
// several closing braces are missing from this extraction.
1106 void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
1108 MDSCacheObject *p = lock->get_parent();
1110 // unstable and ambiguous auth?
1111 if (p->is_ambiguous_auth()) {
1112 dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
1113 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
1117 if (!p->is_auth()) {
1118 dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
1122 if (p->is_frozen()) {
1123 dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
1124 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1129 * We could have a situation like:
1131 * - mds A authpins item on mds B
1132 * - mds B starts to freeze tree containing item
1133 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1134 * - mds B lock is unstable, sets scatter_wanted
1135 * - mds B lock stabilizes, calls try_eval.
1137 * We can defer while freezing without causing a deadlock. Honor
1138 * scatter_wanted flag here. This will never get deferred by the
1139 * checks above due to the auth_pin held by the master.
1141 if (lock->is_scatterlock()) {
1142 ScatterLock *slock = static_cast<ScatterLock *>(lock);
1143 if (slock->get_scatter_wanted() &&
1144 slock->get_state() != LOCK_MIX) {
1145 scatter_mix(slock, pneed_issue);
1146 if (!lock->is_stable())
1148 } else if (slock->get_unscatter_wanted() &&
1149 slock->get_state() != LOCK_LOCK) {
1150 simple_lock(slock, pneed_issue);
1151 if (!lock->is_stable()) {
// freezing (not yet frozen) also defers, except for dentry locks
1157 if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
1158 dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
1159 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1163 eval(lock, pneed_issue);
// Re-run eval_gather on the cap-related locks (file/auth/link/xattr) that
// are still unstable — called when issued caps change so a pending gather
// can complete. If issue_set is given the inode is queued for cap issue by
// the caller; otherwise (per the visible structure) it is issued here.
// NOTE(review): the else-branch issue_caps(in) call and closing braces are
// missing from this extraction.
1166 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1168 bool need_issue = false;
1169 list<MDSInternalContextBase*> finishers;
// kick locks that are waiting for caps to be revoked
1172 if (!in->filelock.is_stable())
1173 eval_gather(&in->filelock, false, &need_issue, &finishers);
1174 if (!in->authlock.is_stable())
1175 eval_gather(&in->authlock, false, &need_issue, &finishers);
1176 if (!in->linklock.is_stable())
1177 eval_gather(&in->linklock, false, &need_issue, &finishers);
1178 if (!in->xattrlock.is_stable())
1179 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1181 if (need_issue && in->is_head()) {
1183 issue_set->insert(in);
1188 finish_contexts(g_ceph_context, finishers);
// Re-evaluate in-progress gathers on the three scatter-type locks
// (file/nest/dirfragtree), then run any contexts that became runnable.
1191 void Locker::eval_scatter_gathers(CInode *in)
1193 bool need_issue = false;
1194 list<MDSInternalContextBase*> finishers;
1196 dout(10) << "eval_scatter_gathers " << *in << dendl;
// kick each scatter lock that is mid-transition
1199 if (!in->filelock.is_stable())
1200 eval_gather(&in->filelock, false, &need_issue, &finishers);
1201 if (!in->nestlock.is_stable())
1202 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1203 if (!in->dirfragtreelock.is_stable())
1204 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
// caps only apply to head inodes
1206 if (need_issue && in->is_head())
1209 finish_contexts(g_ceph_context, finishers);
// Dispatch a single-lock eval to the handler for that lock type:
// file locks -> file_eval, dirfragtree/nest -> scatter_eval,
// everything else -> simple_eval.
1212 void Locker::eval(SimpleLock *lock, bool *need_issue)
1214 switch (lock->get_type()) {
1215 case CEPH_LOCK_IFILE:
1216 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1217 case CEPH_LOCK_IDFT:
1218 case CEPH_LOCK_INEST:
1219 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1221 return simple_eval(lock, need_issue);
1226 // ------------------
// Try to nudge a lock toward a rdlock-able state.
// - On the auth MDS: for EXCL filelocks with a loner, transition toward a
//   state that permits reads (unless as_anon, which wants SYNC, not XSYN).
// - On a replica: ask the auth for a rdlock-able state via LOCK_AC_REQRDLOCK,
//   but only if the cluster is healthy or the auth is sufficiently active.
// Also prioritizes file recovery if the inode is being recovered, since the
// lock cannot settle until recovery completes.
1229 bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
1232 if (lock->is_stable()) {
1233 if (lock->get_parent()->is_auth()) {
1234 if (lock->get_sm() == &sm_scatterlock) {
1235 // not until tempsync is fully implemented
1236 //if (lock->get_parent()->is_replicated())
1237 //scatter_tempsync((ScatterLock*)lock);
1240 } else if (lock->get_sm() == &sm_filelock) {
1241 CInode *in = static_cast<CInode*>(lock->get_parent());
1242 if (lock->get_state() == LOCK_EXCL &&
1243 in->get_target_loner() >= 0 &&
1244 !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
1252 // request rdlock state change from auth
1253 mds_rank_t auth = lock->get_parent()->authority().first;
1254 if (!mds->is_cluster_degraded() ||
1255 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1256 dout(10) << "requesting rdlock from auth on "
1257 << *lock << " on " << *lock->get_parent() << dendl;
1258 mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
// a recovering file inode blocks lock transitions; bump its priority
1263 if (lock->get_type() == CEPH_LOCK_IFILE) {
1264 CInode *in = static_cast<CInode *>(lock->get_parent());
1265 if (in->state_test(CInode::STATE_RECOVERING)) {
1266 mds->mdcache->recovery_queue.prioritize(in);
// Opportunistic rdlock: succeed immediately if the lock is readable for
// `client`; otherwise kick it once and re-check.  If still not readable,
// register `con` to be woken when the lock becomes stable/readable.
1273 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1275 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1277 // can read? grab ref.
1278 if (lock->can_rdlock(client))
// one kick, then re-check
1281 _rdlock_kick(lock, false);
1283 if (lock->can_rdlock(client))
// queue the continuation for when reading becomes possible
1288 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1289 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
// Acquire a rdlock for a client request.  as_anon reads with an anonymous
// (-1) client identity (no xlock-holder privilege).  May forward the request
// to the auth MDS, kick the lock, or queue a retry waiter; returns true only
// when the rdlock has been taken and recorded in the mutation.
1294 bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
1296 dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1298 // client may be allowed to rdlock the same item it has xlocked.
1299 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1300 if (mut->snapid != CEPH_NOSNAP)
1302 client_t client = as_anon ? -1 : mut->get_client();
// inode parent (dentry locks have a CDentry parent instead)
1305 if (lock->get_type() != CEPH_LOCK_DN)
1306 in = static_cast<CInode *>(lock->get_parent());
// some lock types must be rdlocked on the auth; forward if so
1309 if (!lock->get_parent()->is_auth() &&
1310 lock->fw_rdlock_to_auth()) {
1311 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1317 // can read? grab ref.
1318 if (lock->can_rdlock(client)) {
1320 mut->rdlocks.insert(lock);
1321 mut->locks.insert(lock);
1325 // hmm, wait a second.
// snapped inode whose lock is still syncing from the head: kick the
// head inode's corresponding lock so this one can catch up
1326 if (in && !in->is_head() && in->is_auth() &&
1327 lock->get_state() == LOCK_SNAP_SYNC) {
1328 // okay, we actually need to kick the head's lock to get ourselves synced up.
1329 CInode *head = mdcache->get_inode(in->ino());
1331 SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
1332 if (hlock->get_state() == LOCK_SYNC)
1333 hlock = head->get_lock(lock->get_type());
1335 if (hlock->get_state() != LOCK_SYNC) {
1336 dout(10) << "rdlock_start trying head inode " << *head << dendl;
1337 if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
1339 // oh, check our lock again then
1343 if (!_rdlock_kick(lock, as_anon))
// couldn't get it now; decide what to wait for before retrying
1349 if (lock->get_parent()->is_auth() && lock->is_stable())
1350 wait_on = SimpleLock::WAIT_RD;
1352 wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1353 dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1354 lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
// Flush the MDS log if this (auth) lock is stuck unstable-and-locked, since
// a journaled event (e.g. a cap flush) may be what it is waiting on.
1359 void Locker::nudge_log(SimpleLock *lock)
1361 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1362 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1363 mds->mdlog->flush();
// Drop a rdlock held by `mut` and, if that was the last reader, re-evaluate
// the lock (finish a pending gather, or try new state transitions on auth).
1366 void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1371 mut->rdlocks.erase(lock);
1372 mut->locks.erase(lock);
1375 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1378 if (!lock->is_rdlocked()) {
1379 if (!lock->is_stable())
1380 eval_gather(lock, false, pneed_issue);
1381 else if (lock->get_parent()->is_auth())
1382 try_eval(lock, pneed_issue);
// Check (without taking anything) whether every lock in the set can be
// rdlocked anonymously (client -1); logs the first lock that cannot.
1387 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1389 dout(10) << "can_rdlock_set " << locks << dendl;
1390 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1391 if (!(*p)->can_rdlock(-1)) {
1392 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
// Attempt an anonymous rdlock_try (no waiter context) on every lock in the
// set; logs and bails on the first one that is not immediately readable.
1398 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1400 dout(10) << "rdlock_try_set " << locks << dendl;
1401 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1402 if (!rdlock_try(*p, -1, NULL)) {
1403 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
// Unconditionally record every lock in the set as rdlocked by `mut`
// (caller has already established that the rdlocks are obtainable).
1409 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1411 dout(10) << "rdlock_take_set " << locks << dendl;
1412 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1414 mut->rdlocks.insert(*p);
1415 mut->locks.insert(*p);
1419 // ------------------
// Take a wrlock unconditionally (force=true) and record it on the mutation.
// Version locks (iversion/dversion) are LocalLocks and use the local path.
1422 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1424 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1425 lock->get_type() == CEPH_LOCK_DVERSION)
1426 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1428 dout(7) << "wrlock_force on " << *lock
1429 << " on " << *lock->get_parent() << dendl;
1430 lock->get_wrlock(true);
1431 mut->wrlocks.insert(lock);
1432 mut->locks.insert(lock);
// Acquire a wrlock for a request.  nowait=true means the caller cannot
// block (and may already hold an open log entry), so no waiters are queued
// and no nested state changes that could write-behind are started.
// Returns true once the wrlock is taken and recorded in the mutation.
1435 bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
1437 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1438 lock->get_type() == CEPH_LOCK_DVERSION)
1439 return local_wrlock_start(static_cast<LocalLock*>(lock), mut);
1441 dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
1443 CInode *in = static_cast<CInode *>(lock->get_parent());
1444 client_t client = mut->get_client();
// prefer MIX (scattered) state when dirfrags are spread across ranks
1445 bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
1446 (in->has_subtree_or_exporting_dirfrag() ||
1447 static_cast<ScatterLock*>(lock)->get_scatter_wanted());
1451 if (lock->can_wrlock(client) &&
1452 (!want_scatter || lock->get_state() == LOCK_MIX)) {
1454 mut->wrlocks.insert(lock);
1455 mut->locks.insert(lock);
// recovery blocks the filelock; bump the inode's recovery priority
1459 if (lock->get_type() == CEPH_LOCK_IFILE &&
1460 in->state_test(CInode::STATE_RECOVERING)) {
1461 mds->mdcache->recovery_queue.prioritize(in);
1464 if (!lock->is_stable())
1467 if (in->is_auth()) {
1468 // don't do nested lock state change if we have dirty scatterdata and
1469 // may scatter_writebehind or start_scatter, because nowait==true implies
1470 // that the caller already has a log entry open!
1471 if (nowait && lock->is_dirty())
1475 scatter_mix(static_cast<ScatterLock*>(lock));
// nowait caller cannot queue a waiter; give up here
1479 if (nowait && !lock->can_wrlock(client))
1484 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
// replica: ask the auth to scatter the lock, if the auth is reachable
1485 mds_rank_t auth = lock->get_parent()->authority().first;
1486 if (!mds->is_cluster_degraded() ||
1487 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1488 dout(10) << "requesting scatter from auth on "
1489 << *lock << " on " << *lock->get_parent() << dendl;
1490 mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
1497 dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1498 lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
// Drop a wrlock held by `mut`; the lock stays in mut->locks if a remote
// wrlock on it is still outstanding.  If that was the last writer,
// re-evaluate the lock state.
1505 void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1507 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1508 lock->get_type() == CEPH_LOCK_DVERSION)
1509 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1511 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1514 mut->wrlocks.erase(lock);
1515 if (mut->remote_wrlocks.count(lock) == 0)
1516 mut->locks.erase(lock);
1519 if (!lock->is_wrlocked()) {
1520 if (!lock->is_stable())
1521 eval_gather(lock, false, pneed_issue);
1522 else if (lock->get_parent()->is_auth())
1523 try_eval(lock, pneed_issue);
// Ask mds.`target` to take a wrlock on our behalf (slave OP_WRLOCK).
// If the target is not active, defer until it is (unless we are already
// waiting on some slave).  Records the in-flight lock attempt and the
// pending slave ack on the mutation.
1530 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1532 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1534 // wait for active target
1535 if (mds->is_cluster_degraded() &&
1536 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1537 dout(7) << " mds." << target << " is not active" << dendl;
1538 if (mut->more()->waiting_on_slave.empty())
1539 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1543 // send lock request
1544 mut->start_locking(lock, target);
1545 mut->more()->slaves.insert(target);
1546 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1547 MMDSSlaveRequest::OP_WRLOCK);
1548 r->set_lock_type(lock->get_type());
1549 lock->get_parent()->set_object_info(r->get_object_info());
1550 mds->send_message_mds(r, target);
// exactly one outstanding slave op per target at a time
1552 assert(mut->more()->waiting_on_slave.count(target) == 0);
1553 mut->more()->waiting_on_slave.insert(target);
// Release a wrlock held remotely on mds.`target` (slave OP_UNWRLOCK).
// The lock stays in mut->locks if we also hold a local wrlock on it.
// The release message is skipped if the target has not reached rejoin.
1556 void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1560 mut->remote_wrlocks.erase(lock);
1561 if (mut->wrlocks.count(lock) == 0)
1562 mut->locks.erase(lock);
1564 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1565 << " " << *lock->get_parent() << dendl;
1566 if (!mds->is_cluster_degraded() ||
1567 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1568 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1569 MMDSSlaveRequest::OP_UNWRLOCK);
1570 slavereq->set_lock_type(lock->get_type());
1571 lock->get_parent()->set_object_info(slavereq->get_object_info());
1572 mds->send_message_mds(slavereq, target);
1577 // ------------------
// Acquire an exclusive lock for a request.  On the auth MDS, take the xlock
// directly when possible, otherwise queue a retry waiter.  On a replica,
// the lock type must support remote xlocks; forward an OP_XLOCK slave
// request to the (active, unambiguous) auth and wait for the ack.
1580 bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
1582 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1583 lock->get_type() == CEPH_LOCK_DVERSION)
1584 return local_xlock_start(static_cast<LocalLock*>(lock), mut);
1586 dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1587 client_t client = mut->get_client();
1590 if (lock->get_parent()->is_auth()) {
1593 if (lock->can_xlock(client)) {
1594 lock->set_state(LOCK_XLOCK);
1595 lock->get_xlock(mut, client);
1596 mut->xlocks.insert(lock);
1597 mut->locks.insert(lock);
1598 mut->finish_locking(lock);
// a recovering file inode blocks the filelock; raise its priority
1602 if (lock->get_type() == CEPH_LOCK_IFILE) {
1603 CInode *in = static_cast<CInode*>(lock->get_parent());
1604 if (in->state_test(CInode::STATE_RECOVERING)) {
1605 mds->mdcache->recovery_queue.prioritize(in);
// XLOCKDONE held by this same client may be re-entered; otherwise an
// unstable lock cannot start a new xlock transition here
1609 if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
1610 lock->get_xlock_by_client() != client ||
1611 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
1614 if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
1615 mut->start_locking(lock);
// retry the whole request when the lock becomes writable/stable
1622 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
// replica path: only lock types that allow remote xlock get here
1627 assert(lock->get_sm()->can_remote_xlock);
1628 assert(!mut->slave_request);
1630 // wait for single auth
1631 if (lock->get_parent()->is_ambiguous_auth()) {
1632 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
1633 new C_MDS_RetryRequest(mdcache, mut));
1637 // wait for active auth
1638 mds_rank_t auth = lock->get_parent()->authority().first;
1639 if (mds->is_cluster_degraded() &&
1640 !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1641 dout(7) << " mds." << auth << " is not active" << dendl;
1642 if (mut->more()->waiting_on_slave.empty())
1643 mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
1647 // send lock request
1648 mut->more()->slaves.insert(auth);
1649 mut->start_locking(lock, auth);
1650 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1651 MMDSSlaveRequest::OP_XLOCK);
1652 r->set_lock_type(lock->get_type());
1653 lock->get_parent()->set_object_info(r->get_object_info());
1654 mds->send_message_mds(r, auth);
// exactly one outstanding slave op per target at a time
1656 assert(mut->more()->waiting_on_slave.count(auth) == 0);
1657 mut->more()->waiting_on_slave.insert(auth);
// Finish dropping the last xlock on a lock: if no other holders exist and
// the inode's target loner matches the xlocker, jump straight to LOCK_EXCL
// (cheap fast path); otherwise run a normal eval_gather.  `xlocker` is the
// client that held the xlock (or -1), used for the loner fast-path check.
1663 void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
1665 assert(!lock->is_stable());
1666 if (lock->get_num_rdlocks() == 0 &&
1667 lock->get_num_wrlocks() == 0 &&
1668 lock->get_num_client_lease() == 0 &&
1669 lock->get_state() != LOCK_XLOCKSNAP &&
1670 lock->get_type() != CEPH_LOCK_DN) {
1671 CInode *in = static_cast<CInode*>(lock->get_parent());
1672 client_t loner = in->get_target_loner();
1673 if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
1674 lock->set_state(LOCK_EXCL);
1675 lock->get_parent()->auth_unpin(lock);
1676 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
// cap-bearing locks may now grant more to the loner
1677 if (lock->get_cap_shift())
1678 *pneed_issue = true;
1679 if (lock->get_parent()->is_auth() &&
1681 try_eval(lock, pneed_issue);
1685 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1686 eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
// Release an xlock held by `mut`.  On a replica this sends OP_UNXLOCK to the
// auth (if it has reached rejoin); locally it wakes waiters and, when the
// last xlock is gone, finishes the transition via _finish_xlock.  Caps are
// reissued here unless the caller asked to be told via *pneed_issue.
1689 void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1691 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1692 lock->get_type() == CEPH_LOCK_DVERSION)
1693 return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
1695 dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
// remember who held it before we tear the xlock down
1697 client_t xlocker = lock->get_xlock_by_client();
1702 mut->xlocks.erase(lock);
1703 mut->locks.erase(lock);
1705 bool do_issue = false;
1708 if (!lock->get_parent()->is_auth()) {
1709 assert(lock->get_sm()->can_remote_xlock);
1712 dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
1713 mds_rank_t auth = lock->get_parent()->authority().first;
1714 if (!mds->is_cluster_degraded() ||
1715 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
1716 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1717 MMDSSlaveRequest::OP_UNXLOCK);
1718 slavereq->set_lock_type(lock->get_type());
1719 lock->get_parent()->set_object_info(slavereq->get_object_info());
1720 mds->send_message_mds(slavereq, auth);
1723 lock->finish_waiters(SimpleLock::WAIT_STABLE |
1724 SimpleLock::WAIT_WR |
1725 SimpleLock::WAIT_RD, 0);
1727 if (lock->get_num_xlocks() == 0) {
1728 if (lock->get_state() == LOCK_LOCK_XLOCK)
1729 lock->set_state(LOCK_XLOCKDONE);
1730 _finish_xlock(lock, xlocker, &do_issue);
// issue caps ourselves unless the caller wants to batch the issue
1735 CInode *in = static_cast<CInode*>(lock->get_parent());
1736 if (in->is_head()) {
1738 *pneed_issue = true;
// Strip an xlock from `mut` as its parent object is being exported to
// another MDS: drop the mutation's references, release the auth_pin if the
// lock was mid-transition, and park the lock in plain LOCK state.
1745 void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1747 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1750 mut->xlocks.erase(lock);
1751 mut->locks.erase(lock);
1753 MDSCacheObject *p = lock->get_parent();
1754 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1756 if (!lock->is_stable())
1757 lock->get_parent()->auth_unpin(lock);
1759 lock->set_state(LOCK_LOCK);
// Counterpart of xlock_export on the importing MDS: take the auth_pin that
// the exported (unstable) lock requires.
1762 void Locker::xlock_import(SimpleLock *lock)
1764 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1765 lock->get_parent()->auth_pin(lock);
1770 // file i/o -----------------------------------------
// Return the inode's current file data version (logged at debug level 7).
1772 version_t Locker::issue_file_data_version(CInode *in)
1774 dout(7) << "issue_file_data_version on " << *in << dendl;
1775 return in->inode.file_data_version;
// Log-completion context for journaled file updates: pins the inode for the
// lifetime of the context, then hands off to Locker::file_update_finish with
// the recorded share_max/need_issue/client/ack arguments.
1778 class C_Locker_FileUpdate_finish : public LockerLogContext {
1786 C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
1787 bool sm=false, bool ni=false, client_t c=-1,
1788 MClientCaps *ac = 0)
1789 : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
1790 client(c), ack(ac) {
// keep the inode alive until finish() runs
1791 in->get(CInode::PIN_PTRWAITER);
1793 void finish(int r) override {
1794 locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
1795 in->put(CInode::PIN_PTRWAITER);
// Completion path after a file-update journal entry commits: pop the dirty
// projected inode, send the cap ack (if any) back to the client, drop the
// mutation's locks, retire any completed snap-writeback caps, and reissue /
// share max_size with clients as needed.
1799 void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
1800 client_t client, MClientCaps *ack)
1802 dout(10) << "file_update_finish on " << *in << dendl;
1803 in->pop_and_dirty_projected_inode(mut->ls);
1808 Session *session = mds->get_session(client);
1810 // "oldest flush tid" > 0 means client uses unique TID for each flush
1811 if (ack->get_oldest_flush_tid() > 0)
1812 session->add_completed_flush(ack->get_client_tid());
1813 mds->send_message_client_counted(ack, session);
1815 dout(10) << " no session for client." << client << " " << *ack << dendl;
1820 set<CInode*> need_issue;
1821 drop_locks(mut.get(), &need_issue);
// snapped inode with outstanding snap cap flushes from clients
1823 if (!in->is_head() && !in->client_snap_caps.empty()) {
1824 dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
1825 // check for snap writeback completion
1826 bool gather = false;
1827 compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
1828 while (p != in->client_snap_caps.end()) {
1829 SimpleLock *lock = in->get_lock(p->first);
1831 dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
1832 << " lock " << *lock << " on " << *in << dendl;
// this client's snap flush for this cap bit is done
1835 p->second.erase(client);
1836 if (p->second.empty()) {
1838 in->client_snap_caps.erase(p++);
// all snap flushes done: inode no longer needs to stay in open-file list
1843 if (in->client_snap_caps.empty())
1844 in->item_open_file.remove_myself();
1845 eval_cap_gather(in, &need_issue);
// if drop_locks didn't already flag this inode, issue directly to the client
1848 if (issue_client_cap && need_issue.count(in) == 0) {
1849 Capability *cap = in->get_client_cap(client);
1850 if (cap && (cap->wanted() & ~cap->pending()))
1851 issue_caps(in, cap);
// broadcast a larger max_size if the loner may keep writing/buffering
1854 if (share_max && in->is_auth() &&
1855 (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
1856 share_inode_max_size(in);
1858 issue_caps_set(need_issue);
1860 // auth unpin after issuing caps
// Register (or augment) a client capability for an open with the given mode.
// New caps are created suppressed so the grant rides the open() reply rather
// than a separate cap message.  On the auth this may twiddle lock states and
// flush the log; on a replica it forwards the wanted mask to the auth.
1864 Capability* Locker::issue_new_caps(CInode *in,
1870 dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
1873 // if replay, try to reconnect cap, and otherwise do nothing.
1875 mds->mdcache->try_reconnect_cap(in, session);
1880 assert(session->info.inst.name.is_client());
1881 client_t my_client = session->info.inst.name.num();
1882 int my_want = ceph_caps_for_mode(mode);
1884 // register a capability
1885 Capability *cap = in->get_client_cap(my_client);
1888 cap = in->add_client_cap(my_client, session, realm);
1889 cap->set_wanted(my_want);
1891 cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1895 // make sure it wants sufficient caps
1896 if (my_want & ~cap->wanted()) {
1897 // augment wanted caps for this client
1898 cap->set_wanted(cap->wanted() | my_want);
1902 if (in->is_auth()) {
1903 // [auth] twiddle mode?
1904 eval(in, CEPH_CAP_LOCKS);
1906 if (_need_flush_mdlog(in, my_want))
1907 mds->mdlog->flush();
1910 // [replica] tell auth about any new caps wanted
1911 request_inode_file_caps(in);
1914 // issue caps (pot. incl new one)
1915 //issue_caps(in); // note: _eval above may have done this already...
1917 // re-issue whatever we can
1918 //cap->issue(cap->pending());
// re-enable cap messages now that the open() reply will carry the grant
1921 cap->dec_suppress();
// Issue caps on every inode in the set (see issue_caps).
1927 void Locker::issue_caps_set(set<CInode*>& inset)
1929 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
// (Re)issue or revoke client caps on an inode so each client's pending caps
// match what the current lock states allow.  Iterates every cap (or just
// `only_cap`), computes the allowed mask per client (loner / xlocker /
// general), and sends GRANT or REVOKE MClientCaps messages as needed.
// Returns true if nothing was issued.
1933 bool Locker::issue_caps(CInode *in, Capability *only_cap)
1935 // allowed caps are determined by the lock mode.
1936 int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
1937 int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
1938 int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
1940 client_t loner = in->get_loner();
1942 dout(7) << "issue_caps loner client." << loner
1943 << " allowed=" << ccap_string(loner_allowed)
1944 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1945 << ", others allowed=" << ccap_string(all_allowed)
1946 << " on " << *in << dendl;
1948 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
1949 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1950 << " on " << *in << dendl;
// caps only exist on head inodes
1953 assert(in->is_head());
1955 // count conflicts with
// iterate one cap or all caps, depending on only_cap
1959 map<client_t, Capability*>::iterator it;
1961 it = in->client_caps.find(only_cap->get_client());
1963 it = in->client_caps.begin();
1964 for (; it != in->client_caps.end(); ++it) {
1965 Capability *cap = it->second;
1966 if (cap->is_stale())
1969 // do not issue _new_ bits when size|mtime is projected
// pick the allowed mask for this client: loner vs everyone else
1971 if (loner == it->first)
1972 allowed = loner_allowed;
1974 allowed = all_allowed;
1976 // add in any xlocker-only caps (for locks this client is the xlocker for)
1977 allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
// clients without inline-data support must not get RD/WR on inlined files
1979 Session *session = mds->get_session(it->first);
1980 if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
1981 !(session && session->connection &&
1982 session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
1983 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
1985 int pending = cap->pending();
1986 int wanted = cap->wanted();
1988 dout(20) << " client." << it->first
1989 << " pending " << ccap_string(pending)
1990 << " allowed " << ccap_string(allowed)
1991 << " wanted " << ccap_string(wanted)
1994 if (!(pending & ~allowed)) {
1995 // skip if suppress or new, and not revocation
1996 if (cap->is_new() || cap->is_suppress()) {
1997 dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
2002 // notify clients about deleted inode, to make sure they release caps ASAP.
2003 if (in->inode.nlink == 0)
2004 wanted |= CEPH_CAP_LINK_SHARED;
2006 // are there caps that the client _wants_ and can have, but aren't pending?
2007 // or do we need to revoke?
2008 if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
2009 (pending & ~allowed)) { // need to revoke ~allowed caps.
2013 // include caps that clients generally like, while we're at it.
2014 int likes = in->get_caps_liked();
2015 int before = pending;
2017 if (pending & ~allowed)
2018 seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
2020 seq = cap->issue((wanted|likes) & allowed);
2021 int after = cap->pending();
2023 if (cap->is_new()) {
2024 // haven't send caps to client yet
2025 if (before & ~after)
2026 cap->confirm_receipt(seq, after);
2028 dout(7) << " sending MClientCaps to client." << it->first
2029 << " seq " << cap->get_last_seq()
2030 << " new pending " << ccap_string(after) << " was " << ccap_string(before)
// track revocations so stalled clients can be warned/health-flagged
2033 int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
2034 if (op == CEPH_CAP_OP_REVOKE) {
2035 revoking_caps.push_back(&cap->item_revoking_caps);
2036 revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
2037 cap->set_last_revoke_stamp(ceph_clock_now());
2038 cap->reset_num_revoke_warnings();
2041 MClientCaps *m = new MClientCaps(op, in->ino(),
2042 in->find_snaprealm()->inode->ino(),
2043 cap->get_cap_id(), cap->get_last_seq(),
2046 mds->get_osd_epoch_barrier());
2047 in->encode_cap_message(m, cap);
2049 mds->send_message_client_counted(m, it->first);
2057 return (nissued == 0); // true if no re-issued, no callbacks
// Tell every cap-holding client about a truncate via CEPH_CAP_OP_TRUNC,
// then (on auth, for regular files) reconsider max_size.
2060 void Locker::issue_truncate(CInode *in)
2062 dout(7) << "issue_truncate on " << *in << dendl;
2064 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2065 it != in->client_caps.end();
2067 Capability *cap = it->second;
2068 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2070 in->find_snaprealm()->inode->ino(),
2071 cap->get_cap_id(), cap->get_last_seq(),
2072 cap->pending(), cap->wanted(), 0,
2074 mds->get_osd_epoch_barrier());
2075 in->encode_cap_message(m, cap);
2076 mds->send_message_client_counted(m, it->first);
2079 // should we increase max_size?
2080 if (in->is_auth() && in->is_file())
2081 check_inode_max_size(in);
// Revoke all (non-PIN) caps issued through a stale capability.  If the inode
// is mid cap-export, just mark it for re-evaluation after the export
// resolves.  Flags NEEDSRECOVER when the client had a writeable range, then
// re-evaluates the cap-related locks.
2085 void Locker::revoke_stale_caps(Capability *cap)
2087 CInode *in = cap->get_inode();
2088 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2089 // if export succeeds, the cap will be removed. if export fails, we need to
2090 // revoke the cap if it's still stale.
2091 in->state_set(CInode::STATE_EVALSTALECAPS);
2095 int issued = cap->issued();
2096 if (issued & ~CEPH_CAP_PIN) {
2097 dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
// the client may have been writing; its file range needs recovery
2100 if (in->is_auth() &&
2101 in->inode.client_ranges.count(cap->get_client()))
2102 in->state_set(CInode::STATE_NEEDSRECOVER);
2104 if (!in->filelock.is_stable()) eval_gather(&in->filelock);
2105 if (!in->linklock.is_stable()) eval_gather(&in->linklock);
2106 if (!in->authlock.is_stable()) eval_gather(&in->authlock);
2107 if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
2109 if (in->is_auth()) {
2110 try_eval(in, CEPH_CAP_LOCKS);
2112 request_inode_file_caps(in);
// Revoke stale caps for every capability held by this session.
2117 void Locker::revoke_stale_caps(Session *session)
2119 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2121 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2122 Capability *cap = *p;
2124 revoke_stale_caps(cap);
// Session came back: clear the stale flag on its caps and re-issue them.
// Inodes mid cap-export are only marked for later evaluation; otherwise the
// caps are re-issued directly (unless eval() already did so on the auth).
2128 void Locker::resume_stale_caps(Session *session)
2130 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2132 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2133 Capability *cap = *p;
2134 CInode *in = cap->get_inode();
2135 assert(in->is_head());
2136 if (cap->is_stale()) {
2137 dout(10) << " clearing stale flag on " << *in << dendl;
2140 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2141 // if export succeeds, the cap will be removed. if export fails,
2142 // we need to re-issue the cap if it's not stale.
2143 in->state_set(CInode::STATE_EVALSTALECAPS);
2147 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2148 issue_caps(in, cap);
// Drop every dentry lease held by a (stale) session.
2153 void Locker::remove_stale_leases(Session *session)
2155 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2156 xlist<ClientLease*>::iterator p = session->leases.begin();
2158 ClientLease *l = *p;
2160 CDentry *parent = static_cast<CDentry*>(l->parent);
2161 dout(15) << " removing lease on " << *parent << dendl;
2162 parent->remove_client_lease(l, this);
// Deferred retry of request_inode_file_caps(); pins the inode until it runs.
2167 class C_MDL_RequestInodeFileCaps : public LockerContext {
2170 C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
2171 in->get(CInode::PIN_PTRWAITER);
2173 void finish(int r) override {
2175 locker->request_inode_file_caps(in);
2176 in->put(CInode::PIN_PTRWAITER);
// [replica only] Tell the auth MDS what caps our local clients want on this
// inode (MInodeFileCaps), deferring if authority is ambiguous or the auth
// is still rejoining.  Only sends when the wanted mask actually changed.
2180 void Locker::request_inode_file_caps(CInode *in)
2182 assert(!in->is_auth());
// PIN alone never needs to be reported
2184 int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
2185 if (wanted != in->replica_caps_wanted) {
2186 // wait for single auth
2187 if (in->is_ambiguous_auth()) {
2188 in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
2189 new C_MDL_RequestInodeFileCaps(this, in));
2193 mds_rank_t auth = in->authority().first;
2194 if (mds->is_cluster_degraded() &&
2195 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
2196 mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
2200 dout(7) << "request_inode_file_caps " << ccap_string(wanted)
2201 << " was " << ccap_string(in->replica_caps_wanted)
2202 << " on " << *in << " to mds." << auth << dendl;
// record what we reported, even if the send below is skipped
2204 in->replica_caps_wanted = wanted;
2206 if (!mds->is_cluster_degraded() ||
2207 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
2208 mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
2213 /* This function DOES put the passed message before returning */
// [auth only] Record the cap-wanted mask a replica MDS reported for one of
// our inodes, then re-evaluate the inode's cap-related locks.
2214 void Locker::handle_inode_file_caps(MInodeFileCaps *m)
2216 // nobody should be talking to us during recovery.
2217 assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
2220 CInode *in = mdcache->get_inode(m->get_ino());
2221 mds_rank_t from = mds_rank_t(m->get_source().num());
2224 assert(in->is_auth());
2226 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
// track (or clear) the per-replica wanted mask
2229 in->mds_caps_wanted[from] = m->get_caps();
2231 in->mds_caps_wanted.erase(from);
2233 try_eval(in, CEPH_CAP_LOCKS);
// Deferred retry of check_inode_max_size() with the originally requested
// new_max_size/newsize/mtime; pins the inode until it runs.
2238 class C_MDL_CheckMaxSize : public LockerContext {
2240 uint64_t new_max_size;
2245 C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
2246 uint64_t _newsize, utime_t _mtime) :
2247 LockerContext(l), in(i),
2248 new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
2250 in->get(CInode::PIN_PTRWAITER);
2252 void finish(int r) override {
2254 locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
2255 in->put(CInode::PIN_PTRWAITER);
// Compute the next max_size for a client writeable range: roughly double
// the current size, capped at size + (mds_client_writeable_range_max_inc_objs
// * layout object size), rounded up to a whole layout increment.
2259 uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
2261 uint64_t new_max = (size + 1) << 1;
2262 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2264 max_inc *= pi->get_layout_size_increment();
2265 new_max = MIN(new_max, size + max_inc);
2267 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
// Build the new per-client writeable-range map for an inode of the given
// size.  Clients holding or wanting WR|BUFFER caps get a range extended to
// the computed max (never shrunk below their existing range.last); clients
// without those caps get no entry.  *max_increased reports whether any
// client's range grew.
2270 void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
2271 map<client_t,client_writeable_range_t> *new_ranges,
2272 bool *max_increased)
2274 inode_t *latest = in->get_projected_inode();
2276 if(latest->has_layout()) {
2277 ms = calc_new_max_size(latest, size);
2279 // Layout-less directories like ~mds0/, have zero size
2283 // increase ranges as appropriate.
2284 // shrink to 0 if no WR|BUFFER caps issued.
2285 for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
2286 p != in->client_caps.end();
2288 if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2289 client_writeable_range_t& nr = (*new_ranges)[p->first];
2291 if (latest->client_ranges.count(p->first)) {
2292 client_writeable_range_t& oldr = latest->client_ranges[p->first];
2293 if (ms > oldr.range.last)
2294 *max_increased = true;
// never shrink an existing range here
2295 nr.range.last = MAX(ms, oldr.range.last);
2296 nr.follows = oldr.follows;
2298 *max_increased = true;
2300 nr.follows = in->first - 1;
// Bring the auth inode's size/mtime and per-client max_size ranges up to
// date, journaling an EOpen or EUpdate when anything changed.  Called with
// explicit new_max_size/new_size/new_mtime hints (e.g. from cap flushes).
// NOTE(review): return statements are on lines not visible in this view;
// presumably returns whether an update was started -- confirm in full source.
2306 bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
2307 uint64_t new_max_size, uint64_t new_size,
2310 assert(in->is_auth());
2311 assert(in->is_file());
2313 inode_t *latest = in->get_projected_inode();
2314 map<client_t, client_writeable_range_t> new_ranges;
2315 uint64_t size = latest->size;
2316 bool update_size = new_size > 0;
2317 bool update_max = false;
2318 bool max_increased = false;
// Fold the caller's hints into the current values; a size/mtime "update"
// is only real if it actually changes the projected inode.
2321 new_size = size = MAX(size, new_size);
2322 new_mtime = MAX(new_mtime, latest->mtime);
2323 if (latest->size == new_size && latest->mtime == new_mtime)
2324 update_size = false;
2327 calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);
2329 if (max_increased || latest->client_ranges != new_ranges)
2332 if (!update_size && !update_max) {
2333 dout(20) << "check_inode_max_size no-op on " << *in << dendl;
2337 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2338 << " update_size " << update_size
2339 << " on " << *in << dendl;
// Cannot touch a frozen inode: queue a retry for when it unfreezes.
2341 if (in->is_frozen()) {
2342 dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
2343 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
2347 in->add_waiter(CInode::WAIT_UNFREEZE, cms);
// We need a wrlock on filelock to project the change.  If we can't take
// it, try to move the lock toward a wrlockable state (EXCL for a loner,
// otherwise LOCK) and, failing that, wait for stability and retry.
2350 if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
2352 if (in->filelock.is_stable()) {
2353 if (in->get_target_loner() >= 0)
2354 file_excl(&in->filelock);
2356 simple_lock(&in->filelock);
2358 if (!in->filelock.can_wrlock(in->get_loner())) {
2360 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
2365 in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
2366 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
// Project the inode and apply the new ranges/size/mtime.
2371 MutationRef mut(new MutationImpl());
2372 mut->ls = mds->mdlog->get_current_segment();
2374 inode_t *pi = in->project_inode();
2375 pi->version = in->pre_dirty();
2378 dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
2379 pi->client_ranges = new_ranges;
2383 dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
2384 pi->size = new_size;
2385 pi->rstat.rbytes = new_size;
2386 dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
2387 pi->mtime = new_mtime;
2390 // use EOpen if the file is still open; otherwise, use EUpdate.
2391 // this is just an optimization to push open files forward into
2392 // newer log segments.
2394 EMetaBlob *metablob;
2395 if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
2396 EOpen *eo = new EOpen(mds->mdlog);
2397 eo->add_ino(in->ino());
2398 metablob = &eo->metablob;
2400 mut->ls->open_files.push_back(&in->item_open_file);
2402 EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
2403 metablob = &eu->metablob;
2406 mds->mdlog->start_entry(le);
2407 if (update_size) { // FIXME if/when we do max_size nested accounting
2408 mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
2410 CDentry *parent = in->get_projected_parent_dn();
2411 metablob->add_primary_dentry(parent, in, true);
2413 metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
2414 mdcache->journal_dirty_inode(mut.get(), metablob, in);
2416 mds->mdlog->submit_entry(le,
2417 new C_Locker_FileUpdate_finish(this, in, mut, true));
2418 wrlock_force(&in->filelock, mut); // wrlock for duration of journal
2421 // make max_size _increase_ timely
2423 mds->mdlog->flush();
// Proactively send a CEPH_CAP_OP_GRANT to clients holding WR/BUFFER caps
// so they learn the (typically increased) max_size without asking.  If
// @only_cap is non-null, only that client's cap is considered.
2429 void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2432 * only share if currently issued a WR cap. if client doesn't have it,
2433 * file_max doesn't matter, and the client will get it if/when they get
2436 dout(10) << "share_inode_max_size on " << *in << dendl;
2437 map<client_t, Capability*>::iterator it;
2439 it = in->client_caps.find(only_cap->get_client());
2441 it = in->client_caps.begin();
2442 for (; it != in->client_caps.end(); ++it) {
2443 const client_t client = it->first;
2444 Capability *cap = it->second;
2445 if (cap->is_suppress())
// Only clients with a pending WR or BUFFER cap care about max_size.
2447 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2448 dout(10) << "share_inode_max_size with client." << client << dendl;
// Bump the cap sequence so the client treats this grant as new state.
2449 cap->inc_last_seq();
2450 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
2452 in->find_snaprealm()->inode->ino(),
2453 cap->get_cap_id(), cap->get_last_seq(),
2454 cap->pending(), cap->wanted(), 0,
2456 mds->get_osd_epoch_barrier());
2457 in->encode_cap_message(m, cap);
2458 mds->send_message_client_counted(m, client);
// Decide whether an mdlog flush is needed to make progress on granting
// @wanted caps: true when a cap bit the client wants maps onto a lock
// that is currently unstable and held by in-flight (journaled) mutations.
// NOTE(review): the return statements themselves are on lines not visible
// in this view.
2465 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2467 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2468 * pending mutations. */
2469 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2470 in->filelock.is_unstable_and_locked()) ||
2471 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2472 in->authlock.is_unstable_and_locked()) ||
2473 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2474 in->linklock.is_unstable_and_locked()) ||
2475 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2476 in->xattrlock.is_unstable_and_locked()))
// Update a cap's 'wanted' mask from a client message, guarding against
// stale messages via issue_seq; then maintain the open-file list and the
// recovery queue to match the new wanted state.
2481 void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
// issue_seq matches the last issue: the client saw our latest grant, so
// its wanted mask is authoritative and can be taken verbatim.
2483 if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
2484 dout(10) << " wanted " << ccap_string(cap->wanted())
2485 << " -> " << ccap_string(wanted) << dendl;
2486 cap->set_wanted(wanted);
// Seq mismatch but the client is asking for *more*: it is safe to widen
// (never narrow) wanted on stale information.
2487 } else if (wanted & ~cap->wanted()) {
2488 dout(10) << " wanted " << ccap_string(cap->wanted())
2489 << " -> " << ccap_string(wanted)
2490 << " (added caps even though we had seq mismatch!)" << dendl;
2491 cap->set_wanted(wanted | cap->wanted());
2493 dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
2494 << " -> " << ccap_string(wanted)
2495 << " (issue_seq " << issue_seq << " != last_issue "
2496 << cap->get_last_issue() << ")" << dendl;
2500 CInode *cur = cap->get_inode();
// Replicas relay cap-wanted changes to the auth MDS.
2501 if (!cur->is_auth()) {
2502 request_inode_file_caps(cur);
// Nothing wanted any more by anyone: drop the inode from the open-file
// list so the log segment can be trimmed.
2506 if (cap->wanted() == 0) {
2507 if (cur->item_open_file.is_on_list() &&
2508 !cur->is_any_caps_wanted()) {
2509 dout(10) << " removing unwanted file from open file list " << *cur << dendl;
2510 cur->item_open_file.remove_myself();
// A client actively wants RD/WR on a file being recovered: prioritize it.
2513 if (cur->state_test(CInode::STATE_RECOVERING) &&
2514 (cap->wanted() & (CEPH_CAP_FILE_RD |
2515 CEPH_CAP_FILE_WR))) {
2516 mds->mdcache->recovery_queue.prioritize(cur);
// Newly-wanted caps on an inode not yet on the open-file list: journal an
// EOpen so replay knows this file is open.
2519 if (!cur->item_open_file.is_on_list()) {
2520 dout(10) << " adding to open file list " << *cur << dendl;
2521 assert(cur->last == CEPH_NOSNAP);
2522 LogSegment *ls = mds->mdlog->get_current_segment();
2523 EOpen *le = new EOpen(mds->mdlog);
2524 mds->mdlog->start_entry(le);
2525 le->add_clean_inode(cur);
2526 ls->open_files.push_back(&cur->item_open_file);
2527 mds->mdlog->submit_entry(le);
// Perform "null" snapflushes for @client on every snapid < @last that is
// still waiting for a flush from it: the client released its WR/EXCL caps
// without sending FLUSHSNAP, so we synthesize an empty (dirty=0) snap
// update to unblock the pending-snapflush bookkeeping.
2534 void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
2536 dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
2537 for (auto p = head_in->client_need_snapflush.begin();
2538 p != head_in->client_need_snapflush.end() && p->first < last; ) {
2539 snapid_t snapid = p->first;
2540 set<client_t>& clients = p->second;
// Advance p before possibly erasing the current entry (via
// remove_need_snapflush below); the q loop also starts from this
// already-advanced position.
2541 ++p; // be careful, q loop below depends on this
2543 if (clients.count(client)) {
2544 dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
// Snap inodes are keyed by the *last* snapid they cover, so a lookup by
// 'snapid' can miss; scan forward through later need_snapflush entries
// until an existing snap inode is found.
2545 CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
2547 // hrm, look forward until we find the inode.
2548 // (we can only look it up by the last snapid it is valid for)
2549 dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
2550 for (compact_map<snapid_t, set<client_t> >::iterator q = p; // p is already at next entry
2551 q != head_in->client_need_snapflush.end();
2553 dout(10) << " trying snapid " << q->first << dendl;
2554 sin = mdcache->get_inode(head_in->ino(), q->first);
2556 assert(sin->first <= snapid);
2559 dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
// Multiversion inodes keep old state in the head inode itself (fallback
// handling on lines not fully visible in this view).
2561 if (!sin && head_in->is_multiversion())
// dirty=0 makes this a no-op metadata-wise; it only clears the
// need-snapflush record for (sin, snapid, client).
2565 _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
2566 head_in->remove_need_snapflush(sin, snapid, client);
// Whether handling of a client cap message on @in must be deferred until
// the inode unfreezes.  Deliberately no more restrictive than the request
// path's auth-pin check (see comment below) to avoid deadlock.
2572 bool Locker::should_defer_client_cap_frozen(CInode *in)
2575 * This policy needs to be AT LEAST as permissive as allowing a client request
2576 * to go forward, or else a client request can release something, the release
2577 * gets deferred, but the request gets processed and deadlocks because when the
2578 * caps can't get revoked.
2580 * Currently, a request wait if anything locked is freezing (can't
2581 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2582 * _MUST_ be in the lock/auth_pin set.
2584 * auth_pins==0 implies no unstable lock and not auth pinnned by
2585 * client request, otherwise continue even it's freezing.
2587 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2591 * This function DOES put the passed message before returning
// Main entry point for CEPH_MSG_CLIENT_CAPS: validates session/state,
// de-duplicates completed flushes, handles FLUSHSNAP separately, then
// applies cap updates (dirty metadata, wanted changes) to the head inode
// and re-evaluates locks.  Takes ownership of (and puts) @m.
2593 void Locker::handle_client_caps(MClientCaps *m)
2595 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
2596 client_t client = m->get_source().num();
2598 snapid_t follows = m->get_snap_follows();
2599 dout(7) << "handle_client_caps "
2600 << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
2601 << " on " << m->get_ino()
2602 << " tid " << m->get_client_tid() << " follows " << follows
2603 << " op " << ceph_cap_op_name(m->get_op()) << dendl;
// --- MDS state / session gating ---
2605 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
2607 dout(5) << " no session, dropping " << *m << dendl;
2611 if (session->is_closed() ||
2612 session->is_closing() ||
2613 session->is_killing()) {
2614 dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
// During reconnect, remember which caps the client says are dirty so
// rejoin can reconstruct flush state, then retry after replay.
2618 if (mds->is_reconnect() &&
2619 m->get_dirty() && m->get_client_tid() > 0 &&
2620 !session->have_completed_flush(m->get_client_tid())) {
2621 mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
2623 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
// --- duplicate flush: this tid was already completed; just re-ack ---
2627 if (m->get_client_tid() > 0 && session &&
2628 session->have_completed_flush(m->get_client_tid())) {
2629 dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
2630 << " for client." << client << dendl;
2632 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2633 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
2634 m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2636 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
2637 m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
2638 mds->get_osd_epoch_barrier());
2640 ack->set_snap_follows(follows);
2641 ack->set_client_tid(m->get_client_tid());
2642 mds->send_message_client_counted(ack, m->get_connection());
2643 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2647 // fall-thru because the message may release some caps
// Downgrade the duplicate FLUSH to a plain UPDATE so the release part
// below is still processed.
2649 m->set_op(CEPH_CAP_OP_UPDATE);
2653 // "oldest flush tid" > 0 means client uses unique TID for each flush
// Trim the session's completed-flush set, and warn (via clog) about
// clients that never advance their oldest_flush_tid.
2654 if (m->get_oldest_flush_tid() > 0 && session) {
2655 if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
2656 mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
2658 if (session->get_num_trim_flushes_warnings() > 0 &&
2659 session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
2660 session->reset_num_trim_flushes_warnings();
2662 if (session->get_num_completed_flushes() >=
2663 (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
2664 session->inc_num_trim_flushes_warnings();
2666 ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
2667 << m->get_oldest_flush_tid() << "), "
2668 << session->get_num_completed_flushes()
2669 << " completed flushes recorded in session";
2670 mds->clog->warn() << ss.str();
2671 dout(20) << __func__ << " " << ss.str() << dendl;
// --- locate the head inode; during clientreplay it may appear later ---
2676 CInode *head_in = mdcache->get_inode(m->get_ino());
2678 if (mds->is_clientreplay()) {
2679 dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
2680 << ", will try again after replayed client requests" << dendl;
2681 mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
2684 dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
// --- OSD epoch barrier propagation ---
2689 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
2690 // Pause RADOS operations until we see the required epoch
2691 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
2694 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
2695 // Record the barrier so that we will retransmit it to clients
2696 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
// Resolve which (possibly snapped) inode this message applies to.
2699 CInode *in = head_in;
2701 in = mdcache->pick_inode_snap(head_in, follows);
2703 dout(10) << " head inode " << *head_in << dendl;
2705 dout(10) << " cap inode " << *in << dendl;
2707 Capability *cap = 0;
2708 cap = in->get_client_cap(client);
2709 if (!cap && in != head_in)
2710 cap = head_in->get_client_cap(client);
2712 dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
2719 if (should_defer_client_cap_frozen(in)) {
2720 dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
2721 in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
// Stale migration sequence: cap was exported since the client sent this.
2724 if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
2725 dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
2726 << ", dropping" << dendl;
2731 int op = m->get_op();
// --- FLUSHSNAP: flush of pre-snapshot dirty state; handled standalone ---
2734 if (op == CEPH_CAP_OP_FLUSHSNAP) {
2735 if (!in->is_auth()) {
2736 dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
2740 SnapRealm *realm = in->find_snaprealm();
2741 snapid_t snap = realm->get_snap_following(follows);
2742 dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;
2744 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2745 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2746 // case we get a dup response, so whatever.)
2747 MClientCaps *ack = 0;
2748 if (m->get_dirty()) {
2749 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2750 ack->set_snap_follows(follows);
2751 ack->set_client_tid(m->get_client_tid());
2752 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
// Only accept the flushsnap if we were actually expecting one for this
// snap from this client (or it targets the head inode directly).
2755 if (in == head_in ||
2756 (head_in->client_need_snapflush.count(snap) &&
2757 head_in->client_need_snapflush[snap].count(client))) {
2758 dout(7) << " flushsnap snap " << snap
2759 << " client." << client << " on " << *in << dendl;
2761 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2763 cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
// Earlier snaps were skipped by the client; resolve them with null
// snapflushes first so ordering is preserved.
2764 else if (head_in->client_need_snapflush.begin()->first < snap)
2765 _do_null_snapflush(head_in, client, snap);
2767 _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
2770 head_in->remove_need_snapflush(in, snap, client);
2773 dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
2775 mds->send_message_client_counted(ack, m->get_connection());
// --- normal cap update path ---
2780 if (cap->get_cap_id() != m->get_cap_id()) {
2781 dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
2783 // intermediate snap inodes
// Walk forward from the snapped inode to the head, applying any dirty
// metadata to each intermediate snapshot inode along the way.
2784 while (in != head_in) {
2785 assert(in->last != CEPH_NOSNAP);
2786 if (in->is_auth() && m->get_dirty()) {
2787 dout(10) << " updating intermediate snapped inode " << *in << dendl;
2788 _do_cap_update(in, NULL, m->get_dirty(), follows, m);
2790 in = mdcache->pick_inode_snap(head_in, in->last);
2793 // head inode, and cap
2794 MClientCaps *ack = 0;
// The client cannot retain caps we never issued; clamp and confirm.
2796 int caps = m->get_caps();
2797 if (caps & ~cap->issued()) {
2798 dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2799 caps &= cap->issued();
2802 cap->confirm_receipt(m->get_seq(), caps);
2803 dout(10) << " follows " << follows
2804 << " retains " << ccap_string(m->get_caps())
2805 << " dirty " << ccap_string(m->get_dirty())
2806 << " on " << *in << dendl;
2809 // missing/skipped snapflush?
2810 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2811 // presently only does so when it has actual dirty metadata. But, we
2812 // set up the need_snapflush stuff based on the issued caps.
2813 // We can infer that the client WONT send a FLUSHSNAP once they have
2814 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2816 if (!head_in->client_need_snapflush.empty()) {
2817 if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
2818 _do_null_snapflush(head_in, client);
2820 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
// Dirty metadata flush on the head: prepare a FLUSH_ACK to send once the
// update has been journaled.
2824 if (m->get_dirty() && in->is_auth()) {
2825 dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
2826 << " seq " << m->get_seq() << " on " << *in << dendl;
2827 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
2828 m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2829 ack->set_client_tid(m->get_client_tid());
2830 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2833 // filter wanted based on what we could ever give out (given auth/replica status)
2834 bool need_flush = m->flags & CLIENT_CAPS_SYNC;
2835 int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
2836 if (new_wanted != cap->wanted()) {
2837 if (!need_flush && (new_wanted & ~cap->pending())) {
2838 // exapnding caps. make sure we aren't waiting for a log flush
2839 need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
2842 adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
// _do_cap_update returns whether it started a journaled update; on the
// no-update path, ack immediately and possibly reissue caps.
2845 if (in->is_auth() &&
2846 _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
2848 eval(in, CEPH_CAP_LOCKS);
2850 if (!need_flush && (cap->wanted() & ~cap->pending()))
2851 need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
2853 // no update, ack now.
2855 mds->send_message_client_counted(ack, m->get_connection());
2857 bool did_issue = eval(in, CEPH_CAP_LOCKS);
2858 if (!did_issue && (cap->wanted() & ~cap->pending()))
2859 issue_caps(in, cap);
// First grant of WR/BUFFER: share max_size right away so the client can
// start writing without another round trip.
2861 if (cap->get_last_seq() == 0 &&
2862 (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
2863 cap->issue_norevoke(cap->issued());
2864 share_inode_max_size(in, cap);
2869 mds->mdlog->flush();
// Deferred retry of a per-request cap release (used when the inode was
// frozen): replays process_request_cap_release() with no MDRequest.
2877 class C_Locker_RetryRequestCapRelease : public LockerContext {
2879 ceph_mds_request_release item;
2881 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
2882 LockerContext(l), client(c), item(it) { }
2883 void finish(int r) override {
2885 MDRequestRef null_ref;
2886 locker->process_request_cap_release(null_ref, client, item, dname);
// Apply a cap/lease release embedded in a client request: drop the named
// dentry lease (if @dname given), confirm the released caps on the inode's
// cap, and re-evaluate locks.  @mdr may be null when called from the
// deferred-retry context.
2890 void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
2891 const string &dname)
2893 inodeno_t ino = (uint64_t)item.ino;
2894 uint64_t cap_id = item.cap_id;
2895 int caps = item.caps;
2896 int wanted = item.wanted;
2898 int issue_seq = item.issue_seq;
2899 int mseq = item.mseq;
2901 CInode *in = mdcache->get_inode(ino);
// --- dentry lease release ---
2905 if (dname.length()) {
2906 frag_t fg = in->pick_dirfrag(dname);
2907 CDir *dir = in->get_dirfrag(fg);
2909 CDentry *dn = dir->lookup(dname);
2911 ClientLease *l = dn->get_client_lease(client);
2913 dout(10) << "process_cap_release removing lease on " << *dn << dendl;
2914 dn->remove_client_lease(l, this);
2916 dout(7) << "process_cap_release client." << client
2917 << " doesn't have lease on " << *dn << dendl;
2920 dout(7) << "process_cap_release client." << client << " released lease on dn "
2921 << dir->dirfrag() << "/" << dname << " which dne" << dendl;
// --- cap release ---
2926 Capability *cap = in->get_client_cap(client);
2930 dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
2931 << (mdr ? "" : " (DEFERRED, no mdr)")
// Stale migration / cap-id checks mirror handle_client_caps.
2934 if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
2935 dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
2939 if (cap->get_cap_id() != cap_id) {
2940 dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
2944 if (should_defer_client_cap_frozen(in)) {
2945 dout(7) << " frozen, deferring" << dendl;
2946 in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
2950 if (caps & ~cap->issued()) {
2951 dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2952 caps &= cap->issued();
2954 cap->confirm_receipt(seq, caps);
// Client dropped all WR caps while snapflushes were pending: resolve them
// with null snapflushes (see _do_null_snapflush).
2956 if (!in->client_need_snapflush.empty() &&
2957 (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
2958 _do_null_snapflush(in, client);
2961 adjust_cap_wanted(cap, wanted, issue_seq);
// Suppress re-issue to this cap while evaluating locks, so the release
// itself does not immediately bounce caps back to the client.
2964 cap->inc_suppress();
2965 eval(in, CEPH_CAP_LOCKS);
2967 cap->dec_suppress();
2969 // take note; we may need to reissue on this cap later
2971 mdr->cap_releases[in->vino()] = cap->get_last_seq();
// Deferred retry of kick_issue_caps() for a frozen inode; pins the inode
// for the lifetime of the waiter so the pointer stays valid.
2974 class C_Locker_RetryKickIssueCaps : public LockerContext {
2979 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2980 LockerContext(l), in(i), client(c), seq(s) {
2981 in->get(CInode::PIN_PTRWAITER);
2983 void finish(int r) override {
2984 locker->kick_issue_caps(in, client, seq);
2985 in->put(CInode::PIN_PTRWAITER);
// Re-issue caps to @client on @in if its cap is still at sequence @seq
// (i.e. nothing was issued since the release we noted); waits out a
// frozen inode via C_Locker_RetryKickIssueCaps.
2989 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
2991 Capability *cap = in->get_client_cap(client);
// Cap gone or already re-issued at a newer seq: nothing to do.
2992 if (!cap || cap->get_last_sent() != seq)
2994 if (in->is_frozen()) {
2995 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
2996 in->add_waiter(CInode::WAIT_UNFREEZE,
2997 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3000 dout(10) << "kick_issue_caps released at current seq " << seq
3001 << ", reissuing" << dendl;
3002 issue_caps(in, cap);
// After a request finishes, re-issue caps that were released as part of
// the request (recorded in mdr->cap_releases) and may now be grantable.
3005 void Locker::kick_cap_releases(MDRequestRef& mdr)
3007 client_t client = mdr->get_client();
3008 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3009 p != mdr->cap_releases.end();
3011 CInode *in = mdcache->get_inode(p->first);
3014 kick_issue_caps(in, client, p->second);
3019 * m and ack might be NULL, so don't dereference them unless dirty != 0
// Journal a snapflush from @client for snapshot @snap: project the dirty
// metadata (possibly into an old_inode for multiversion inodes), trim the
// client's writeable range for the snap, and submit an EUpdate whose
// completion sends @ack.  @m/@ack may be NULL when dirty == 0 (the null
// snapflush case).
3021 void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
3023 dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
3024 << " follows " << follows << " snap " << snap
3025 << " on " << *in << dendl;
3027 if (snap == CEPH_NOSNAP) {
3028 // hmm, i guess snap was already deleted? just ack!
3029 dout(10) << " wow, the snap following " << follows
3030 << " was already deleted. nothing to record, just ack." << dendl;
3032 mds->send_message_client_counted(ack, m->get_connection());
3036 EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
3037 mds->mdlog->start_entry(le);
3038 MutationRef mut = new MutationImpl();
3039 mut->ls = mds->mdlog->get_current_segment();
3041 // normal metadata updates that we can apply to the head as well.
// Only take the client's xattrs if they are newer than our projected
// version (and xattr data was actually sent).
3044 bool xattrs = false;
3045 map<string,bufferptr> *px = 0;
3046 if ((dirty & CEPH_CAP_XATTR_EXCL) &&
3047 m->xattrbl.length() &&
3048 m->head.xattr_version > in->get_projected_inode()->xattr_version)
// Multiversion inodes keep pre-snap state in old_inodes; write there,
// splitting the old inode if the snap falls inside its range.
3051 old_inode_t *oi = 0;
3052 if (in->is_multiversion()) {
3053 oi = in->pick_old_inode(snap);
3058 dout(10) << " writing into old inode" << dendl;
3059 pi = in->project_inode();
3060 pi->version = in->pre_dirty();
3061 if (snap > oi->first)
3062 in->split_old_inode(snap);
3068 px = new map<string,bufferptr>;
3069 pi = in->project_inode(px);
3070 pi->version = in->pre_dirty();
3073 _update_cap_fields(in, dirty, m, pi);
3077 dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
3078 << " len " << m->xattrbl.length() << dendl;
3079 pi->xattr_version = m->head.xattr_version;
3080 bufferlist::iterator p = m->xattrbl.begin();
// The flush covers writes up to this snap: either drop the client's
// writeable range entirely (inode ends at this snap) or advance its
// 'follows' past the flushed snap.
3084 if (pi->client_ranges.count(client)) {
3085 if (in->last == snap) {
3086 dout(10) << " removing client_range entirely" << dendl;
3087 pi->client_ranges.erase(client);
3089 dout(10) << " client_range now follows " << snap << dendl;
3090 pi->client_ranges[client].follows = snap;
3095 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3096 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3098 // "oldest flush tid" > 0 means client uses unique TID for each flush
3099 if (ack && ack->get_oldest_flush_tid() > 0)
3100 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3101 ack->get_oldest_flush_tid());
3103 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
// Copy client-dirtied metadata fields from cap message @m into the
// projected inode @pi, gated by the @dirty cap bits: file fields
// (mtime/atime/size/inline data/time_warp_seq) under FILE_EXCL/FILE_WR,
// ownership/mode/btime under AUTH_EXCL, plus ctime/change_attr always
// (monotonically).
3107 void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
3112 /* m must be valid if there are dirty caps */
3114 uint64_t features = m->get_connection()->get_features();
// ctime/change_attr only ever move forward.
3116 if (m->get_ctime() > pi->ctime) {
3117 dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
3118 << " for " << *in << dendl;
3119 pi->ctime = m->get_ctime();
3122 if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
3123 m->get_change_attr() > pi->change_attr) {
3124 dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
3125 << " for " << *in << dendl;
3126 pi->change_attr = m->get_change_attr();
// --- file fields ---
3130 if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
3131 utime_t atime = m->get_atime();
3132 utime_t mtime = m->get_mtime();
3133 uint64_t size = m->get_size();
3134 version_t inline_version = m->inline_version;
// With only WR, mtime may only advance; with EXCL the client fully owns
// mtime and any change is accepted (e.g. utimes backwards).
3136 if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
3137 ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
3138 dout(7) << " mtime " << pi->mtime << " -> " << mtime
3139 << " for " << *in << dendl;
3142 if (in->inode.is_file() && // ONLY if regular file
3144 dout(7) << " size " << pi->size << " -> " << size
3145 << " for " << *in << dendl;
3147 pi->rstat.rbytes = size;
3149 if (in->inode.is_file() &&
3150 (dirty & CEPH_CAP_FILE_WR) &&
3151 inline_version > pi->inline_data.version) {
3152 pi->inline_data.version = inline_version;
3153 if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
3154 pi->inline_data.get_data() = m->inline_data;
3156 pi->inline_data.free_data();
3158 if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
3159 dout(7) << " atime " << pi->atime << " -> " << atime
3160 << " for " << *in << dendl;
3163 if ((dirty & CEPH_CAP_FILE_EXCL) &&
3164 ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
3165 dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
3166 << " for " << *in << dendl;
3167 pi->time_warp_seq = m->get_time_warp_seq();
// --- auth fields (owner/group/mode/birthtime) ---
3171 if (dirty & CEPH_CAP_AUTH_EXCL) {
3172 if (m->head.uid != pi->uid) {
3173 dout(7) << " uid " << pi->uid
3174 << " -> " << m->head.uid
3175 << " for " << *in << dendl;
3176 pi->uid = m->head.uid;
3178 if (m->head.gid != pi->gid) {
3179 dout(7) << " gid " << pi->gid
3180 << " -> " << m->head.gid
3181 << " for " << *in << dendl;
3182 pi->gid = m->head.gid;
3184 if (m->head.mode != pi->mode) {
3185 dout(7) << " mode " << oct << pi->mode
3186 << " -> " << m->head.mode << dec
3187 << " for " << *in << dendl;
3188 pi->mode = m->head.mode;
3190 if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
3191 dout(7) << " btime " << oct << pi->btime
3192 << " -> " << m->get_btime() << dec
3193 << " for " << *in << dendl;
3194 pi->btime = m->get_btime();
3200 * update inode based on cap flush|flushsnap|wanted.
3201 * adjust max_size, if needed.
3202 * if we update, return true; otherwise, false (no updated needed).
// Apply a (non-snap) cap flush/update to auth inode @in: recompute the
// client's max_size range, decode any flock/fcntl lock state, check write
// access, then project + journal the dirtied fields.  Returns true when a
// journaled update was started (the ack is sent on journal completion),
// false when nothing changed (caller acks immediately).
3204 bool Locker::_do_cap_update(CInode *in, Capability *cap,
3205 int dirty, snapid_t follows,
3206 MClientCaps *m, MClientCaps *ack,
3209 dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
3210 << " issued " << ccap_string(cap ? cap->issued() : 0)
3211 << " wanted " << ccap_string(cap ? cap->wanted() : 0)
3212 << " on " << *in << dendl;
3213 assert(in->is_auth());
3214 client_t client = m->get_source().num();
3215 inode_t *latest = in->get_projected_inode();
3217 // increase or zero max_size?
3218 uint64_t size = m->get_size();
3219 bool change_max = false;
3220 uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
3221 uint64_t new_max = old_max;
3223 if (in->is_file()) {
3224 bool forced_change_max = false;
3225 dout(20) << "inode is file" << dendl;
3226 if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
3227 dout(20) << "client has write caps; m->get_max_size="
3228 << m->get_max_size() << "; old_max=" << old_max << dendl;
// The client may explicitly request a larger max_size; otherwise grow
// it automatically based on the reported size.
3229 if (m->get_max_size() > new_max) {
3230 dout(10) << "client requests file_max " << m->get_max_size()
3231 << " > max " << old_max << dendl;
3233 forced_change_max = true;
3234 new_max = calc_new_max_size(latest, m->get_max_size());
3236 new_max = calc_new_max_size(latest, size);
3238 if (new_max > old_max)
// Changing max_size requires a wrlock on filelock.  If we can't take it
// yet, nudge the lock toward a wrlockable state and queue a
// check_inode_max_size retry for when the lock stabilizes.
3250 if (in->last == CEPH_NOSNAP &&
3252 !in->filelock.can_wrlock(client) &&
3253 !in->filelock.can_force_wrlock(client)) {
3254 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
3255 if (in->filelock.is_stable()) {
3256 bool need_issue = false;
3258 cap->inc_suppress();
3259 if (in->mds_caps_wanted.empty() &&
3260 (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
3261 if (in->filelock.get_state() != LOCK_EXCL)
3262 file_excl(&in->filelock, &need_issue);
3264 simple_lock(&in->filelock, &need_issue);
3268 cap->dec_suppress();
3270 if (!in->filelock.can_wrlock(client) &&
3271 !in->filelock.can_force_wrlock(client)) {
3272 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
3273 forced_change_max ? new_max : 0,
3276 in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
// Decode client-held file locks (fcntl then flock) into the inode's lock
// state; used on reconnect/flush to restore advisory-lock bookkeeping.
3282 if (m->flockbl.length()) {
3284 bufferlist::iterator bli = m->flockbl.begin();
3285 ::decode(num_locks, bli);
3286 for ( int i=0; i < num_locks; ++i) {
3287 ceph_filelock decoded_lock;
3288 ::decode(decoded_lock, bli);
3289 in->get_fcntl_lock_state()->held_locks.
3290 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3291 ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3293 ::decode(num_locks, bli);
3294 for ( int i=0; i < num_locks; ++i) {
3295 ceph_filelock decoded_lock;
3296 ::decode(decoded_lock, bli);
3297 in->get_flock_lock_state()->held_locks.
3298 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3299 ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3303 if (!dirty && !change_max)
// Permission check: dropping the update on failure (the flush is not
// applied; ack handling on lines not visible in this view).
3306 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3307 if (session->check_access(in, MAY_WRITE,
3308 m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
3310 dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
// --- project + journal the update ---
3316 EUpdate *le = new EUpdate(mds->mdlog, "cap update");
3317 mds->mdlog->start_entry(le);
3320 map<string,bufferptr> *px = 0;
3321 if ((dirty & CEPH_CAP_XATTR_EXCL) &&
3322 m->xattrbl.length() &&
3323 m->head.xattr_version > in->get_projected_inode()->xattr_version)
3324 px = new map<string,bufferptr>;
3326 inode_t *pi = in->project_inode(px);
3327 pi->version = in->pre_dirty();
3329 MutationRef mut(new MutationImpl());
3330 mut->ls = mds->mdlog->get_current_segment();
3332 _update_cap_fields(in, dirty, m, pi);
3335 dout(7) << " max_size " << old_max << " -> " << new_max
3336 << " for " << *in << dendl;
3338 pi->client_ranges[client].range.first = 0;
3339 pi->client_ranges[client].range.last = new_max;
3340 pi->client_ranges[client].follows = in->first - 1;
3342 pi->client_ranges.erase(client);
// Hold wrlocks on the affected locks for the duration of the journal
// write so the projected state cannot be raced.
3345 if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
3346 wrlock_force(&in->filelock, mut); // wrlock for duration of journal
3349 if (dirty & CEPH_CAP_AUTH_EXCL)
3350 wrlock_force(&in->authlock, mut);
3354 dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
3355 pi->xattr_version = m->head.xattr_version;
3356 bufferlist::iterator p = m->xattrbl.begin();
3359 wrlock_force(&in->xattrlock, mut);
3363 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3364 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3366 // "oldest flush tid" > 0 means client uses unique TID for each flush
3367 if (ack && ack->get_oldest_flush_tid() > 0)
3368 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3369 ack->get_oldest_flush_tid());
3371 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
// A max_size increase (or caps blocked behind pending mutations) should
// reach the client promptly: tell the caller to flush the mdlog.
3374 if (need_flush && !*need_flush &&
3375 ((change_max && new_max) || // max INCREASE
3376 _need_flush_mdlog(in, dirty)))
3382 /* This function DOES put the passed message before returning */
3383 void Locker::handle_client_cap_release(MClientCapRelease *m)
// Handle a batch of cap releases from a client: defer until the MDS is in a
// state that can process client messages, honor any OSD epoch barrier carried
// by the message, then release each cap individually via _do_cap_release().
3385 client_t client = m->get_source().num();
3386 dout(10) << "handle_client_cap_release " << *m << dendl;
// Not ready to process client traffic yet — requeue the message until
// clientreplay/active/stopping. (NOTE(review): sampled view — the `return`
// after wait_for_replay() is not visible here but is implied by control flow.)
3388 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3389 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
// The client may have seen a newer OSD map than we have; make sure our
// objecter does not issue RADOS ops against an older epoch.
3393 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3394 // Pause RADOS operations until we see the required epoch
3395 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3398 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3399 // Record the barrier so that we will retransmit it to clients
3400 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
// get_priv() hands back the Session attached to this connection.
3403 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
// Release each cap listed in the message.
3405 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3406 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
// Let the session account for the releases (used for cap-recall bookkeeping).
3410 session->notify_cap_release(m->caps.size());
// Completion context used by _do_cap_release() to retry a cap release once an
// inode is no longer freezing/frozen. Captures the full release identity
// (client, ino, cap_id, migrate_seq, issue_seq) so the retry is exact.
// (NOTE(review): sampled view — the client/ino/cap_id member declarations
// around original lines 3417-3419 are not visible here.)
3416 class C_Locker_RetryCapRelease : public LockerContext {
3420 ceph_seq_t migrate_seq;
3421 ceph_seq_t issue_seq;
3423 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3424 ceph_seq_t mseq, ceph_seq_t seq) :
3425 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3426 void finish(int r) override {
// Re-drive the release with the identical arguments.
3427 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
// Release one client capability on one inode, after validating that the
// release still refers to the cap we currently hold (matching cap_id, a
// migrate_seq that is not stale, and the last issue_seq). Stale or mismatched
// releases are logged and ignored; frozen inodes defer via a retry waiter.
3431 void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
3432 ceph_seq_t mseq, ceph_seq_t seq)
// Inode may have been trimmed from cache since the client sent the release.
3434 CInode *in = mdcache->get_inode(ino);
3436 dout(7) << "_do_cap_release missing ino " << ino << dendl;
3439 Capability *cap = in->get_client_cap(client);
3441 dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
3445 dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
// The client is releasing a different (older) cap instance than we now hold —
// e.g. the cap was reissued since; ignore.
3446 if (cap->get_cap_id() != cap_id) {
3447 dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
// Stale migrate_seq: release raced with cap migration between MDSs; ignore.
3450 if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
3451 dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
// Can't mutate cap state while the inode is freezing/frozen; retry after
// unfreeze with the identical arguments.
3454 if (should_defer_client_cap_frozen(in)) {
3455 dout(7) << " freezing|frozen, deferring" << dendl;
3456 in->add_waiter(CInode::WAIT_UNFREEZE,
3457 new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
// issue_seq mismatch: the client acted on an older issue; drop stale revoke
// history up to its seq and re-evaluate any pending cap gather instead of
// removing the cap outright.
3460 if (seq != cap->get_last_issue()) {
3461 dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
3462 // clean out any old revoke history
3463 cap->clean_revoke_from(seq);
3464 eval_cap_gather(in);
// All checks passed — actually drop the cap.
3467 remove_client_cap(in, client);
3470 /* This function DOES put the passed message before returning */
// Remove a client's capability from an inode and clean up dependent state:
// pending snapflushes, the client's byte-range (max_size) record, and lock
// state that may now be able to move.
3472 void Locker::remove_client_cap(CInode *in, client_t client)
3474 // clean out any pending snapflush state
3475 if (!in->client_need_snapflush.empty())
3476 _do_null_snapflush(in, client);
3478 in->remove_client_cap(client);
3480 if (in->is_auth()) {
3481 // make sure we clear out the client byte range
// Skip the max_size recheck for an unlinked inode with no remaining caps
// (stray about to be purged).
3482 if (in->get_projected_inode()->client_ranges.count(client) &&
3483 !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
3484 check_inode_max_size(in);
// Non-auth: tell the auth MDS our replica's wanted caps changed.
3486 request_inode_file_caps(in);
// Dropping a cap may unblock lock transitions; re-evaluate.
3489 try_eval(in, CEPH_CAP_LOCKS);
3494 * Return true if any currently revoking caps exceed the
3495 * mds_revoke_cap_timeout threshold.
// Only the front of the revoking list is examined: entries are kept in
// revoke-stamp order, so the oldest revoke decides lateness in O(1).
3497 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3499 xlist<Capability*>::const_iterator p = revoking.begin();
3501 // No revoking caps at the moment
3504 utime_t now = ceph_clock_now();
3505 utime_t age = now - (*p)->get_last_revoke_stamp();
// Oldest revoke is still within the timeout => nothing is late.
3506 if (age <= g_conf->mds_revoke_cap_timeout) {
// Collect the clients that have at least one cap stuck in revoke past the
// mds_revoke_cap_timeout. Cheap global check first; only walk the per-client
// revoking lists when someone is actually late.
3515 void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3517 if (!any_late_revoking_caps(revoking_caps)) {
3518 // Fast path: no misbehaving clients, execute in O(1)
3522 // Slow path: execute in O(N_clients)
3523 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3524 for (client_rc_iter = revoking_caps_by_client.begin();
3525 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3526 xlist<Capability*> const &client_rc = client_rc_iter->second;
3527 bool any_late = any_late_revoking_caps(client_rc);
3529 result->push_back(client_rc_iter->first);
3534 // Hard-code instead of surfacing a config settings because this is
3535 // really a hack that should go away at some point when we have better
3536 // inspection tools for getting at detailed cap state (#7316)
3537 #define MAX_WARN_CAPS 100
// Periodic tick: walk the revoking-caps list (oldest first) and emit cluster
// log warnings for caps that clients have failed to release within
// mds_revoke_cap_timeout, with exponential backoff per cap so the log is not
// spammed. At most MAX_WARN_CAPS late caps are examined per tick.
3539 void Locker::caps_tick()
3541 utime_t now = ceph_clock_now();
3543 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3546 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3547 Capability *cap = *p;
3549 utime_t age = now - cap->get_last_revoke_stamp();
3550 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
// List is ordered by revoke stamp: once one cap is within the timeout, all
// later ones are too. (NOTE(review): sampled view — the `break` is not
// visible here but is implied.)
3551 if (age <= g_conf->mds_revoke_cap_timeout) {
3552 dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
// Cap the number of late caps inspected per tick (see MAX_WARN_CAPS note).
3556 if (i > MAX_WARN_CAPS) {
// NOTE(review): the two string pieces below concatenate as "laterevoking"
// (missing space between "late" and "revoking") in the emitted debug line.
3557 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3558 << "revoking, ignoring subsequent caps" << dendl;
3562 // exponential backoff of warning intervals
// Warn only when age crosses timeout * 2^num_warnings, then bump the counter.
3563 if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
3564 cap->inc_num_revoke_warnings();
3566 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3567 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3568 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3569 mds->clog->warn() << ss.str();
3570 dout(20) << __func__ << " " << ss.str() << dendl;
3572 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
// Handle a client dentry-lease message: RELEASE / REVOKE_ACK drop the lease
// (if the seq matches), RENEW re-issues it with a fresh duration and echoes
// the message back to the client. Unknown actions abort.
3578 void Locker::handle_client_lease(MClientLease *m)
3580 dout(10) << "handle_client_lease " << *m << dendl;
3582 assert(m->get_source().is_client());
3583 client_t client = m->get_source().num();
// Leases are per-dentry: locate the inode, then the dirfrag, then the dentry
// named in the message. Missing pieces mean cache was trimmed; drop message.
3585 CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
3587 dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
3593 frag_t fg = in->pick_dirfrag(m->dname);
3594 CDir *dir = in->get_dirfrag(fg);
3596 dn = dir->lookup(m->dname);
3598 dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
3602 dout(10) << " on " << *dn << dendl;
// No lease record for this client on this dentry — nothing to do.
3605 ClientLease *l = dn->get_client_lease(client);
3607 dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
3612 switch (m->get_action()) {
3613 case CEPH_MDS_LEASE_REVOKE_ACK:
3614 case CEPH_MDS_LEASE_RELEASE:
// A stale seq means the ack/release refers to an older lease issue; ignore.
3615 if (l->seq != m->get_seq()) {
3616 dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
3618 dout(7) << "handle_client_lease client." << client
3619 << " on " << *dn << dendl;
3620 dn->remove_client_lease(l, this);
3625 case CEPH_MDS_LEASE_RENEW:
3627 dout(7) << "handle_client_lease client." << client << " renew on " << *dn
3628 << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
// Only renew if the dentry lock state still permits leasing to this client;
// the renewed message (duration + new seq) is sent back to the client.
3629 if (dn->lock.can_lease(client)) {
3630 int pool = 1; // fixme.. do something smart!
3631 m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
3632 m->h.seq = ++l->seq;
3635 utime_t now = ceph_clock_now();
3636 now += mdcache->client_lease_durations[pool];
3637 mdcache->touch_client_lease(l, pool, now);
3639 mds->send_message_client_counted(m, m->get_connection());
3645 ceph_abort(); // implement me
// Issue (or decline) a dentry lease to a client, appending the LeaseStat to
// the reply buffer `bl`. A lease is only granted when the parent dir is not a
// stray, the client is not already covered by dir filelock/caps, and the
// dentry lock permits leasing.
3651 void Locker::issue_client_lease(CDentry *dn, client_t client,
3652 bufferlist &bl, utime_t now, Session *session)
3654 CInode *diri = dn->get_dir()->get_inode();
// If the client already holds FILE_SHARED/EXCL caps (or a filelock lease) on
// the directory, a per-dentry lease is redundant.
3655 if (!diri->is_stray() && // do not issue dn leases in stray dir!
3656 ((!diri->filelock.can_lease(client) &&
3657 (diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
3658 dn->lock.can_lease(client)) {
3659 int pool = 1; // fixme.. do something smart!
3660 // issue a dentry lease
3661 ClientLease *l = dn->add_client_lease(client, session);
3662 session->touch_lease(l);
// Schedule expiry: lease lives for the pool's configured duration.
3664 now += mdcache->client_lease_durations[pool];
3665 mdcache->touch_client_lease(l, pool, now);
3668 e.mask = 1 | CEPH_LOCK_DN; // old and new bit values
3670 e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
3672 dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
3673 << " on " << *dn << dendl;
// Otherwise a null/no lease stat is encoded into bl (lines not visible here).
3681 dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
// Revoke every client lease attached to the dentry that owns `lock` by
// sending CEPH_MDS_LEASE_REVOKE to each lease holder. Only CEPH_LOCK_DN
// (dentry) locks can carry leases.
3686 void Locker::revoke_client_leases(SimpleLock *lock)
3689 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3690 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3691 p != dn->client_lease_map.end();
3693 ClientLease *l = p->second;
3696 assert(lock->get_type() == CEPH_LOCK_DN);
3698 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3699 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3701 // i should also revoke the dir ICONTENT lease, if they have it!
3702 CInode *diri = dn->get_dir()->get_inode();
// Revocation carries the lease seq so the client's ack can be matched up in
// handle_client_lease().
3703 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3706 diri->first, CEPH_NOSNAP,
// Sanity: number of leases revoked must match the lock's lease count.
3710 assert(n == lock->get_num_client_lease());
3715 // locks ----------------------------------------------------------------
// Resolve an (lock_type, MDSCacheObjectInfo) pair — as carried by inter-MDS
// lock messages — to the concrete SimpleLock object in our cache. Returns
// NULL (falls through, lines not visible here) when the referenced object has
// been trimmed or the lock type is unknown.
3717 SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
3719 switch (lock_type) {
3722 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3723 CInode *diri = mdcache->get_inode(info.dirfrag.ino);
// Recompute the dirfrag from the dname rather than trusting info.dirfrag.
3728 fg = diri->pick_dirfrag(info.dname);
3729 dir = diri->get_dirfrag(fg);
3731 dn = dir->lookup(info.dname, info.snapid);
3734 dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
// All inode lock types resolve through the same inode lookup, then fan out
// to the matching member lock.
3740 case CEPH_LOCK_IAUTH:
3741 case CEPH_LOCK_ILINK:
3742 case CEPH_LOCK_IDFT:
3743 case CEPH_LOCK_IFILE:
3744 case CEPH_LOCK_INEST:
3745 case CEPH_LOCK_IXATTR:
3746 case CEPH_LOCK_ISNAP:
3747 case CEPH_LOCK_IFLOCK:
3748 case CEPH_LOCK_IPOLICY:
3750 CInode *in = mdcache->get_inode(info.ino, info.snapid);
3752 dout(7) << "get_lock don't have ino " << info.ino << dendl;
3755 switch (lock_type) {
3756 case CEPH_LOCK_IAUTH: return &in->authlock;
3757 case CEPH_LOCK_ILINK: return &in->linklock;
3758 case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
3759 case CEPH_LOCK_IFILE: return &in->filelock;
3760 case CEPH_LOCK_INEST: return &in->nestlock;
3761 case CEPH_LOCK_IXATTR: return &in->xattrlock;
3762 case CEPH_LOCK_ISNAP: return &in->snaplock;
3763 case CEPH_LOCK_IFLOCK: return &in->flocklock;
3764 case CEPH_LOCK_IPOLICY: return &in->policylock;
3769 dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
3777 /* This function DOES put the passed message before returning */
// Dispatch an inter-MDS MLock message to the handler matching the lock's
// type: simple locks, scatter locks (IDFT/INEST), or the file lock.
3778 void Locker::handle_lock(MLock *m)
3780 // nobody should be talking to us during recovery.
3781 assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
// Object may have been trimmed since the peer sent the message; drop it.
3783 SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
3785 dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
3790 switch (lock->get_type()) {
3792 case CEPH_LOCK_IAUTH:
3793 case CEPH_LOCK_ILINK:
3794 case CEPH_LOCK_ISNAP:
3795 case CEPH_LOCK_IXATTR:
3796 case CEPH_LOCK_IFLOCK:
3797 case CEPH_LOCK_IPOLICY:
3798 handle_simple_lock(lock, m);
// IDFT/INEST: scatter-lock handling (call site not visible in this view).
3801 case CEPH_LOCK_IDFT:
3802 case CEPH_LOCK_INEST:
3803 //handle_scatter_lock((ScatterLock*)lock, m);
3806 case CEPH_LOCK_IFILE:
3807 handle_file_lock(static_cast<ScatterLock*>(lock), m);
3811 dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
3821 // ==========================================================================
3824 /** This function may take a reference to m if it needs one, but does
3825 * not put references. */
// A replica is asking the auth to move this lock to SYNC so it can rdlock.
// If we are auth, not frozen, and not already SYNC: sync now (if stable) or
// queue the request until the lock stabilizes. Otherwise drop the request —
// the replica will retry.
3826 void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
3828 MDSCacheObject *parent = lock->get_parent();
3829 if (parent->is_auth() &&
3830 lock->get_state() != LOCK_SYNC &&
3831 !parent->is_frozen()) {
3832 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3833 << " on " << *parent << dendl;
3834 assert(parent->is_auth()); // replica auth pinned if they're doing this!
3835 if (lock->is_stable()) {
3838 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
// m->get() takes the extra message ref the retry context will consume.
3839 lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
3840 new C_MDS_RetryMessage(mds, m->get()));
3843 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3844 << " on " << *parent << dendl;
3845 // replica should retry
3849 /* This function DOES put the passed message before returning */
// Replica/auth protocol for simple locks. On a replica: AC_SYNC installs the
// auth's state and wakes readers; AC_LOCK starts gathering (revoking leases
// as needed). On the auth: AC_LOCKACK counts down the gather set;
// AC_REQRDLOCK is forwarded to handle_reqrdlock().
3850 void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
3852 int from = m->get_asker();
3854 dout(10) << "handle_simple_lock " << *m
3855 << " on " << *lock << " " << *lock->get_parent() << dendl;
// During rejoin, lock messages for still-rejoining objects are stale; drop.
3857 if (mds->is_rejoin()) {
3858 if (lock->get_parent()->is_rejoining()) {
3859 dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
3860 << ", dropping " << *m << dendl;
3866 switch (m->get_action()) {
// -- replica: auth granted SYNC; adopt state and wake waiting readers.
3869 assert(lock->get_state() == LOCK_LOCK);
3870 lock->decode_locked_state(m->get_data());
3871 lock->set_state(LOCK_SYNC);
3872 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
// -- replica: auth wants LOCK; enter SYNC->LOCK, revoke any client leases,
// and flush the log if the transition leaves us unstable+locked.
3876 assert(lock->get_state() == LOCK_SYNC);
3877 lock->set_state(LOCK_SYNC_LOCK);
3878 if (lock->is_leased())
3879 revoke_client_leases(lock);
3880 eval_gather(lock, true);
3881 if (lock->is_unstable_and_locked())
3882 mds->mdlog->flush();
// -- auth: a replica acked our LOCK request; remove it from the gather set.
3887 case LOCK_AC_LOCKACK:
3888 assert(lock->get_state() == LOCK_SYNC_LOCK ||
3889 lock->get_state() == LOCK_SYNC_EXCL);
3890 assert(lock->is_gathering(from));
3891 lock->remove_gather(from);
3893 if (lock->is_gathering()) {
3894 dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
3895 << ", still gathering " << lock->get_gather_set() << dendl;
3897 dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
3898 << ", last one" << dendl;
3903 case LOCK_AC_REQRDLOCK:
3904 handle_reqrdlock(lock, m);
3912 /* unused, currently.
3914 class C_Locker_SimpleEval : public Context {
3918 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3919 void finish(int r) {
3920 locker->try_simple_eval(lock);
3924 void Locker::try_simple_eval(SimpleLock *lock)
3926 // unstable and ambiguous auth?
3927 if (!lock->is_stable() &&
3928 lock->get_parent()->is_ambiguous_auth()) {
3929 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3930 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3931 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3935 if (!lock->get_parent()->is_auth()) {
3936 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3940 if (!lock->get_parent()->can_auth_pin()) {
3941 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3942 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3943 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3947 if (lock->is_stable())
// Re-evaluate a stable simple lock on the auth and steer it toward the state
// that matches current demand: EXCL when a loner client wants GEXCL, SYNC
// when nobody needs write access. Read-only FS always syncs.
3953 void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
3955 dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
3957 assert(lock->get_parent()->is_auth());
3958 assert(lock->is_stable());
3960 if (lock->get_parent()->is_freezing_or_frozen()) {
3961 // dentry lock in unreadable state can block path traverse
// Bail while freezing/frozen, except for an unreadable dentry lock on a
// merely-freezing parent (that case must proceed to avoid blocking lookups).
3962 if ((lock->get_type() != CEPH_LOCK_DN ||
3963 lock->get_state() == LOCK_SYNC ||
3964 lock->get_parent()->is_frozen()))
3968 if (mdcache->is_readonly()) {
3969 if (lock->get_state() != LOCK_SYNC) {
3970 dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
3971 simple_sync(lock, need_issue);
// For inode locks, look at what caps clients want in this lock's cap field.
3978 if (lock->get_type() != CEPH_LOCK_DN) {
3979 in = static_cast<CInode*>(lock->get_parent());
3980 in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
// A loner client wanting exclusive caps pulls the lock to EXCL.
3984 if (lock->get_state() != LOCK_EXCL &&
3985 in && in->get_target_loner() >= 0 &&
3986 (wanted & CEPH_CAP_GEXCL)) {
3987 dout(7) << "simple_eval stable, going to excl " << *lock
3988 << " on " << *lock->get_parent() << dendl;
3989 simple_excl(lock, need_issue);
// Otherwise fall back to SYNC when no writer (wrlock, GEXCL want, or WR
// waiter) needs the lock held, or the loner has gone away.
3993 else if (lock->get_state() != LOCK_SYNC &&
3994 !lock->is_wrlocked() &&
3995 ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
3996 (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
3997 dout(7) << "simple_eval stable, syncing " << *lock
3998 << " on " << *lock->get_parent() << dendl;
3999 simple_sync(lock, need_issue);
// Drive a stable auth lock toward SYNC. May need to gather first: wait for
// wrlocks, replica acks (when coming from MIX), client cap revocation, file
// recovery, or a scatter writebehind. Returns (per the visible tail) after
// either completing the transition or auth-pinning for the gather.
4006 bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
4008 dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
4009 assert(lock->get_parent()->is_auth());
4010 assert(lock->is_stable());
// Locks with a cap shift live on inodes; we need the inode for cap checks.
4013 if (lock->get_cap_shift())
4014 in = static_cast<CInode *>(lock->get_parent());
4016 int old_state = lock->get_state();
4018 if (old_state != LOCK_TSYN) {
// Enter the appropriate intermediate *_SYNC state.
4020 switch (lock->get_state()) {
4021 case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
4022 case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
4023 case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
4024 case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
4025 default: ceph_abort();
4029 if (lock->is_wrlocked())
// Coming from MIX with replicas: they hold dirty scatter state, so request
// it (AC_SYNC acts as a gather request here) before we can publish SYNC.
4032 if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
4033 send_lock_message(lock, LOCK_AC_SYNC);
4034 lock->init_gather();
4038 if (in && in->is_head()) {
// Issued client caps that conflict with SYNC must be revoked first.
4039 if (in->issued_caps_need_gather(lock)) {
4048 bool need_recover = false;
4049 if (lock->get_type() == CEPH_LOCK_IFILE) {
// File lock: queue file recovery if the inode needs it before syncing.
4051 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4052 mds->mdcache->queue_file_recover(in);
4053 need_recover = true;
// Dirty scatter data with nothing left to gather: write it behind now.
4058 if (!gather && lock->is_dirty()) {
4059 lock->get_parent()->auth_pin(lock);
4060 scatter_writebehind(static_cast<ScatterLock*>(lock));
4061 mds->mdlog->flush();
4066 lock->get_parent()->auth_pin(lock);
4068 mds->mdcache->do_file_recover();
// No gather needed: broadcast the SYNC state to replicas and finish.
4073 if (lock->get_parent()->is_replicated()) { // FIXME
4075 lock->encode_locked_state(data);
4076 send_lock_message(lock, LOCK_AC_SYNC, data);
4078 lock->set_state(LOCK_SYNC);
4079 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
4080 if (in && in->is_head()) {
// Drive a stable auth lock toward EXCL (single loner client). Gathers
// rdlocks/wrlocks, replica LOCKs, and conflicting client caps as needed;
// completes immediately when there is nothing to gather.
4089 void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
4091 dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
4092 assert(lock->get_parent()->is_auth());
4093 assert(lock->is_stable());
4096 if (lock->get_cap_shift())
4097 in = static_cast<CInode *>(lock->get_parent());
// Enter the appropriate intermediate *_EXCL state.
4099 switch (lock->get_state()) {
4100 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4101 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4102 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4103 default: ceph_abort();
4107 if (lock->is_rdlocked())
4109 if (lock->is_wrlocked())
// Replicas must drop to LOCK before we can hold EXCL — unless they are
// already LOCK in the source state (LOCK_LOCK_EXCL / LOCK_XSYN_EXCL).
4112 if (lock->get_parent()->is_replicated() &&
4113 lock->get_state() != LOCK_LOCK_EXCL &&
4114 lock->get_state() != LOCK_XSYN_EXCL) {
4115 send_lock_message(lock, LOCK_AC_LOCK);
4116 lock->init_gather();
4120 if (in && in->is_head()) {
4121 if (in->issued_caps_need_gather(lock)) {
// Gather pending: pin the parent until eval_gather completes the move.
4131 lock->get_parent()->auth_pin(lock);
// Nothing to gather: finish the transition and wake writers.
4133 lock->set_state(LOCK_EXCL);
4134 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
// Drive a stable auth lock toward LOCK (exclusively held by the MDS).
// Handles the full gather: client leases, rdlocks, conflicting caps, replica
// acks, file recovery, and scatter writebehind for dirty scatterlocks.
4144 void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
4146 dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
4147 assert(lock->get_parent()->is_auth());
4148 assert(lock->is_stable());
4149 assert(lock->get_state() != LOCK_LOCK);
4152 if (lock->get_cap_shift())
4153 in = static_cast<CInode *>(lock->get_parent());
4155 int old_state = lock->get_state();
// Enter the appropriate intermediate *_LOCK state. XSYN first detours via
// file_excl(); if that did not land on EXCL we cannot proceed this round.
4157 switch (lock->get_state()) {
4158 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
4160 file_excl(static_cast<ScatterLock*>(lock), need_issue);
4161 if (lock->get_state() != LOCK_EXCL)
4164 case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
4165 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
4166 (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
4168 case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
4169 default: ceph_abort();
// Client dentry leases conflict with LOCK; revoke them.
4173 if (lock->is_leased()) {
4175 revoke_client_leases(lock);
4177 if (lock->is_rdlocked())
4179 if (in && in->is_head()) {
4180 if (in->issued_caps_need_gather(lock)) {
4189 bool need_recover = false;
4190 if (lock->get_type() == CEPH_LOCK_IFILE) {
4192 if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
4193 mds->mdcache->queue_file_recover(in);
4194 need_recover = true;
// MIX->LOCK with replicas: do our local gather stage first, then move to
// MIX_LOCK2 so the replica LOCK message is not sent twice later.
4199 if (lock->get_parent()->is_replicated() &&
4200 lock->get_state() == LOCK_MIX_LOCK &&
4202 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
4204 // move to second stage of gather now, so we don't send the lock action later.
4205 if (lock->get_state() == LOCK_MIX_LOCK)
4206 lock->set_state(LOCK_MIX_LOCK2);
// Ask replicas to LOCK, unless their state in the source state machine is
// already LOCK (nothing to gather from them).
4208 if (lock->get_parent()->is_replicated() &&
4209 lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
4211 send_lock_message(lock, LOCK_AC_LOCK);
4212 lock->init_gather();
// Dirty scatter state with nothing else to gather: flush it behind now.
4216 if (!gather && lock->is_dirty()) {
4217 lock->get_parent()->auth_pin(lock);
4218 scatter_writebehind(static_cast<ScatterLock*>(lock));
4219 mds->mdlog->flush();
4224 lock->get_parent()->auth_pin(lock);
4226 mds->mdcache->do_file_recover();
// No gather needed: finish the transition and wake xlock/write waiters.
4228 lock->set_state(LOCK_LOCK);
4229 lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
// Move a lock toward XLOCK (PREXLOCK here; the xlock itself is taken by the
// caller). Unlike the other simple_* transitions this may start from an
// unstable state; it auth-pins only when the lock was stable on entry.
4234 void Locker::simple_xlock(SimpleLock *lock)
4236 dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
4237 assert(lock->get_parent()->is_auth());
4238 //assert(lock->is_stable());
4239 assert(lock->get_state() != LOCK_XLOCK);
4242 if (lock->get_cap_shift())
4243 in = static_cast<CInode *>(lock->get_parent());
// Pin only on the stable->unstable edge; unstable locks are already pinned.
4245 if (lock->is_stable())
4246 lock->get_parent()->auth_pin(lock);
4248 switch (lock->get_state()) {
4250 case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
4251 default: ceph_abort();
// Gather outstanding readers/writers and conflicting client caps.
4255 if (lock->is_rdlocked())
4257 if (lock->is_wrlocked())
4260 if (in && in->is_head()) {
4261 if (in->issued_caps_need_gather(lock)) {
// Nothing to gather: park in PREXLOCK for the caller to take the xlock.
4268 lock->set_state(LOCK_PREXLOCK);
4269 //assert("shouldn't be called if we are already xlockable" == 0);
4277 // ==========================================================================
4282 Some notes on scatterlocks.
4284 - The scatter/gather is driven by the inode lock. The scatter always
4285 brings in the latest metadata from the fragments.
4287 - When in a scattered/MIX state, fragments are only allowed to
4288 update/be written to if the accounted stat matches the inode's
4291 - That means, on gather, we _only_ assimilate diffs for frag metadata
4292 that match the current version, because those are the only ones
4293 written during this scatter/gather cycle. (Others didn't permit
4294 it.) We increment the version and journal this to disk.
4296 - When possible, we also simultaneously update our local frag
4297 accounted stats to match.
4299 - On scatter, the new inode info is broadcast to frags, both local
4300 and remote. If possible (auth and !frozen), the dirfrag auth
4301 should update the accounted state (if it isn't already up to date).
4302 Note that this may occur on both the local inode auth node and
4303 inode replicas, so there are two potential paths. If it is NOT
4304 possible, they need to mark_stale to prevent any possible writes.
4306 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4307 only). Both are opportunities to update the frag accounted stats,
4308 even though only the MIX case is affected by a stale dirfrag.
4310 - Because many scatter/gather cycles can potentially go by without a
4311 frag being able to update its accounted stats (due to being frozen
4312 by exports/refragments in progress), the frag may have (even very)
4313 old stat versions. That's fine. If when we do want to update it,
4314 we can update accounted_* and the version first.
// Log-commit context for scatter_writebehind(): once the EUpdate is safely
// journaled, finish the writebehind (pop projected inode, finish_flush, drop
// locks) via scatter_writebehind_finish().
4318 class C_Locker_ScatterWB : public LockerLogContext {
4322 C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
4323 LockerLogContext(l), lock(sl), mut(m) {}
4324 void finish(int r) override {
4325 locker->scatter_writebehind_finish(lock, mut);
// Flush dirty scattered state (e.g. accumulated fragstat/rstat deltas) back
// into the inode and journal it. Takes a forced wrlock for the duration of
// the journal write; completion is handled by C_Locker_ScatterWB.
4329 void Locker::scatter_writebehind(ScatterLock *lock)
4331 CInode *in = static_cast<CInode*>(lock->get_parent());
4332 dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;
4335 MutationRef mut(new MutationImpl());
4336 mut->ls = mds->mdlog->get_current_segment();
4338 // forcefully take a wrlock
4339 lock->get_wrlock(true);
4340 mut->wrlocks.insert(lock);
4341 mut->locks.insert(lock);
// Pre-create the old_inode record so the projection below can't trigger
// copy-on-write at an awkward time.
4343 in->pre_cow_old_inode(); // avoid cow mayhem
4345 inode_t *pi = in->project_inode();
4346 pi->version = in->pre_dirty();
// Fold gathered frag state into the inode and mark the lock flushing.
4348 in->finish_scatter_gather_update(lock->get_type());
4349 lock->start_flush();
4351 EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
4352 mds->mdlog->start_entry(le);
4354 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
4355 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
4357 in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);
4359 mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
// Journal-commit half of scatter_writebehind(): apply the projected inode,
// complete the lock flush, notify replicas that may also be flushing, then
// drop the mutation's locks and wake stability waiters.
4362 void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
4364 CInode *in = static_cast<CInode*>(lock->get_parent());
4365 dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
4366 in->pop_and_dirty_projected_inode(mut->ls);
4368 lock->finish_flush();
4370 // if replicas may have flushed in a mix->lock state, send another
4371 // message so they can finish_flush().
4372 if (in->is_replicated()) {
4373 switch (lock->get_state()) {
4375 case LOCK_MIX_LOCK2:
4378 send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
4383 drop_locks(mut.get());
4386 if (lock->is_stable())
4387 lock->finish_waiters(ScatterLock::WAIT_STABLE);
4389 //scatter_eval_gather(lock);
// Re-evaluate a stable scatter lock on the auth: honor pending scatter
// requests (-> MIX), keep INEST writable (MIX when replicated, LOCK when
// not), and fall back to SYNC for non-subtree-root inodes with no writers.
4392 void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
4394 dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
4396 assert(lock->get_parent()->is_auth());
4397 assert(lock->is_stable());
4399 if (lock->get_parent()->is_freezing_or_frozen()) {
4400 dout(20) << " freezing|frozen" << dendl;
// Read-only FS: everything goes to SYNC.
4404 if (mdcache->is_readonly()) {
4405 if (lock->get_state() != LOCK_SYNC) {
4406 dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
4407 simple_sync(lock, need_issue);
// Someone (e.g. a replica nudge) asked for scatter: move to MIX if readers
// don't prevent it.
4412 if (!lock->is_rdlocked() &&
4413 lock->get_state() != LOCK_MIX &&
4414 lock->get_scatter_wanted()) {
4415 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4416 << " on " << *lock->get_parent() << dendl;
4417 scatter_mix(lock, need_issue);
4421 if (lock->get_type() == CEPH_LOCK_INEST) {
4422 // in general, we want to keep INEST writable at all times.
4423 if (!lock->is_rdlocked()) {
// Replicated: MIX lets every holder write; unreplicated: plain LOCK does.
4424 if (lock->get_parent()->is_replicated()) {
4425 if (lock->get_state() != LOCK_MIX)
4426 scatter_mix(lock, need_issue);
4428 if (lock->get_state() != LOCK_LOCK)
4429 simple_lock(lock, need_issue);
// Non-INEST scatterlocks on inodes that are not subtree roots (or are base
// inodes) prefer SYNC when nothing is writing.
4435 CInode *in = static_cast<CInode*>(lock->get_parent());
4436 if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
4437 // i _should_ be sync.
4438 if (!lock->is_wrlocked() &&
4439 lock->get_state() != LOCK_SYNC) {
4440 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
4441 simple_sync(lock, need_issue);
4448 * mark a scatterlock to indicate that the dir fnode has some dirty data
// Queue the lock on updated_scatterlocks (timestamped) so scatter_tick()
// will eventually nudge it and flush the dirty fnode data. Re-marking a lock
// already on the list keeps its original stamp.
4450 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4453 if (lock->get_updated_item()->is_on_list()) {
4454 dout(10) << "mark_updated_scatterlock " << *lock
4455 << " - already on list since " << lock->get_update_stamp() << dendl;
4457 updated_scatterlocks.push_back(lock->get_updated_item());
4458 utime_t now = ceph_clock_now();
4459 lock->set_update_stamp(now);
4460 dout(10) << "mark_updated_scatterlock " << *lock
4461 << " - added at " << now << dendl;
4466 * this is called by scatter_tick and LogSegment::try_to_trim() when
4467 * trying to flush dirty scattered data (i.e. updated fnode) back to
4470 * we need to lock|scatter in order to push fnode changes into the
// Nudge a dirty scatterlock so its fnode data gets flushed: on the auth,
// either writebehind directly (unreplicated + wrlockable) or force a
// scatter/unscatter state change; on a replica, request a nudge from the
// auth. `c` (optional) is queued to run when the lock next stabilizes;
// without it the lock is requeued on updated_scatterlocks instead.
4473 void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
4475 CInode *p = static_cast<CInode *>(lock->get_parent());
// Frozen/freezing parent: can't touch lock state; wait or requeue.
4477 if (p->is_frozen() || p->is_freezing()) {
4478 dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
4480 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
4482 // just requeue. not ideal.. starvation prone..
4483 updated_scatterlocks.push_back(lock->get_updated_item());
// Ambiguous auth (mid-migration): same treatment.
4487 if (p->is_ambiguous_auth()) {
4488 dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
4490 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
4492 // just requeue. not ideal.. starvation prone..
4493 updated_scatterlocks.push_back(lock->get_updated_item());
4500 if (lock->is_stable()) {
4501 // can we do it now?
4502 // (only if we're not replicated.. if we are, we really do need
4503 // to nudge the lock state!)
4505 actually, even if we're not replicated, we can't stay in MIX, because another mds
4506 could discover and replicate us at any time. if that happens while we're flushing,
4507 they end up in MIX but their inode has the old scatterstat version.
// Fast path: unreplicated and wrlockable — journal the flush directly.
4509 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4510 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4511 scatter_writebehind(lock);
4513 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4518 if (mdcache->is_readonly()) {
4519 if (lock->get_state() != LOCK_SYNC) {
4520 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
4521 simple_sync(static_cast<ScatterLock*>(lock));
4526 // adjust lock state
// Force a state change so the flush happens as part of the transition:
// IFILE prefers MIX when replicated, LOCK otherwise; IDFT/INEST likewise.
4527 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
4528 switch (lock->get_type()) {
4529 case CEPH_LOCK_IFILE:
4530 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
4531 scatter_mix(static_cast<ScatterLock*>(lock));
4532 else if (lock->get_state() != LOCK_LOCK)
4533 simple_lock(static_cast<ScatterLock*>(lock));
4535 simple_sync(static_cast<ScatterLock*>(lock));
4538 case CEPH_LOCK_IDFT:
4539 case CEPH_LOCK_INEST:
4540 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
4542 else if (lock->get_state() != LOCK_LOCK)
// Two full cycles and still stable: transitions are completing eagerly
// (nothing to gather), so bail to avoid spinning forever.
4551 if (lock->is_stable() && count == 2) {
4552 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
4553 // this should only really happen when called via
4554 // handle_file_lock due to AC_NUDGE, because the rest of the
4555 // time we are replicated or have dirty data and won't get
4556 // called. bailing here avoids an infinite loop.
4561 dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
4563 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
// Replica: ask the auth MDS to do the nudge (if it is up enough to act).
4568 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4569 << *lock << " on " << *p << dendl;
4570 // request unscatter?
4571 mds_rank_t auth = lock->get_parent()->authority().first;
4572 if (!mds->is_cluster_degraded() ||
4573 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
4574 mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
4578 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4580 // also, requeue, in case we had wrong auth or something
4581 updated_scatterlocks.push_back(lock->get_updated_item());
// Periodic tick: walk the updated_scatterlocks queue and nudge any dirty
// scatter lock whose last update stamp is at least
// g_conf->mds_scatter_nudge_interval old.  scatter_nudge() may requeue a
// lock, so the walk is bounded by the queue size captured up front.
// NOTE(review): gaps in this listing's numbering indicate dropped source
// lines (braces / continue statements) — verify against upstream Locker.cc.
4585 void Locker::scatter_tick()
4587 dout(10) << "scatter_tick" << dendl;
4590 utime_t now = ceph_clock_now();
4591 int n = updated_scatterlocks.size();
4592 while (!updated_scatterlocks.empty()) {
4593 ScatterLock *lock = updated_scatterlocks.front();
4595 if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping
// Clean (non-dirty) locks are simply dropped from the queue.
4597 if (!lock->is_dirty()) {
4598 updated_scatterlocks.pop_front();
4599 dout(10) << " removing from updated_scatterlocks "
4600 << *lock << " " << *lock->get_parent() << dendl;
// Too recently updated: leave it queued for a later tick.
4603 if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
4605 updated_scatterlocks.pop_front();
4606 scatter_nudge(lock, 0);
// Flush the MDS journal so any writebehind started above gets persisted.
4608 mds->mdlog->flush();
// Move a stable, auth scatter lock toward the temporary-sync (TSYN) state.
// NOTE(review): the unconditional assert(0 == "...") below means this path
// aborts if ever reached — the code after it is effectively dead until the
// TODO is finished.  Numbering gaps also indicate dropped lines; verify
// against upstream Locker.cc before relying on this fragment.
4612 void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
4614 dout(10) << "scatter_tempsync " << *lock
4615 << " on " << *lock->get_parent() << dendl;
4616 assert(lock->get_parent()->is_auth());
4617 assert(lock->is_stable());
// Deliberate abort: feature incomplete (at least for filelock).
4619 assert(0 == "not fully implemented, at least not for filelock");
4621 CInode *in = static_cast<CInode *>(lock->get_parent());
// Enter the appropriate intermediate *_TSYN state.
4623 switch (lock->get_state()) {
4624 case LOCK_SYNC: ceph_abort(); // this shouldn't happen
4625 case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
4626 case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
4627 default: ceph_abort();
4631 if (lock->is_wrlocked())
4634 if (lock->get_cap_shift() &&
4636 in->issued_caps_need_gather(lock)) {
// Coming from MIX on a replicated inode: gather LOCK acks from replicas.
4644 if (lock->get_state() == LOCK_MIX_TSYN &&
4645 in->is_replicated()) {
4646 lock->init_gather();
4647 send_lock_message(lock, LOCK_AC_LOCK);
// No gather needed: go straight to TSYN and wake readers/stable waiters.
4655 lock->set_state(LOCK_TSYN);
4656 lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
4657 if (lock->get_cap_shift()) {
4668 // ==========================================================================
// Take a wrlock on a LocalLock when the caller knows it cannot block:
// asserts auth and can_wrlock(), then records the lock in the mutation's
// wrlocks/locks sets.  Released via local_wrlock_finish().
// NOTE(review): numbering gaps here suggest dropped brace lines in this
// listing — verify against upstream Locker.cc.
4671 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4673 dout(7) << "local_wrlock_grab on " << *lock
4674 << " on " << *lock->get_parent() << dendl;
4676 assert(lock->get_parent()->is_auth());
4677 assert(lock->can_wrlock());
4678 assert(!mut->wrlocks.count(lock)); // must not already hold it
4679 lock->get_wrlock(mut->get_client());
4680 mut->wrlocks.insert(lock);
4681 mut->locks.insert(lock);
// Blocking variant of local_wrlock_grab(): take the wrlock if currently
// possible and record it on the mutation; otherwise queue a retry of the
// request for when the lock becomes writable/stable.
// NOTE(review): the return statements are on lines dropped from this
// listing (numbering gaps) — presumably true on the acquire path and
// false on the wait path; verify against upstream Locker.cc.
4684 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4686 dout(7) << "local_wrlock_start on " << *lock
4687 << " on " << *lock->get_parent() << dendl;
4689 assert(lock->get_parent()->is_auth());
4690 if (lock->can_wrlock()) {
4691 assert(!mut->wrlocks.count(lock));
4692 lock->get_wrlock(mut->get_client());
4693 mut->wrlocks.insert(lock);
4694 mut->locks.insert(lock);
// Cannot wrlock now: retry the whole request once the lock is ready.
4697 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
// Drop a wrlock previously taken via local_wrlock_grab()/start(): remove
// it from the mutation's lock sets and, once no wrlocks remain, wake all
// waiters.
// NOTE(review): a line is missing between 4705 and 4707 in this listing —
// presumably the lock's wrlock refcount release; verify against upstream.
4702 void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4704 dout(7) << "local_wrlock_finish on " << *lock
4705 << " on " << *lock->get_parent() << dendl;
4707 mut->wrlocks.erase(lock);
4708 mut->locks.erase(lock);
// Last writer out wakes everyone waiting on this lock.
4709 if (lock->get_num_wrlocks() == 0) {
4710 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4711 SimpleLock::WAIT_WR |
4712 SimpleLock::WAIT_RD);
// Try to take an exclusive (x) lock on a LocalLock for this request.  If
// the lock cannot be xlocked right now, queue a retry of the request and
// bail; otherwise record the xlock on the mutation.
// NOTE(review): the return statements fall on lines dropped from this
// listing (numbering gaps) — verify against upstream Locker.cc.
4716 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4718 dout(7) << "local_xlock_start on " << *lock
4719 << " on " << *lock->get_parent() << dendl;
4721 assert(lock->get_parent()->is_auth());
4722 if (!lock->can_xlock_local()) {
4723 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4727 lock->get_xlock(mut, mut->get_client());
4728 mut->xlocks.insert(lock);
4729 mut->locks.insert(lock);
// Drop an xlock previously taken via local_xlock_start(): remove it from
// the mutation's lock sets and wake all waiters.
// NOTE(review): a line is missing between 4736 and 4738 in this listing —
// presumably the lock's xlock release; verify against upstream Locker.cc.
4733 void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4735 dout(7) << "local_xlock_finish on " << *lock
4736 << " on " << *lock->get_parent() << dendl;
4738 mut->xlocks.erase(lock);
4739 mut->locks.erase(lock);
4741 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4742 SimpleLock::WAIT_WR |
4743 SimpleLock::WAIT_RD);
4748 // ==========================================================================
// Re-evaluate a stable, auth filelock and transition it toward the state
// best matching what clients currently want (per get_caps_wanted):
//   EXCL  -> MIX/SYNC when the loner no longer justifies exclusivity,
//   *     -> EXCL when a loner wants write/buffer caps,
//   *     -> MIX when multiple writers (or a scatter request) exist,
//   *     -> SYNC when nobody wants to write.
// Bails early if the parent is freezing/frozen, and forces SYNC on a
// read-only FS.  need_issue is propagated to the transition helpers.
// NOTE(review): numbering gaps indicate dropped lines (braces, returns,
// else arms) throughout this fragment — verify against upstream Locker.cc.
4752 void Locker::file_eval(ScatterLock *lock, bool *need_issue)
4754 CInode *in = static_cast<CInode*>(lock->get_parent());
4755 int loner_wanted, other_wanted;
4756 int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
4757 dout(7) << "file_eval wanted=" << gcap_string(wanted)
4758 << " loner_wanted=" << gcap_string(loner_wanted)
4759 << " other_wanted=" << gcap_string(other_wanted)
4760 << " filelock=" << *lock << " on " << *lock->get_parent()
4763 assert(lock->get_parent()->is_auth());
4764 assert(lock->is_stable());
// Don't touch locks on freezing/frozen inodes.
4766 if (lock->get_parent()->is_freezing_or_frozen())
// Read-only FS: everything is forced toward SYNC.
4769 if (mdcache->is_readonly()) {
4770 if (lock->get_state() != LOCK_SYNC) {
4771 dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
4772 simple_sync(lock, need_issue);
// Currently EXCL: decide whether the loner still deserves it.
4778 if (lock->get_state() == LOCK_EXCL) {
4779 dout(20) << " is excl" << dendl;
4780 int loner_issued, other_issued, xlocker_issued;
4781 in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
4782 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
4783 << " other_issued=" << gcap_string(other_issued)
4784 << " xlocker_issued=" << gcap_string(xlocker_issued)
// Lose EXCL if the loner doesn't need excl/wr/buffer, or others want
// conflicting caps, or a directory has multiple non-stale caps.
4786 if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
4787 (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
4788 (in->inode.is_dir() && in->multiple_nonstale_caps())) { // FIXME.. :/
4789 dout(20) << " should lose it" << dendl;
4790 // we should lose it.
4801 // -> any writer means MIX; RD doesn't matter.
4802 if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
4803 lock->is_waiter_for(SimpleLock::WAIT_WR))
4804 scatter_mix(lock, need_issue);
4805 else if (!lock->is_wrlocked()) // let excl wrlocks drain first
4806 simple_sync(lock, need_issue);
4808 dout(10) << " waiting for wrlock to drain" << dendl;
// Not EXCL: bump to loner/EXCL if a single client wants write/buffer.
4813 else if (lock->get_state() != LOCK_EXCL &&
4814 !lock->is_rdlocked() &&
4815 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4816 ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
4817 (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
4818 in->get_target_loner() >= 0) {
4819 dout(7) << "file_eval stable, bump to loner " << *lock
4820 << " on " << *lock->get_parent() << dendl;
4821 file_excl(lock, need_issue);
// Scatter to MIX when requested or when multiple writers exist.
4825 else if (lock->get_state() != LOCK_MIX &&
4826 !lock->is_rdlocked() &&
4827 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4828 (lock->get_scatter_wanted() ||
4829 (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
4830 dout(7) << "file_eval stable, bump to mixed " << *lock
4831 << " on " << *lock->get_parent() << dendl;
4832 scatter_mix(lock, need_issue);
// Otherwise fall back to SYNC once wrlocks/writers are gone.
4836 else if (lock->get_state() != LOCK_SYNC &&
4837 !lock->is_wrlocked() && // drain wrlocks first!
4838 !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4839 !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
4840 !((lock->get_state() == LOCK_MIX) &&
4841 in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4842 //((wanted & CEPH_CAP_RD) ||
4843 //in->is_replicated() ||
4844 //lock->get_num_client_lease() ||
4845 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4847 dout(7) << "file_eval stable, bump to sync " << *lock
4848 << " on " << *lock->get_parent() << dendl;
4849 simple_sync(lock, need_issue);
// Transition a stable, auth scatter lock into the MIX (scattered) state.
// From LOCK it can go directly to MIX (broadcasting locked state to
// replicas); from SYNC/EXCL/TSYN it enters an intermediate *_MIX state
// and gathers acks from replicas / cap revocations from clients before
// completing.  Also kicks file recovery when the inode needs it.
// NOTE(review): numbering gaps indicate dropped lines (braces, else arms,
// cap-issue calls) — verify against upstream Locker.cc.
4855 void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
4857 dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
4859 CInode *in = static_cast<CInode*>(lock->get_parent());
4860 assert(in->is_auth());
4861 assert(lock->is_stable());
// Fast path: LOCK -> MIX needs no gather.
4863 if (lock->get_state() == LOCK_LOCK) {
4864 in->start_scatter(lock);
4865 if (in->is_replicated()) {
4867 bufferlist softdata;
4868 lock->encode_locked_state(softdata);
4870 // bcast to replicas
4871 send_lock_message(lock, LOCK_AC_MIX, softdata);
4875 lock->set_state(LOCK_MIX);
4876 lock->clear_scatter_wanted();
4877 if (lock->get_cap_shift()) {
// Slow path: enter the intermediate gather state for the current state.
4885 switch (lock->get_state()) {
4886 case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
// XSYN must first be promoted to EXCL before it can move to MIX.
4888 file_excl(lock, need_issue);
4889 if (lock->get_state() != LOCK_EXCL)
4892 case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
4893 case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
4894 default: ceph_abort();
4898 if (lock->is_rdlocked())
// Gather acks from replicas (unless they are already LOCK).
4900 if (in->is_replicated()) {
4901 if (lock->get_state() != LOCK_EXCL_MIX && // EXCL replica is already LOCK
4902 lock->get_state() != LOCK_XSYN_EXCL) { // XSYN replica is already LOCK; ** FIXME here too!
4903 send_lock_message(lock, LOCK_AC_MIX);
4904 lock->init_gather();
4908 if (lock->is_leased()) {
4909 revoke_client_leases(lock);
4912 if (lock->get_cap_shift() &&
4914 in->issued_caps_need_gather(lock)) {
4921 bool need_recover = false;
4922 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4923 mds->mdcache->queue_file_recover(in);
4924 need_recover = true;
// Gathering: pin while unstable; otherwise complete the MIX transition.
4929 lock->get_parent()->auth_pin(lock);
4931 mds->mdcache->do_file_recover();
4933 in->start_scatter(lock);
4934 lock->set_state(LOCK_MIX);
4935 lock->clear_scatter_wanted();
4936 if (in->is_replicated()) {
4937 bufferlist softdata;
4938 lock->encode_locked_state(softdata);
4939 send_lock_message(lock, LOCK_AC_MIX, softdata);
4941 if (lock->get_cap_shift()) {
// Transition a stable, auth filelock to EXCL (single loner client holds
// exclusive caps).  Requires a loner with no other MDSs wanting caps,
// except when coming from XSYN (xsyn -> excl is the mandatory first hop).
// Enters an intermediate *_EXCL state, gathers replica acks / lease
// revocations / cap revocations as needed, then completes to LOCK_EXCL.
// NOTE(review): numbering gaps indicate dropped lines (gather(), braces,
// eval paths) — verify against upstream Locker.cc.
4952 void Locker::file_excl(ScatterLock *lock, bool *need_issue)
4954 CInode *in = static_cast<CInode*>(lock->get_parent());
4955 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
4957 assert(in->is_auth());
4958 assert(lock->is_stable());
4960 assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
4961 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
4963 switch (lock->get_state()) {
4964 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4965 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
4966 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4967 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4968 default: ceph_abort();
4972 if (lock->is_rdlocked())
4974 if (lock->is_wrlocked())
// Replicas must drop to LOCK unless they already are (LOCK/XSYN cases).
4977 if (in->is_replicated() &&
4978 lock->get_state() != LOCK_LOCK_EXCL &&
4979 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
4980 send_lock_message(lock, LOCK_AC_LOCK);
4981 lock->init_gather();
4984 if (lock->is_leased()) {
4985 revoke_client_leases(lock);
4988 if (in->is_head() &&
4989 in->issued_caps_need_gather(lock)) {
4996 bool need_recover = false;
4997 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4998 mds->mdcache->queue_file_recover(in);
4999 need_recover = true;
// Gathering: pin while unstable; otherwise complete to EXCL.
5004 lock->get_parent()->auth_pin(lock);
5006 mds->mdcache->do_file_recover();
5008 lock->set_state(LOCK_EXCL);
// Transition a filelock from EXCL to XSYN (loner keeps exclusive caps but
// other clients may sync-read via the MDS).  Only valid from EXCL, and
// only with a loner and no other MDSs wanting caps.  Gathers cap
// revocations if needed before completing to LOCK_XSYN.
// NOTE(review): numbering gaps indicate dropped lines (braces, gather())
// — verify against upstream Locker.cc.
5016 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5018 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5019 CInode *in = static_cast<CInode *>(lock->get_parent());
5020 assert(in->is_auth());
5021 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5023 switch (lock->get_state()) {
5024 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5025 default: ceph_abort();
5029 if (lock->is_wrlocked())
5032 if (in->is_head() &&
5033 in->issued_caps_need_gather(lock)) {
// Gathering: pin while unstable; otherwise complete to XSYN and wake readers.
5042 lock->get_parent()->auth_pin(lock);
5044 lock->set_state(LOCK_XSYN);
5045 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
// Move a filelock from PRE_SCAN to SCAN so the recovery queue can size/
// mtime-probe the file: revoke replica/client state as needed, then mark
// the inode NEEDSRECOVER and queue it for recovery.
// Fix: the two-part if condition below was missing the '&&' joining
// in->is_replicated() with the replica_state check — a syntax error as
// written (upstream Ceph joins these with '&&').
// NOTE(review): numbering gaps indicate dropped lines in this listing
// (e.g. the declaration of 'oldstate', braces, the gather() branch) —
// verify against upstream Locker.cc.
5053 void Locker::file_recover(ScatterLock *lock)
5055 CInode *in = static_cast<CInode *>(lock->get_parent());
5056 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5058 assert(in->is_auth());
5059 //assert(lock->is_stable());
5060 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
// Replicas whose state for the old lock state isn't already LOCK must be
// told to drop to LOCK; gather their acks.
5065 if (in->is_replicated() &&
5066 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5067 send_lock_message(lock, LOCK_AC_LOCK);
5068 lock->init_gather();
5072 if (in->is_head() &&
5073 in->issued_caps_need_gather(lock)) {
5078 lock->set_state(LOCK_SCAN);
5080 in->state_set(CInode::STATE_NEEDSRECOVER);
5082 mds->mdcache->queue_file_recover(in);
5087 /* This function DOES put the passed message before returning */
5088 void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
5090 CInode *in = static_cast<CInode*>(lock->get_parent());
5091 int from = m->get_asker();
5093 if (mds->is_rejoin()) {
5094 if (in->is_rejoining()) {
5095 dout(7) << "handle_file_lock still rejoining " << *in
5096 << ", dropping " << *m << dendl;
5102 dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
5104 << " from mds." << from << " "
5107 bool caps = lock->get_cap_shift();
5109 switch (m->get_action()) {
5112 assert(lock->get_state() == LOCK_LOCK ||
5113 lock->get_state() == LOCK_MIX ||
5114 lock->get_state() == LOCK_MIX_SYNC2);
5116 if (lock->get_state() == LOCK_MIX) {
5117 lock->set_state(LOCK_MIX_SYNC);
5118 eval_gather(lock, true);
5119 if (lock->is_unstable_and_locked())
5120 mds->mdlog->flush();
5124 (static_cast<ScatterLock *>(lock))->finish_flush();
5125 (static_cast<ScatterLock *>(lock))->clear_flushed();
5128 lock->decode_locked_state(m->get_data());
5129 lock->set_state(LOCK_SYNC);
5134 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5139 switch (lock->get_state()) {
5140 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5141 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5142 default: ceph_abort();
5145 eval_gather(lock, true);
5146 if (lock->is_unstable_and_locked())
5147 mds->mdlog->flush();
5151 case LOCK_AC_LOCKFLUSHED:
5152 (static_cast<ScatterLock *>(lock))->finish_flush();
5153 (static_cast<ScatterLock *>(lock))->clear_flushed();
5154 // wake up scatter_nudge waiters
5155 if (lock->is_stable())
5156 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5160 assert(lock->get_state() == LOCK_SYNC ||
5161 lock->get_state() == LOCK_LOCK ||
5162 lock->get_state() == LOCK_SYNC_MIX2);
5164 if (lock->get_state() == LOCK_SYNC) {
5166 lock->set_state(LOCK_SYNC_MIX);
5167 eval_gather(lock, true);
5168 if (lock->is_unstable_and_locked())
5169 mds->mdlog->flush();
5174 lock->set_state(LOCK_MIX);
5175 lock->decode_locked_state(m->get_data());
5180 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5185 case LOCK_AC_LOCKACK:
5186 assert(lock->get_state() == LOCK_SYNC_LOCK ||
5187 lock->get_state() == LOCK_MIX_LOCK ||
5188 lock->get_state() == LOCK_MIX_LOCK2 ||
5189 lock->get_state() == LOCK_MIX_EXCL ||
5190 lock->get_state() == LOCK_SYNC_EXCL ||
5191 lock->get_state() == LOCK_SYNC_MIX ||
5192 lock->get_state() == LOCK_MIX_TSYN);
5193 assert(lock->is_gathering(from));
5194 lock->remove_gather(from);
5196 if (lock->get_state() == LOCK_MIX_LOCK ||
5197 lock->get_state() == LOCK_MIX_LOCK2 ||
5198 lock->get_state() == LOCK_MIX_EXCL ||
5199 lock->get_state() == LOCK_MIX_TSYN) {
5200 lock->decode_locked_state(m->get_data());
5201 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5202 // delay calling scatter_writebehind().
5203 lock->clear_flushed();
5206 if (lock->is_gathering()) {
5207 dout(7) << "handle_file_lock " << *in << " from " << from
5208 << ", still gathering " << lock->get_gather_set() << dendl;
5210 dout(7) << "handle_file_lock " << *in << " from " << from
5211 << ", last one" << dendl;
5216 case LOCK_AC_SYNCACK:
5217 assert(lock->get_state() == LOCK_MIX_SYNC);
5218 assert(lock->is_gathering(from));
5219 lock->remove_gather(from);
5221 lock->decode_locked_state(m->get_data());
5223 if (lock->is_gathering()) {
5224 dout(7) << "handle_file_lock " << *in << " from " << from
5225 << ", still gathering " << lock->get_gather_set() << dendl;
5227 dout(7) << "handle_file_lock " << *in << " from " << from
5228 << ", last one" << dendl;
5233 case LOCK_AC_MIXACK:
5234 assert(lock->get_state() == LOCK_SYNC_MIX);
5235 assert(lock->is_gathering(from));
5236 lock->remove_gather(from);
5238 if (lock->is_gathering()) {
5239 dout(7) << "handle_file_lock " << *in << " from " << from
5240 << ", still gathering " << lock->get_gather_set() << dendl;
5242 dout(7) << "handle_file_lock " << *in << " from " << from
5243 << ", last one" << dendl;
5250 case LOCK_AC_REQSCATTER:
5251 if (lock->is_stable()) {
5252 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5253 * because the replica should be holding an auth_pin if they're
5254 * doing this (and thus, we are freezing, not frozen, and indefinite
5255 * starvation isn't an issue).
5257 dout(7) << "handle_file_lock got scatter request on " << *lock
5258 << " on " << *lock->get_parent() << dendl;
5259 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5262 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5263 << " on " << *lock->get_parent() << dendl;
5264 lock->set_scatter_wanted();
5268 case LOCK_AC_REQUNSCATTER:
5269 if (lock->is_stable()) {
5270 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5271 * because the replica should be holding an auth_pin if they're
5272 * doing this (and thus, we are freezing, not frozen, and indefinite
5273 * starvation isn't an issue).
5275 dout(7) << "handle_file_lock got unscatter request on " << *lock
5276 << " on " << *lock->get_parent() << dendl;
5277 if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5278 simple_lock(lock); // FIXME tempsync?
5280 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5281 << " on " << *lock->get_parent() << dendl;
5282 lock->set_unscatter_wanted();
5286 case LOCK_AC_REQRDLOCK:
5287 handle_reqrdlock(lock, m);
5291 if (!lock->get_parent()->is_auth()) {
5292 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5293 << " on " << *lock->get_parent() << dendl;
5294 } else if (!lock->get_parent()->is_replicated()) {
5295 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5296 << " on " << *lock->get_parent() << dendl;
5298 dout(7) << "handle_file_lock trying nudge on " << *lock
5299 << " on " << *lock->get_parent() << dendl;
5300 scatter_nudge(lock, 0, true);
5301 mds->mdlog->flush();