X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmds%2FLocker.cc;fp=src%2Fceph%2Fsrc%2Fmds%2FLocker.cc;h=a0ccf96016be557ca2ed2661894243659344e9da;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mds/Locker.cc b/src/ceph/src/mds/Locker.cc new file mode 100644 index 0000000..a0ccf96 --- /dev/null +++ b/src/ceph/src/mds/Locker.cc @@ -0,0 +1,5316 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "MDSRank.h" +#include "MDCache.h" +#include "Locker.h" +#include "CInode.h" +#include "CDir.h" +#include "CDentry.h" +#include "Mutation.h" +#include "MDSContext.h" + +#include "MDLog.h" +#include "MDSMap.h" + +#include "events/EUpdate.h" +#include "events/EOpen.h" + +#include "msg/Messenger.h" +#include "osdc/Objecter.h" + +#include "messages/MInodeFileCaps.h" +#include "messages/MLock.h" +#include "messages/MClientLease.h" +#include "messages/MClientReply.h" +#include "messages/MClientCaps.h" +#include "messages/MClientCapRelease.h" + +#include "messages/MMDSSlaveRequest.h" + +#include + +#include "common/config.h" + + +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_context g_ceph_context +#define dout_prefix _prefix(_dout, mds) +static ostream& _prefix(std::ostream *_dout, MDSRank *mds) { + return *_dout << "mds." << mds->get_nodeid() << ".locker "; +} + + +class LockerContext : public MDSInternalContextBase { +protected: + Locker *locker; + MDSRank *get_mds() override + { + return locker->mds; + } + +public: + explicit LockerContext(Locker *locker_) : locker(locker_) { + assert(locker != NULL); + } +}; + +class LockerLogContext : public MDSLogContextBase { +protected: + Locker *locker; + MDSRank *get_mds() override + { + return locker->mds; + } + +public: + explicit LockerLogContext(Locker *locker_) : locker(locker_) { + assert(locker != NULL); + } +}; + +/* This function DOES put the passed message before returning */ +void Locker::dispatch(Message *m) +{ + + switch (m->get_type()) { + + // inter-mds locking + case MSG_MDS_LOCK: + handle_lock(static_cast(m)); + break; + // inter-mds caps + case MSG_MDS_INODEFILECAPS: + handle_inode_file_caps(static_cast(m)); + break; + + // client sync + case CEPH_MSG_CLIENT_CAPS: + handle_client_caps(static_cast(m)); + + break; + case CEPH_MSG_CLIENT_CAPRELEASE: + handle_client_cap_release(static_cast(m)); + break; + case CEPH_MSG_CLIENT_LEASE: + handle_client_lease(static_cast(m)); + break; + + default: + derr << "locker unknown message " << m->get_type() << dendl; + assert(0 == "locker unknown message"); + } +} + +void Locker::tick() +{ + scatter_tick(); + caps_tick(); +} + +/* + * locks vs rejoin + * + * + * + */ + +void Locker::send_lock_message(SimpleLock *lock, int msg) +{ + for (const auto &it : lock->get_parent()->get_replicas()) { + if (mds->is_cluster_degraded() && + mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN) + continue; + MLock *m = new MLock(lock, msg, mds->get_nodeid()); + mds->send_message_mds(m, it.first); + } +} + +void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data) +{ + for (const auto &it : lock->get_parent()->get_replicas()) { + if (mds->is_cluster_degraded() && + mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN) + continue; + MLock *m = new MLock(lock, msg, mds->get_nodeid()); + m->set_data(data); + mds->send_message_mds(m, it.first); + } +} + + + + +void Locker::include_snap_rdlocks(set& rdlocks, CInode *in) +{ + // rdlock ancestor snaps + CInode *t = in; + rdlocks.insert(&in->snaplock); + while (t->get_projected_parent_dn()) { + t = t->get_projected_parent_dn()->get_dir()->get_inode(); + rdlocks.insert(&t->snaplock); + } +} + +void Locker::include_snap_rdlocks_wlayout(set& rdlocks, CInode *in, + file_layout_t **layout) +{ + //rdlock ancestor snaps + CInode *t = in; + rdlocks.insert(&in->snaplock); + rdlocks.insert(&in->policylock); + bool found_layout = false; + while (t) { + rdlocks.insert(&t->snaplock); + if (!found_layout) { + rdlocks.insert(&t->policylock); + if (t->get_projected_inode()->has_layout()) { + *layout = &t->get_projected_inode()->layout; + found_layout = true; + } + } + if (t->get_projected_parent_dn() && + t->get_projected_parent_dn()->get_dir()) + t = t->get_projected_parent_dn()->get_dir()->get_inode(); + else t = NULL; + } +} + +struct MarkEventOnDestruct { + MDRequestRef& mdr; + const char* message; + bool mark_event; + MarkEventOnDestruct(MDRequestRef& _mdr, + const char *_message) : mdr(_mdr), + message(_message), + mark_event(true) {} + ~MarkEventOnDestruct() { + if (mark_event) + mdr->mark_event(message); + } +}; + +/* If this function returns false, the mdr has been placed + * on the appropriate wait list */ +bool Locker::acquire_locks(MDRequestRef& mdr, + set &rdlocks, + set &wrlocks, + set &xlocks, + map *remote_wrlocks, + CInode *auth_pin_freeze, + bool auth_pin_nonblock) +{ + if (mdr->done_locking && + !mdr->is_slave()) { // not on slaves! master requests locks piecemeal. + dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl; + return true; // at least we had better be! + } + dout(10) << "acquire_locks " << *mdr << dendl; + + MarkEventOnDestruct marker(mdr, "failed to acquire_locks"); + + client_t client = mdr->get_client(); + + set sorted; // sort everything we will lock + set mustpin; // items to authpin + + // xlocks + for (set::iterator p = xlocks.begin(); p != xlocks.end(); ++p) { + dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl; + sorted.insert(*p); + mustpin.insert((*p)->get_parent()); + + // augment xlock with a versionlock? + if ((*p)->get_type() == CEPH_LOCK_DN) { + CDentry *dn = (CDentry*)(*p)->get_parent(); + if (!dn->is_auth()) + continue; + + if (xlocks.count(&dn->versionlock)) + continue; // we're xlocking the versionlock too; don't wrlock it! + + if (mdr->is_master()) { + // master. wrlock versionlock so we can pipeline dentry updates to journal. + wrlocks.insert(&dn->versionlock); + } else { + // slave. exclusively lock the dentry version (i.e. block other journal updates). + // this makes rollback safe. + xlocks.insert(&dn->versionlock); + sorted.insert(&dn->versionlock); + } + } + if ((*p)->get_type() > CEPH_LOCK_IVERSION) { + // inode version lock? + CInode *in = (CInode*)(*p)->get_parent(); + if (!in->is_auth()) + continue; + if (mdr->is_master()) { + // master. wrlock versionlock so we can pipeline inode updates to journal. + wrlocks.insert(&in->versionlock); + } else { + // slave. exclusively lock the inode version (i.e. block other journal updates). + // this makes rollback safe. + xlocks.insert(&in->versionlock); + sorted.insert(&in->versionlock); + } + } + } + + // wrlocks + for (set::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) { + MDSCacheObject *object = (*p)->get_parent(); + dout(20) << " must wrlock " << **p << " " << *object << dendl; + sorted.insert(*p); + if (object->is_auth()) + mustpin.insert(object); + else if (!object->is_auth() && + !(*p)->can_wrlock(client) && // we might have to request a scatter + !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned + dout(15) << " will also auth_pin " << *object + << " in case we need to request a scatter" << dendl; + mustpin.insert(object); + } + } + + // remote_wrlocks + if (remote_wrlocks) { + for (map::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) { + MDSCacheObject *object = p->first->get_parent(); + dout(20) << " must remote_wrlock on mds." << p->second << " " + << *p->first << " " << *object << dendl; + sorted.insert(p->first); + mustpin.insert(object); + } + } + + // rdlocks + for (set::iterator p = rdlocks.begin(); + p != rdlocks.end(); + ++p) { + MDSCacheObject *object = (*p)->get_parent(); + dout(20) << " must rdlock " << **p << " " << *object << dendl; + sorted.insert(*p); + if (object->is_auth()) + mustpin.insert(object); + else if (!object->is_auth() && + !(*p)->can_rdlock(client)) { // we might have to request an rdlock + dout(15) << " will also auth_pin " << *object + << " in case we need to request a rdlock" << dendl; + mustpin.insert(object); + } + } + + + // AUTH PINS + map > mustpin_remote; // mds -> (object set) + + // can i auth pin them all now? + marker.message = "failed to authpin local pins"; + for (set::iterator p = mustpin.begin(); + p != mustpin.end(); + ++p) { + MDSCacheObject *object = *p; + + dout(10) << " must authpin " << *object << dendl; + + if (mdr->is_auth_pinned(object)) { + if (object != (MDSCacheObject*)auth_pin_freeze) + continue; + if (mdr->more()->is_remote_frozen_authpin) { + if (mdr->more()->rename_inode == auth_pin_freeze) + continue; + // unfreeze auth pin for the wrong inode + mustpin_remote[mdr->more()->rename_inode->authority().first].size(); + } + } + + if (!object->is_auth()) { + if (!mdr->locks.empty()) + drop_locks(mdr.get()); + if (object->is_ambiguous_auth()) { + // wait + dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl; + object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr)); + mdr->drop_local_auth_pins(); + return false; + } + mustpin_remote[object->authority().first].insert(object); + continue; + } + if (!object->can_auth_pin()) { + // wait + drop_locks(mdr.get()); + mdr->drop_local_auth_pins(); + if (auth_pin_nonblock) { + dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl; + mdr->aborted = true; + return false; + } + dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl; + object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); + + if (!mdr->remote_auth_pins.empty()) + notify_freeze_waiter(object); + + return false; + } + } + + // ok, grab local auth pins + for (set::iterator p = mustpin.begin(); + p != mustpin.end(); + ++p) { + MDSCacheObject *object = *p; + if (mdr->is_auth_pinned(object)) { + dout(10) << " already auth_pinned " << *object << dendl; + } else if (object->is_auth()) { + dout(10) << " auth_pinning " << *object << dendl; + mdr->auth_pin(object); + } + } + + // request remote auth_pins + if (!mustpin_remote.empty()) { + marker.message = "requesting remote authpins"; + for (map::iterator p = mdr->remote_auth_pins.begin(); + p != mdr->remote_auth_pins.end(); + ++p) { + if (mustpin.count(p->first)) { + assert(p->second == p->first->authority().first); + map >::iterator q = mustpin_remote.find(p->second); + if (q != mustpin_remote.end()) + q->second.insert(p->first); + } + } + for (map >::iterator p = mustpin_remote.begin(); + p != mustpin_remote.end(); + ++p) { + dout(10) << "requesting remote auth_pins from mds." << p->first << dendl; + + // wait for active auth + if (mds->is_cluster_degraded() && + !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) { + dout(10) << " mds." << p->first << " is not active" << dendl; + if (mdr->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr)); + return false; + } + + MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt, + MMDSSlaveRequest::OP_AUTHPIN); + for (set::iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + dout(10) << " req remote auth_pin of " << **q << dendl; + MDSCacheObjectInfo info; + (*q)->set_object_info(info); + req->get_authpins().push_back(info); + if (*q == auth_pin_freeze) + (*q)->set_object_info(req->get_authpin_freeze()); + mdr->pin(*q); + } + if (auth_pin_nonblock) + req->mark_nonblock(); + mds->send_message_mds(req, p->first); + + // put in waiting list + assert(mdr->more()->waiting_on_slave.count(p->first) == 0); + mdr->more()->waiting_on_slave.insert(p->first); + } + return false; + } + + // caps i'll need to issue + set issue_set; + bool result = false; + + // acquire locks. + // make sure they match currently acquired locks. + set::iterator existing = mdr->locks.begin(); + for (set::iterator p = sorted.begin(); + p != sorted.end(); + ++p) { + bool need_wrlock = !!wrlocks.count(*p); + bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p)); + + // already locked? + if (existing != mdr->locks.end() && *existing == *p) { + // right kind? + SimpleLock *have = *existing; + ++existing; + if (xlocks.count(have) && mdr->xlocks.count(have)) { + dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl; + continue; + } + if (mdr->remote_wrlocks.count(have)) { + if (!need_remote_wrlock || + mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) { + dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have] + << " " << *have << " " << *have->get_parent() << dendl; + remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get()); + } + } + if (need_wrlock || need_remote_wrlock) { + if (need_wrlock == !!mdr->wrlocks.count(have) && + need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) { + if (need_wrlock) + dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl; + if (need_remote_wrlock) + dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl; + continue; + } + } + if (rdlocks.count(have) && mdr->rdlocks.count(have)) { + dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl; + continue; + } + } + + // hose any stray locks + if (existing != mdr->locks.end() && *existing == *p) { + assert(need_wrlock || need_remote_wrlock); + SimpleLock *lock = *existing; + if (mdr->wrlocks.count(lock)) { + if (!need_wrlock) + dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl; + else if (need_remote_wrlock) // acquire remote_wrlock first + dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl; + bool need_issue = false; + wrlock_finish(lock, mdr.get(), &need_issue); + if (need_issue) + issue_set.insert(static_cast(lock->get_parent())); + } + ++existing; + } + while (existing != mdr->locks.end()) { + SimpleLock *stray = *existing; + ++existing; + dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl; + bool need_issue = false; + if (mdr->xlocks.count(stray)) { + xlock_finish(stray, mdr.get(), &need_issue); + } else if (mdr->rdlocks.count(stray)) { + rdlock_finish(stray, mdr.get(), &need_issue); + } else { + // may have acquired both wrlock and remore wrlock + if (mdr->wrlocks.count(stray)) + wrlock_finish(stray, mdr.get(), &need_issue); + if (mdr->remote_wrlocks.count(stray)) + remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get()); + } + if (need_issue) + issue_set.insert(static_cast(stray->get_parent())); + } + + // lock + if (mdr->locking && *p != mdr->locking) { + cancel_locking(mdr.get(), &issue_set); + } + if (xlocks.count(*p)) { + marker.message = "failed to xlock, waiting"; + if (!xlock_start(*p, mdr)) + goto out; + dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl; + } else if (need_wrlock || need_remote_wrlock) { + if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) { + marker.message = "waiting for remote wrlocks"; + remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr); + goto out; + } + if (need_wrlock && !mdr->wrlocks.count(*p)) { + marker.message = "failed to wrlock, waiting"; + if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) { + marker.message = "failed to wrlock, dropping remote wrlock and waiting"; + // can't take the wrlock because the scatter lock is gathering. need to + // release the remote wrlock, so that the gathering process can finish. + remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get()); + remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr); + goto out; + } + // nowait if we have already gotten remote wrlock + if (!wrlock_start(*p, mdr, need_remote_wrlock)) + goto out; + dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl; + } + } else { + assert(mdr->is_master()); + if ((*p)->is_scatterlock()) { + ScatterLock *slock = static_cast(*p); + if (slock->is_rejoin_mix()) { + // If there is a recovering mds who replcated an object when it failed + // and scatterlock in the object was in MIX state, It's possible that + // the recovering mds needs to take wrlock on the scatterlock when it + // replays unsafe requests. So this mds should delay taking rdlock on + // the scatterlock until the recovering mds finishes replaying unsafe. + // Otherwise unsafe requests may get replayed after current request. + // + // For example: + // The recovering mds is auth mds of a dirfrag, this mds is auth mds + // of correspinding inode. when 'rm -rf' the direcotry, this mds should + // delay the rmdir request until the recovering mds has replayed unlink + // requests. + if (mds->is_cluster_degraded()) { + if (!mdr->is_replay()) { + drop_locks(mdr.get()); + mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr)); + dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent() + << ", waiting for cluster recovered" << dendl; + marker.message = "rejoin mix scatterlock, waiting for cluster recovered"; + return false; + } + } else { + slock->clear_rejoin_mix(); + } + } + } + + marker.message = "failed to rdlock, waiting"; + if (!rdlock_start(*p, mdr)) + goto out; + dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl; + } + } + + // any extra unneeded locks? + while (existing != mdr->locks.end()) { + SimpleLock *stray = *existing; + ++existing; + dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl; + bool need_issue = false; + if (mdr->xlocks.count(stray)) { + xlock_finish(stray, mdr.get(), &need_issue); + } else if (mdr->rdlocks.count(stray)) { + rdlock_finish(stray, mdr.get(), &need_issue); + } else { + // may have acquired both wrlock and remore wrlock + if (mdr->wrlocks.count(stray)) + wrlock_finish(stray, mdr.get(), &need_issue); + if (mdr->remote_wrlocks.count(stray)) + remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get()); + } + if (need_issue) + issue_set.insert(static_cast(stray->get_parent())); + } + + mdr->done_locking = true; + mdr->set_mds_stamp(ceph_clock_now()); + result = true; + marker.message = "acquired locks"; + + out: + issue_caps_set(issue_set); + return result; +} + +void Locker::notify_freeze_waiter(MDSCacheObject *o) +{ + CDir *dir = NULL; + if (CInode *in = dynamic_cast(o)) { + if (!in->is_root()) + dir = in->get_parent_dir(); + } else if (CDentry *dn = dynamic_cast(o)) { + dir = dn->get_dir(); + } else { + dir = dynamic_cast(o); + assert(dir); + } + if (dir) { + if (dir->is_freezing_dir()) + mdcache->fragment_freeze_inc_num_waiters(dir); + if (dir->is_freezing_tree()) { + while (!dir->is_freezing_tree_root()) + dir = dir->get_parent_dir(); + mdcache->migrator->export_freeze_inc_num_waiters(dir); + } + } +} + +void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry) +{ + for (set::iterator p = mut->xlocks.begin(); + p != mut->xlocks.end(); + ++p) { + MDSCacheObject *object = (*p)->get_parent(); + assert(object->is_auth()); + if (skip_dentry && + ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION)) + continue; + dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl; + (*p)->set_xlock_done(); + } +} + +void Locker::_drop_rdlocks(MutationImpl *mut, set *pneed_issue) +{ + while (!mut->rdlocks.empty()) { + bool ni = false; + MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent(); + rdlock_finish(*mut->rdlocks.begin(), mut, &ni); + if (ni) + pneed_issue->insert(static_cast(p)); + } +} + +void Locker::_drop_non_rdlocks(MutationImpl *mut, set *pneed_issue) +{ + set slaves; + + while (!mut->xlocks.empty()) { + SimpleLock *lock = *mut->xlocks.begin(); + MDSCacheObject *p = lock->get_parent(); + if (!p->is_auth()) { + assert(lock->get_sm()->can_remote_xlock); + slaves.insert(p->authority().first); + lock->put_xlock(); + mut->locks.erase(lock); + mut->xlocks.erase(lock); + continue; + } + bool ni = false; + xlock_finish(lock, mut, &ni); + if (ni) + pneed_issue->insert(static_cast(p)); + } + + while (!mut->remote_wrlocks.empty()) { + map::iterator p = mut->remote_wrlocks.begin(); + slaves.insert(p->second); + if (mut->wrlocks.count(p->first) == 0) + mut->locks.erase(p->first); + mut->remote_wrlocks.erase(p); + } + + while (!mut->wrlocks.empty()) { + bool ni = false; + MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent(); + wrlock_finish(*mut->wrlocks.begin(), mut, &ni); + if (ni) + pneed_issue->insert(static_cast(p)); + } + + for (set::iterator p = slaves.begin(); p != slaves.end(); ++p) { + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) { + dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl; + MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt, + MMDSSlaveRequest::OP_DROPLOCKS); + mds->send_message_mds(slavereq, *p); + } + } +} + +void Locker::cancel_locking(MutationImpl *mut, set *pneed_issue) +{ + SimpleLock *lock = mut->locking; + assert(lock); + dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl; + + if (lock->get_parent()->is_auth()) { + bool need_issue = false; + if (lock->get_state() == LOCK_PREXLOCK) { + _finish_xlock(lock, -1, &need_issue); + } else if (lock->get_state() == LOCK_LOCK_XLOCK && + lock->get_num_xlocks() == 0) { + lock->set_state(LOCK_XLOCKDONE); + eval_gather(lock, true, &need_issue); + } + if (need_issue) + pneed_issue->insert(static_cast(lock->get_parent())); + } + mut->finish_locking(lock); +} + +void Locker::drop_locks(MutationImpl *mut, set *pneed_issue) +{ + // leftover locks + set my_need_issue; + if (!pneed_issue) + pneed_issue = &my_need_issue; + + if (mut->locking) + cancel_locking(mut, pneed_issue); + _drop_non_rdlocks(mut, pneed_issue); + _drop_rdlocks(mut, pneed_issue); + + if (pneed_issue == &my_need_issue) + issue_caps_set(*pneed_issue); + mut->done_locking = false; +} + +void Locker::drop_non_rdlocks(MutationImpl *mut, set *pneed_issue) +{ + set my_need_issue; + if (!pneed_issue) + pneed_issue = &my_need_issue; + + _drop_non_rdlocks(mut, pneed_issue); + + if (pneed_issue == &my_need_issue) + issue_caps_set(*pneed_issue); +} + +void Locker::drop_rdlocks(MutationImpl *mut, set *pneed_issue) +{ + set my_need_issue; + if (!pneed_issue) + pneed_issue = &my_need_issue; + + _drop_rdlocks(mut, pneed_issue); + + if (pneed_issue == &my_need_issue) + issue_caps_set(*pneed_issue); +} + + +// generics + +void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list *pfinishers) +{ + dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl; + assert(!lock->is_stable()); + + int next = lock->get_next_state(); + + CInode *in = 0; + bool caps = lock->get_cap_shift(); + if (lock->get_type() != CEPH_LOCK_DN) + in = static_cast(lock->get_parent()); + + bool need_issue = false; + + int loner_issued = 0, other_issued = 0, xlocker_issued = 0; + assert(!caps || in != NULL); + if (caps && in->is_head()) { + in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, + lock->get_cap_shift(), lock->get_cap_mask()); + dout(10) << " next state is " << lock->get_state_name(next) + << " issued/allows loner " << gcap_string(loner_issued) + << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next)) + << " xlocker " << gcap_string(xlocker_issued) + << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next)) + << " other " << gcap_string(other_issued) + << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next)) + << dendl; + + if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) || + (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) || + (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued))) + need_issue = true; + } + +#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH))) + bool auth = lock->get_parent()->is_auth(); + if (!lock->is_gathering() && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) && + !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind! + (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 && + (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 && + (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) && + lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds + lock->get_state() != LOCK_MIX_SYNC2 + ) { + dout(7) << "eval_gather finished gather on " << *lock + << " on " << *lock->get_parent() << dendl; + + if (lock->get_sm() == &sm_filelock) { + assert(in); + if (in->state_test(CInode::STATE_RECOVERING)) { + dout(7) << "eval_gather finished gather, but still recovering" << dendl; + return; + } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + dout(7) << "eval_gather finished gather, but need to recover" << dendl; + mds->mdcache->queue_file_recover(in); + mds->mdcache->do_file_recover(); + return; + } + } + + if (!lock->get_parent()->is_auth()) { + // replica: tell auth + mds_rank_t auth = lock->get_parent()->authority().first; + + if (lock->get_parent()->is_rejoining() && + mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { + dout(7) << "eval_gather finished gather, but still rejoining " + << *lock->get_parent() << dendl; + return; + } + + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { + switch (lock->get_state()) { + case LOCK_SYNC_LOCK: + mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), + auth); + break; + + case LOCK_MIX_SYNC: + { + MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid()); + lock->encode_locked_state(reply->get_data()); + mds->send_message_mds(reply, auth); + next = LOCK_MIX_SYNC2; + (static_cast(lock))->start_flush(); + } + break; + + case LOCK_MIX_SYNC2: + (static_cast(lock))->finish_flush(); + (static_cast(lock))->clear_flushed(); + + case LOCK_SYNC_MIX2: + // do nothing, we already acked + break; + + case LOCK_SYNC_MIX: + { + MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid()); + mds->send_message_mds(reply, auth); + next = LOCK_SYNC_MIX2; + } + break; + + case LOCK_MIX_LOCK: + { + bufferlist data; + lock->encode_locked_state(data); + mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth); + (static_cast(lock))->start_flush(); + // we'll get an AC_LOCKFLUSHED to complete + } + break; + + default: + ceph_abort(); + } + } + } else { + // auth + + // once the first (local) stage of mix->lock gather complete we can + // gather from replicas + if (lock->get_state() == LOCK_MIX_LOCK && + lock->get_parent()->is_replicated()) { + dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl; + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + lock->set_state(LOCK_MIX_LOCK2); + return; + } + + if (lock->is_dirty() && !lock->is_flushed()) { + scatter_writebehind(static_cast(lock)); + mds->mdlog->flush(); + return; + } + lock->clear_flushed(); + + switch (lock->get_state()) { + // to mixed + case LOCK_TSYN_MIX: + case LOCK_SYNC_MIX: + case LOCK_EXCL_MIX: + in->start_scatter(static_cast(lock)); + if (lock->get_parent()->is_replicated()) { + bufferlist softdata; + lock->encode_locked_state(softdata); + send_lock_message(lock, LOCK_AC_MIX, softdata); + } + (static_cast(lock))->clear_scatter_wanted(); + break; + + case LOCK_XLOCK: + case LOCK_XLOCKDONE: + if (next != LOCK_SYNC) + break; + // fall-thru + + // to sync + case LOCK_EXCL_SYNC: + case LOCK_LOCK_SYNC: + case LOCK_MIX_SYNC: + case LOCK_XSYN_SYNC: + if (lock->get_parent()->is_replicated()) { + bufferlist softdata; + lock->encode_locked_state(softdata); + send_lock_message(lock, LOCK_AC_SYNC, softdata); + } + break; + } + + } + + lock->set_state(next); + + if (lock->get_parent()->is_auth() && + lock->is_stable()) + lock->get_parent()->auth_unpin(lock); + + // drop loner before doing waiters + if (caps && + in->is_head() && + in->is_auth() && + in->get_wanted_loner() != in->get_loner()) { + dout(10) << " trying to drop loner" << dendl; + if (in->try_drop_loner()) { + dout(10) << " dropped loner" << dendl; + need_issue = true; + } + } + + if (pfinishers) + lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK, + *pfinishers); + else + lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK); + + if (caps && in->is_head()) + need_issue = true; + + if (lock->get_parent()->is_auth() && + lock->is_stable()) + try_eval(lock, &need_issue); + } + + if (need_issue) { + if (pneed_issue) + *pneed_issue = true; + else if (in->is_head()) + issue_caps(in); + } + +} + +bool Locker::eval(CInode *in, int mask, bool caps_imported) +{ + bool need_issue = caps_imported; + list finishers; + + dout(10) << "eval " << mask << " " << *in << dendl; + + // choose loner? + if (in->is_auth() && in->is_head()) { + if (in->choose_ideal_loner() >= 0) { + if (in->try_set_loner()) { + dout(10) << "eval set loner to client." << in->get_loner() << dendl; + need_issue = true; + mask = -1; + } else + dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl; + } else + dout(10) << "eval doesn't want loner" << dendl; + } + + retry: + if (mask & CEPH_LOCK_IFILE) + eval_any(&in->filelock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IAUTH) + eval_any(&in->authlock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_ILINK) + eval_any(&in->linklock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IXATTR) + eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_INEST) + eval_any(&in->nestlock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IFLOCK) + eval_any(&in->flocklock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IPOLICY) + eval_any(&in->policylock, &need_issue, &finishers, caps_imported); + + // drop loner? + if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) { + dout(10) << " trying to drop loner" << dendl; + if (in->try_drop_loner()) { + dout(10) << " dropped loner" << dendl; + need_issue = true; + + if (in->get_wanted_loner() >= 0) { + if (in->try_set_loner()) { + dout(10) << "eval end set loner to client." << in->get_loner() << dendl; + mask = -1; + goto retry; + } else { + dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl; + } + } + } + } + + finish_contexts(g_ceph_context, finishers); + + if (need_issue && in->is_head()) + issue_caps(in); + + dout(10) << "eval done" << dendl; + return need_issue; +} + +class C_Locker_Eval : public LockerContext { + MDSCacheObject *p; + int mask; +public: + C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) { + // We are used as an MDSCacheObject waiter, so should + // only be invoked by someone already holding the big lock. + assert(locker->mds->mds_lock.is_locked_by_me()); + p->get(MDSCacheObject::PIN_PTRWAITER); + } + void finish(int r) override { + locker->try_eval(p, mask); + p->put(MDSCacheObject::PIN_PTRWAITER); + } +}; + +void Locker::try_eval(MDSCacheObject *p, int mask) +{ + // unstable and ambiguous auth? + if (p->is_ambiguous_auth()) { + dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask)); + return; + } + + if (p->is_auth() && p->is_frozen()) { + dout(7) << "try_eval frozen, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask)); + return; + } + + if (mask & CEPH_LOCK_DN) { + assert(mask == CEPH_LOCK_DN); + bool need_issue = false; // ignore this, no caps on dentries + CDentry *dn = static_cast(p); + eval_any(&dn->lock, &need_issue); + } else { + CInode *in = static_cast(p); + eval(in, mask); + } +} + +void Locker::try_eval(SimpleLock *lock, bool *pneed_issue) +{ + MDSCacheObject *p = lock->get_parent(); + + // unstable and ambiguous auth? + if (p->is_ambiguous_auth()) { + dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type())); + return; + } + + if (!p->is_auth()) { + dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl; + return; + } + + if (p->is_frozen()) { + dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type())); + return; + } + + /* + * We could have a situation like: + * + * - mds A authpins item on mds B + * - mds B starts to freeze tree containing item + * - mds A tries wrlock_start on A, sends REQSCATTER to B + * - mds B lock is unstable, sets scatter_wanted + * - mds B lock stabilizes, calls try_eval. + * + * We can defer while freezing without causing a deadlock. Honor + * scatter_wanted flag here. This will never get deferred by the + * checks above due to the auth_pin held by the master. + */ + if (lock->is_scatterlock()) { + ScatterLock *slock = static_cast(lock); + if (slock->get_scatter_wanted() && + slock->get_state() != LOCK_MIX) { + scatter_mix(slock, pneed_issue); + if (!lock->is_stable()) + return; + } else if (slock->get_unscatter_wanted() && + slock->get_state() != LOCK_LOCK) { + simple_lock(slock, pneed_issue); + if (!lock->is_stable()) { + return; + } + } + } + + if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) { + dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type())); + return; + } + + eval(lock, pneed_issue); +} + +void Locker::eval_cap_gather(CInode *in, set *issue_set) +{ + bool need_issue = false; + list finishers; + + // kick locks now + if (!in->filelock.is_stable()) + eval_gather(&in->filelock, false, &need_issue, &finishers); + if (!in->authlock.is_stable()) + eval_gather(&in->authlock, false, &need_issue, &finishers); + if (!in->linklock.is_stable()) + eval_gather(&in->linklock, false, &need_issue, &finishers); + if (!in->xattrlock.is_stable()) + eval_gather(&in->xattrlock, false, &need_issue, &finishers); + + if (need_issue && in->is_head()) { + if (issue_set) + issue_set->insert(in); + else + issue_caps(in); + } + + finish_contexts(g_ceph_context, finishers); +} + +void Locker::eval_scatter_gathers(CInode *in) +{ + bool need_issue = false; + list finishers; + + dout(10) << "eval_scatter_gathers " << *in << dendl; + + // kick locks now + if (!in->filelock.is_stable()) + eval_gather(&in->filelock, false, &need_issue, &finishers); + if (!in->nestlock.is_stable()) + eval_gather(&in->nestlock, false, &need_issue, &finishers); + if (!in->dirfragtreelock.is_stable()) + eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers); + + if (need_issue && in->is_head()) + issue_caps(in); + + finish_contexts(g_ceph_context, finishers); +} + +void Locker::eval(SimpleLock *lock, bool *need_issue) +{ + switch (lock->get_type()) { + case CEPH_LOCK_IFILE: + return file_eval(static_cast(lock), need_issue); + case CEPH_LOCK_IDFT: + case CEPH_LOCK_INEST: + return scatter_eval(static_cast(lock), need_issue); + default: + return simple_eval(lock, need_issue); + } +} + + +// ------------------ +// rdlock + +bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon) +{ + // kick the lock + if (lock->is_stable()) { + if (lock->get_parent()->is_auth()) { + if (lock->get_sm() == &sm_scatterlock) { + // not until tempsync is fully implemented + //if (lock->get_parent()->is_replicated()) + //scatter_tempsync((ScatterLock*)lock); + //else + simple_sync(lock); + } else if (lock->get_sm() == &sm_filelock) { + CInode *in = static_cast(lock->get_parent()); + if (lock->get_state() == LOCK_EXCL && + in->get_target_loner() >= 0 && + !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN + file_xsyn(lock); + else + simple_sync(lock); + } else + simple_sync(lock); + return true; + } else { + // request rdlock state change from auth + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || + mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + dout(10) << "requesting rdlock from auth on " + << *lock << " on " << *lock->get_parent() << dendl; + mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth); + } + return false; + } + } + if (lock->get_type() == CEPH_LOCK_IFILE) { + CInode *in = static_cast(lock->get_parent()); + if (in->state_test(CInode::STATE_RECOVERING)) { + mds->mdcache->recovery_queue.prioritize(in); + } + } + + return false; +} + +bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con) +{ + dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl; + + // can read? grab ref. + if (lock->can_rdlock(client)) + return true; + + _rdlock_kick(lock, false); + + if (lock->can_rdlock(client)) + return true; + + // wait! + if (con) { + dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl; + lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con); + } + return false; +} + +bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon) +{ + dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl; + + // client may be allowed to rdlock the same item it has xlocked. + // UNLESS someone passes in as_anon, or we're reading snapped version here. + if (mut->snapid != CEPH_NOSNAP) + as_anon = true; + client_t client = as_anon ? -1 : mut->get_client(); + + CInode *in = 0; + if (lock->get_type() != CEPH_LOCK_DN) + in = static_cast(lock->get_parent()); + + /* + if (!lock->get_parent()->is_auth() && + lock->fw_rdlock_to_auth()) { + mdcache->request_forward(mut, lock->get_parent()->authority().first); + return false; + } + */ + + while (1) { + // can read? grab ref. + if (lock->can_rdlock(client)) { + lock->get_rdlock(); + mut->rdlocks.insert(lock); + mut->locks.insert(lock); + return true; + } + + // hmm, wait a second. + if (in && !in->is_head() && in->is_auth() && + lock->get_state() == LOCK_SNAP_SYNC) { + // okay, we actually need to kick the head's lock to get ourselves synced up. + CInode *head = mdcache->get_inode(in->ino()); + assert(head); + SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE); + if (hlock->get_state() == LOCK_SYNC) + hlock = head->get_lock(lock->get_type()); + + if (hlock->get_state() != LOCK_SYNC) { + dout(10) << "rdlock_start trying head inode " << *head << dendl; + if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL ** + return false; + // oh, check our lock again then + } + } + + if (!_rdlock_kick(lock, as_anon)) + break; + } + + // wait! + int wait_on; + if (lock->get_parent()->is_auth() && lock->is_stable()) + wait_on = SimpleLock::WAIT_RD; + else + wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry. + dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; + lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut)); + nudge_log(lock); + return false; +} + +void Locker::nudge_log(SimpleLock *lock) +{ + dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl; + if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush + mds->mdlog->flush(); +} + +void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue) +{ + // drop ref + lock->put_rdlock(); + if (mut) { + mut->rdlocks.erase(lock); + mut->locks.erase(lock); + } + + dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; + + // last one? + if (!lock->is_rdlocked()) { + if (!lock->is_stable()) + eval_gather(lock, false, pneed_issue); + else if (lock->get_parent()->is_auth()) + try_eval(lock, pneed_issue); + } +} + + +bool Locker::can_rdlock_set(set& locks) +{ + dout(10) << "can_rdlock_set " << locks << dendl; + for (set::iterator p = locks.begin(); p != locks.end(); ++p) + if (!(*p)->can_rdlock(-1)) { + dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl; + return false; + } + return true; +} + +bool Locker::rdlock_try_set(set& locks) +{ + dout(10) << "rdlock_try_set " << locks << dendl; + for (set::iterator p = locks.begin(); p != locks.end(); ++p) + if (!rdlock_try(*p, -1, NULL)) { + dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl; + return false; + } + return true; +} + +void Locker::rdlock_take_set(set& locks, MutationRef& mut) +{ + dout(10) << "rdlock_take_set " << locks << dendl; + for (set::iterator p = locks.begin(); p != locks.end(); ++p) { + (*p)->get_rdlock(); + mut->rdlocks.insert(*p); + mut->locks.insert(*p); + } +} + +// ------------------ +// wrlock + +void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut) +{ + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_wrlock_grab(static_cast(lock), mut); + + dout(7) << "wrlock_force on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->get_wrlock(true); + mut->wrlocks.insert(lock); + mut->locks.insert(lock); +} + +bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait) +{ + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_wrlock_start(static_cast(lock), mut); + + dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl; + + CInode *in = static_cast(lock->get_parent()); + client_t client = mut->get_client(); + bool want_scatter = !nowait && lock->get_parent()->is_auth() && + (in->has_subtree_or_exporting_dirfrag() || + static_cast(lock)->get_scatter_wanted()); + + while (1) { + // wrlock? + if (lock->can_wrlock(client) && + (!want_scatter || lock->get_state() == LOCK_MIX)) { + lock->get_wrlock(); + mut->wrlocks.insert(lock); + mut->locks.insert(lock); + return true; + } + + if (lock->get_type() == CEPH_LOCK_IFILE && + in->state_test(CInode::STATE_RECOVERING)) { + mds->mdcache->recovery_queue.prioritize(in); + } + + if (!lock->is_stable()) + break; + + if (in->is_auth()) { + // don't do nested lock state change if we have dirty scatterdata and + // may scatter_writebehind or start_scatter, because nowait==true implies + // that the caller already has a log entry open! + if (nowait && lock->is_dirty()) + return false; + + if (want_scatter) + scatter_mix(static_cast(lock)); + else + simple_lock(lock); + + if (nowait && !lock->can_wrlock(client)) + return false; + + } else { + // replica. + // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case). + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || + mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + dout(10) << "requesting scatter from auth on " + << *lock << " on " << *lock->get_parent() << dendl; + mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth); + } + break; + } + } + + if (!nowait) { + dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; + lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + nudge_log(lock); + } + + return false; +} + +void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue) +{ + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_wrlock_finish(static_cast(lock), mut); + + dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; + lock->put_wrlock(); + if (mut) { + mut->wrlocks.erase(lock); + if (mut->remote_wrlocks.count(lock) == 0) + mut->locks.erase(lock); + } + + if (!lock->is_wrlocked()) { + if (!lock->is_stable()) + eval_gather(lock, false, pneed_issue); + else if (lock->get_parent()->is_auth()) + try_eval(lock, pneed_issue); + } +} + + +// remote wrlock + +void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut) +{ + dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl; + + // wait for active target + if (mds->is_cluster_degraded() && + !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) { + dout(7) << " mds." << target << " is not active" << dendl; + if (mut->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut)); + return; + } + + // send lock request + mut->start_locking(lock, target); + mut->more()->slaves.insert(target); + MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt, + MMDSSlaveRequest::OP_WRLOCK); + r->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(r->get_object_info()); + mds->send_message_mds(r, target); + + assert(mut->more()->waiting_on_slave.count(target) == 0); + mut->more()->waiting_on_slave.insert(target); +} + +void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target, + MutationImpl *mut) +{ + // drop ref + mut->remote_wrlocks.erase(lock); + if (mut->wrlocks.count(lock) == 0) + mut->locks.erase(lock); + + dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target + << " " << *lock->get_parent() << dendl; + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) { + MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt, + MMDSSlaveRequest::OP_UNWRLOCK); + slavereq->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(slavereq->get_object_info()); + mds->send_message_mds(slavereq, target); + } +} + + +// ------------------ +// xlock + +bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut) +{ + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_xlock_start(static_cast(lock), mut); + + dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl; + client_t client = mut->get_client(); + + // auth? + if (lock->get_parent()->is_auth()) { + // auth + while (1) { + if (lock->can_xlock(client)) { + lock->set_state(LOCK_XLOCK); + lock->get_xlock(mut, client); + mut->xlocks.insert(lock); + mut->locks.insert(lock); + mut->finish_locking(lock); + return true; + } + + if (lock->get_type() == CEPH_LOCK_IFILE) { + CInode *in = static_cast(lock->get_parent()); + if (in->state_test(CInode::STATE_RECOVERING)) { + mds->mdcache->recovery_queue.prioritize(in); + } + } + + if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE || + lock->get_xlock_by_client() != client || + lock->is_waiter_for(SimpleLock::WAIT_STABLE))) + break; + + if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) { + mut->start_locking(lock); + simple_xlock(lock); + } else { + simple_lock(lock); + } + } + + lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + nudge_log(lock); + return false; + } else { + // replica + assert(lock->get_sm()->can_remote_xlock); + assert(!mut->slave_request); + + // wait for single auth + if (lock->get_parent()->is_ambiguous_auth()) { + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, + new C_MDS_RetryRequest(mdcache, mut)); + return false; + } + + // wait for active auth + mds_rank_t auth = lock->get_parent()->authority().first; + if (mds->is_cluster_degraded() && + !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + dout(7) << " mds." << auth << " is not active" << dendl; + if (mut->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut)); + return false; + } + + // send lock request + mut->more()->slaves.insert(auth); + mut->start_locking(lock, auth); + MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt, + MMDSSlaveRequest::OP_XLOCK); + r->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(r->get_object_info()); + mds->send_message_mds(r, auth); + + assert(mut->more()->waiting_on_slave.count(auth) == 0); + mut->more()->waiting_on_slave.insert(auth); + + return false; + } +} + +void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue) +{ + assert(!lock->is_stable()); + if (lock->get_num_rdlocks() == 0 && + lock->get_num_wrlocks() == 0 && + lock->get_num_client_lease() == 0 && + lock->get_state() != LOCK_XLOCKSNAP && + lock->get_type() != CEPH_LOCK_DN) { + CInode *in = static_cast(lock->get_parent()); + client_t loner = in->get_target_loner(); + if (loner >= 0 && (xlocker < 0 || xlocker == loner)) { + lock->set_state(LOCK_EXCL); + lock->get_parent()->auth_unpin(lock); + lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD); + if (lock->get_cap_shift()) + *pneed_issue = true; + if (lock->get_parent()->is_auth() && + lock->is_stable()) + try_eval(lock, pneed_issue); + return; + } + } + // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK + eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue); +} + +void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue) +{ + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_xlock_finish(static_cast(lock), mut); + + dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl; + + client_t xlocker = lock->get_xlock_by_client(); + + // drop ref + lock->put_xlock(); + assert(mut); + mut->xlocks.erase(lock); + mut->locks.erase(lock); + + bool do_issue = false; + + // remote xlock? + if (!lock->get_parent()->is_auth()) { + assert(lock->get_sm()->can_remote_xlock); + + // tell auth + dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl; + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { + MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt, + MMDSSlaveRequest::OP_UNXLOCK); + slavereq->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(slavereq->get_object_info()); + mds->send_message_mds(slavereq, auth); + } + // others waiting? + lock->finish_waiters(SimpleLock::WAIT_STABLE | + SimpleLock::WAIT_WR | + SimpleLock::WAIT_RD, 0); + } else { + if (lock->get_num_xlocks() == 0) { + if (lock->get_state() == LOCK_LOCK_XLOCK) + lock->set_state(LOCK_XLOCKDONE); + _finish_xlock(lock, xlocker, &do_issue); + } + } + + if (do_issue) { + CInode *in = static_cast(lock->get_parent()); + if (in->is_head()) { + if (pneed_issue) + *pneed_issue = true; + else + issue_caps(in); + } + } +} + +void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut) +{ + dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl; + + lock->put_xlock(); + mut->xlocks.erase(lock); + mut->locks.erase(lock); + + MDSCacheObject *p = lock->get_parent(); + assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode) + + if (!lock->is_stable()) + lock->get_parent()->auth_unpin(lock); + + lock->set_state(LOCK_LOCK); +} + +void Locker::xlock_import(SimpleLock *lock) +{ + dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl; + lock->get_parent()->auth_pin(lock); +} + + + +// file i/o ----------------------------------------- + +version_t Locker::issue_file_data_version(CInode *in) +{ + dout(7) << "issue_file_data_version on " << *in << dendl; + return in->inode.file_data_version; +} + +class C_Locker_FileUpdate_finish : public LockerLogContext { + CInode *in; + MutationRef mut; + bool share_max; + bool need_issue; + client_t client; + MClientCaps *ack; +public: + C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, + bool sm=false, bool ni=false, client_t c=-1, + MClientCaps *ac = 0) + : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni), + client(c), ack(ac) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + locker->file_update_finish(in, mut, share_max, need_issue, client, ack); + in->put(CInode::PIN_PTRWAITER); + } +}; + +void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap, + client_t client, MClientCaps *ack) +{ + dout(10) << "file_update_finish on " << *in << dendl; + in->pop_and_dirty_projected_inode(mut->ls); + + mut->apply(); + + if (ack) { + Session *session = mds->get_session(client); + if (session) { + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (ack->get_oldest_flush_tid() > 0) + session->add_completed_flush(ack->get_client_tid()); + mds->send_message_client_counted(ack, session); + } else { + dout(10) << " no session for client." << client << " " << *ack << dendl; + ack->put(); + } + } + + set need_issue; + drop_locks(mut.get(), &need_issue); + + if (!in->is_head() && !in->client_snap_caps.empty()) { + dout(10) << " client_snap_caps " << in->client_snap_caps << dendl; + // check for snap writeback completion + bool gather = false; + compact_map >::iterator p = in->client_snap_caps.begin(); + while (p != in->client_snap_caps.end()) { + SimpleLock *lock = in->get_lock(p->first); + assert(lock); + dout(10) << " completing client_snap_caps for " << ccap_string(p->first) + << " lock " << *lock << " on " << *in << dendl; + lock->put_wrlock(); + + p->second.erase(client); + if (p->second.empty()) { + gather = true; + in->client_snap_caps.erase(p++); + } else + ++p; + } + if (gather) { + if (in->client_snap_caps.empty()) + in->item_open_file.remove_myself(); + eval_cap_gather(in, &need_issue); + } + } else { + if (issue_client_cap && need_issue.count(in) == 0) { + Capability *cap = in->get_client_cap(client); + if (cap && (cap->wanted() & ~cap->pending())) + issue_caps(in, cap); + } + + if (share_max && in->is_auth() && + (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER))) + share_inode_max_size(in); + } + issue_caps_set(need_issue); + + // auth unpin after issuing caps + mut->cleanup(); +} + +Capability* Locker::issue_new_caps(CInode *in, + int mode, + Session *session, + SnapRealm *realm, + bool is_replay) +{ + dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl; + bool is_new; + + // if replay, try to reconnect cap, and otherwise do nothing. + if (is_replay) { + mds->mdcache->try_reconnect_cap(in, session); + return 0; + } + + // my needs + assert(session->info.inst.name.is_client()); + client_t my_client = session->info.inst.name.num(); + int my_want = ceph_caps_for_mode(mode); + + // register a capability + Capability *cap = in->get_client_cap(my_client); + if (!cap) { + // new cap + cap = in->add_client_cap(my_client, session, realm); + cap->set_wanted(my_want); + cap->mark_new(); + cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply) + is_new = true; + } else { + is_new = false; + // make sure it wants sufficient caps + if (my_want & ~cap->wanted()) { + // augment wanted caps for this client + cap->set_wanted(cap->wanted() | my_want); + } + } + + if (in->is_auth()) { + // [auth] twiddle mode? + eval(in, CEPH_CAP_LOCKS); + + if (_need_flush_mdlog(in, my_want)) + mds->mdlog->flush(); + + } else { + // [replica] tell auth about any new caps wanted + request_inode_file_caps(in); + } + + // issue caps (pot. incl new one) + //issue_caps(in); // note: _eval above may have done this already... + + // re-issue whatever we can + //cap->issue(cap->pending()); + + if (is_new) + cap->dec_suppress(); + + return cap; +} + + +void Locker::issue_caps_set(set& inset) +{ + for (set::iterator p = inset.begin(); p != inset.end(); ++p) + issue_caps(*p); +} + +bool Locker::issue_caps(CInode *in, Capability *only_cap) +{ + // allowed caps are determined by the lock mode. + int all_allowed = in->get_caps_allowed_by_type(CAP_ANY); + int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER); + int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER); + + client_t loner = in->get_loner(); + if (loner >= 0) { + dout(7) << "issue_caps loner client." << loner + << " allowed=" << ccap_string(loner_allowed) + << ", xlocker allowed=" << ccap_string(xlocker_allowed) + << ", others allowed=" << ccap_string(all_allowed) + << " on " << *in << dendl; + } else { + dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) + << ", xlocker allowed=" << ccap_string(xlocker_allowed) + << " on " << *in << dendl; + } + + assert(in->is_head()); + + // count conflicts with + int nissued = 0; + + // client caps + map::iterator it; + if (only_cap) + it = in->client_caps.find(only_cap->get_client()); + else + it = in->client_caps.begin(); + for (; it != in->client_caps.end(); ++it) { + Capability *cap = it->second; + if (cap->is_stale()) + continue; + + // do not issue _new_ bits when size|mtime is projected + int allowed; + if (loner == it->first) + allowed = loner_allowed; + else + allowed = all_allowed; + + // add in any xlocker-only caps (for locks this client is the xlocker for) + allowed |= xlocker_allowed & in->get_xlocker_mask(it->first); + + Session *session = mds->get_session(it->first); + if (in->inode.inline_data.version != CEPH_INLINE_NONE && + !(session && session->connection && + session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA))) + allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR); + + int pending = cap->pending(); + int wanted = cap->wanted(); + + dout(20) << " client." << it->first + << " pending " << ccap_string(pending) + << " allowed " << ccap_string(allowed) + << " wanted " << ccap_string(wanted) + << dendl; + + if (!(pending & ~allowed)) { + // skip if suppress or new, and not revocation + if (cap->is_new() || cap->is_suppress()) { + dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl; + continue; + } + } + + // notify clients about deleted inode, to make sure they release caps ASAP. + if (in->inode.nlink == 0) + wanted |= CEPH_CAP_LINK_SHARED; + + // are there caps that the client _wants_ and can have, but aren't pending? + // or do we need to revoke? + if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps + (pending & ~allowed)) { // need to revoke ~allowed caps. + // issue + nissued++; + + // include caps that clients generally like, while we're at it. + int likes = in->get_caps_liked(); + int before = pending; + long seq; + if (pending & ~allowed) + seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new. + else + seq = cap->issue((wanted|likes) & allowed); + int after = cap->pending(); + + if (cap->is_new()) { + // haven't send caps to client yet + if (before & ~after) + cap->confirm_receipt(seq, after); + } else { + dout(7) << " sending MClientCaps to client." << it->first + << " seq " << cap->get_last_seq() + << " new pending " << ccap_string(after) << " was " << ccap_string(before) + << dendl; + + int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT; + if (op == CEPH_CAP_OP_REVOKE) { + revoking_caps.push_back(&cap->item_revoking_caps); + revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps); + cap->set_last_revoke_stamp(ceph_clock_now()); + cap->reset_num_revoke_warnings(); + } + + MClientCaps *m = new MClientCaps(op, in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), cap->get_last_seq(), + after, wanted, 0, + cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + + mds->send_message_client_counted(m, it->first); + } + } + + if (only_cap) + break; + } + + return (nissued == 0); // true if no re-issued, no callbacks +} + +void Locker::issue_truncate(CInode *in) +{ + dout(7) << "issue_truncate on " << *in << dendl; + + for (map::iterator it = in->client_caps.begin(); + it != in->client_caps.end(); + ++it) { + Capability *cap = it->second; + MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC, + in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), cap->get_last_seq(), + cap->pending(), cap->wanted(), 0, + cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + mds->send_message_client_counted(m, it->first); + } + + // should we increase max_size? + if (in->is_auth() && in->is_file()) + check_inode_max_size(in); +} + + +void Locker::revoke_stale_caps(Capability *cap) +{ + CInode *in = cap->get_inode(); + if (in->state_test(CInode::STATE_EXPORTINGCAPS)) { + // if export succeeds, the cap will be removed. if export fails, we need to + // revoke the cap if it's still stale. + in->state_set(CInode::STATE_EVALSTALECAPS); + return; + } + + int issued = cap->issued(); + if (issued & ~CEPH_CAP_PIN) { + dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl; + cap->revoke(); + + if (in->is_auth() && + in->inode.client_ranges.count(cap->get_client())) + in->state_set(CInode::STATE_NEEDSRECOVER); + + if (!in->filelock.is_stable()) eval_gather(&in->filelock); + if (!in->linklock.is_stable()) eval_gather(&in->linklock); + if (!in->authlock.is_stable()) eval_gather(&in->authlock); + if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock); + + if (in->is_auth()) { + try_eval(in, CEPH_CAP_LOCKS); + } else { + request_inode_file_caps(in); + } + } +} + +void Locker::revoke_stale_caps(Session *session) +{ + dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl; + + for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) { + Capability *cap = *p; + cap->mark_stale(); + revoke_stale_caps(cap); + } +} + +void Locker::resume_stale_caps(Session *session) +{ + dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl; + + for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) { + Capability *cap = *p; + CInode *in = cap->get_inode(); + assert(in->is_head()); + if (cap->is_stale()) { + dout(10) << " clearing stale flag on " << *in << dendl; + cap->clear_stale(); + + if (in->state_test(CInode::STATE_EXPORTINGCAPS)) { + // if export succeeds, the cap will be removed. if export fails, + // we need to re-issue the cap if it's not stale. + in->state_set(CInode::STATE_EVALSTALECAPS); + continue; + } + + if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS)) + issue_caps(in, cap); + } + } +} + +void Locker::remove_stale_leases(Session *session) +{ + dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl; + xlist::iterator p = session->leases.begin(); + while (!p.end()) { + ClientLease *l = *p; + ++p; + CDentry *parent = static_cast(l->parent); + dout(15) << " removing lease on " << *parent << dendl; + parent->remove_client_lease(l, this); + } +} + + +class C_MDL_RequestInodeFileCaps : public LockerContext { + CInode *in; +public: + C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + if (!in->is_auth()) + locker->request_inode_file_caps(in); + in->put(CInode::PIN_PTRWAITER); + } +}; + +void Locker::request_inode_file_caps(CInode *in) +{ + assert(!in->is_auth()); + + int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN; + if (wanted != in->replica_caps_wanted) { + // wait for single auth + if (in->is_ambiguous_auth()) { + in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, + new C_MDL_RequestInodeFileCaps(this, in)); + return; + } + + mds_rank_t auth = in->authority().first; + if (mds->is_cluster_degraded() && + mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { + mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in)); + return; + } + + dout(7) << "request_inode_file_caps " << ccap_string(wanted) + << " was " << ccap_string(in->replica_caps_wanted) + << " on " << *in << " to mds." << auth << dendl; + + in->replica_caps_wanted = wanted; + + if (!mds->is_cluster_degraded() || + mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) + mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted), + auth); + } +} + +/* This function DOES put the passed message before returning */ +void Locker::handle_inode_file_caps(MInodeFileCaps *m) +{ + // nobody should be talking to us during recovery. + assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping()); + + // ok + CInode *in = mdcache->get_inode(m->get_ino()); + mds_rank_t from = mds_rank_t(m->get_source().num()); + + assert(in); + assert(in->is_auth()); + + dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl; + + if (m->get_caps()) + in->mds_caps_wanted[from] = m->get_caps(); + else + in->mds_caps_wanted.erase(from); + + try_eval(in, CEPH_CAP_LOCKS); + m->put(); +} + + +class C_MDL_CheckMaxSize : public LockerContext { + CInode *in; + uint64_t new_max_size; + uint64_t newsize; + utime_t mtime; + +public: + C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size, + uint64_t _newsize, utime_t _mtime) : + LockerContext(l), in(i), + new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime) + { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + if (in->is_auth()) + locker->check_inode_max_size(in, false, new_max_size, newsize, mtime); + in->put(CInode::PIN_PTRWAITER); + } +}; + +uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size) +{ + uint64_t new_max = (size + 1) << 1; + uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs; + if (max_inc > 0) { + max_inc *= pi->get_layout_size_increment(); + new_max = MIN(new_max, size + max_inc); + } + return ROUND_UP_TO(new_max, pi->get_layout_size_increment()); +} + +void Locker::calc_new_client_ranges(CInode *in, uint64_t size, + map *new_ranges, + bool *max_increased) +{ + inode_t *latest = in->get_projected_inode(); + uint64_t ms; + if(latest->has_layout()) { + ms = calc_new_max_size(latest, size); + } else { + // Layout-less directories like ~mds0/, have zero size + ms = 0; + } + + // increase ranges as appropriate. + // shrink to 0 if no WR|BUFFER caps issued. + for (map::iterator p = in->client_caps.begin(); + p != in->client_caps.end(); + ++p) { + if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) { + client_writeable_range_t& nr = (*new_ranges)[p->first]; + nr.range.first = 0; + if (latest->client_ranges.count(p->first)) { + client_writeable_range_t& oldr = latest->client_ranges[p->first]; + if (ms > oldr.range.last) + *max_increased = true; + nr.range.last = MAX(ms, oldr.range.last); + nr.follows = oldr.follows; + } else { + *max_increased = true; + nr.range.last = ms; + nr.follows = in->first - 1; + } + } + } +} + +bool Locker::check_inode_max_size(CInode *in, bool force_wrlock, + uint64_t new_max_size, uint64_t new_size, + utime_t new_mtime) +{ + assert(in->is_auth()); + assert(in->is_file()); + + inode_t *latest = in->get_projected_inode(); + map new_ranges; + uint64_t size = latest->size; + bool update_size = new_size > 0; + bool update_max = false; + bool max_increased = false; + + if (update_size) { + new_size = size = MAX(size, new_size); + new_mtime = MAX(new_mtime, latest->mtime); + if (latest->size == new_size && latest->mtime == new_mtime) + update_size = false; + } + + calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased); + + if (max_increased || latest->client_ranges != new_ranges) + update_max = true; + + if (!update_size && !update_max) { + dout(20) << "check_inode_max_size no-op on " << *in << dendl; + return false; + } + + dout(10) << "check_inode_max_size new_ranges " << new_ranges + << " update_size " << update_size + << " on " << *in << dendl; + + if (in->is_frozen()) { + dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl; + C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in, + new_max_size, + new_size, + new_mtime); + in->add_waiter(CInode::WAIT_UNFREEZE, cms); + return false; + } + if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) { + // lock? + if (in->filelock.is_stable()) { + if (in->get_target_loner() >= 0) + file_excl(&in->filelock); + else + simple_lock(&in->filelock); + } + if (!in->filelock.can_wrlock(in->get_loner())) { + // try again later + C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in, + new_max_size, + new_size, + new_mtime); + + in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms); + dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl; + return false; + } + } + + MutationRef mut(new MutationImpl()); + mut->ls = mds->mdlog->get_current_segment(); + + inode_t *pi = in->project_inode(); + pi->version = in->pre_dirty(); + + if (update_max) { + dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl; + pi->client_ranges = new_ranges; + } + + if (update_size) { + dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl; + pi->size = new_size; + pi->rstat.rbytes = new_size; + dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl; + pi->mtime = new_mtime; + } + + // use EOpen if the file is still open; otherwise, use EUpdate. + // this is just an optimization to push open files forward into + // newer log segments. + LogEvent *le; + EMetaBlob *metablob; + if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) { + EOpen *eo = new EOpen(mds->mdlog); + eo->add_ino(in->ino()); + metablob = &eo->metablob; + le = eo; + mut->ls->open_files.push_back(&in->item_open_file); + } else { + EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size"); + metablob = &eu->metablob; + le = eu; + } + mds->mdlog->start_entry(le); + if (update_size) { // FIXME if/when we do max_size nested accounting + mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY); + // no cow, here! + CDentry *parent = in->get_projected_parent_dn(); + metablob->add_primary_dentry(parent, in, true); + } else { + metablob->add_dir_context(in->get_projected_parent_dn()->get_dir()); + mdcache->journal_dirty_inode(mut.get(), metablob, in); + } + mds->mdlog->submit_entry(le, + new C_Locker_FileUpdate_finish(this, in, mut, true)); + wrlock_force(&in->filelock, mut); // wrlock for duration of journal + mut->auth_pin(in); + + // make max_size _increase_ timely + if (max_increased) + mds->mdlog->flush(); + + return true; +} + + +void Locker::share_inode_max_size(CInode *in, Capability *only_cap) +{ + /* + * only share if currently issued a WR cap. if client doesn't have it, + * file_max doesn't matter, and the client will get it if/when they get + * the cap later. + */ + dout(10) << "share_inode_max_size on " << *in << dendl; + map::iterator it; + if (only_cap) + it = in->client_caps.find(only_cap->get_client()); + else + it = in->client_caps.begin(); + for (; it != in->client_caps.end(); ++it) { + const client_t client = it->first; + Capability *cap = it->second; + if (cap->is_suppress()) + continue; + if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) { + dout(10) << "share_inode_max_size with client." << client << dendl; + cap->inc_last_seq(); + MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT, + in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), cap->get_last_seq(), + cap->pending(), cap->wanted(), 0, + cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + mds->send_message_client_counted(m, client); + } + if (only_cap) + break; + } +} + +bool Locker::_need_flush_mdlog(CInode *in, int wanted) +{ + /* flush log if caps are wanted by client but corresponding lock is unstable and locked by + * pending mutations. */ + if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) && + in->filelock.is_unstable_and_locked()) || + ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) && + in->authlock.is_unstable_and_locked()) || + ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) && + in->linklock.is_unstable_and_locked()) || + ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) && + in->xattrlock.is_unstable_and_locked())) + return true; + return false; +} + +void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq) +{ + if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) { + dout(10) << " wanted " << ccap_string(cap->wanted()) + << " -> " << ccap_string(wanted) << dendl; + cap->set_wanted(wanted); + } else if (wanted & ~cap->wanted()) { + dout(10) << " wanted " << ccap_string(cap->wanted()) + << " -> " << ccap_string(wanted) + << " (added caps even though we had seq mismatch!)" << dendl; + cap->set_wanted(wanted | cap->wanted()); + } else { + dout(10) << " NOT changing wanted " << ccap_string(cap->wanted()) + << " -> " << ccap_string(wanted) + << " (issue_seq " << issue_seq << " != last_issue " + << cap->get_last_issue() << ")" << dendl; + return; + } + + CInode *cur = cap->get_inode(); + if (!cur->is_auth()) { + request_inode_file_caps(cur); + return; + } + + if (cap->wanted() == 0) { + if (cur->item_open_file.is_on_list() && + !cur->is_any_caps_wanted()) { + dout(10) << " removing unwanted file from open file list " << *cur << dendl; + cur->item_open_file.remove_myself(); + } + } else { + if (cur->state_test(CInode::STATE_RECOVERING) && + (cap->wanted() & (CEPH_CAP_FILE_RD | + CEPH_CAP_FILE_WR))) { + mds->mdcache->recovery_queue.prioritize(cur); + } + + if (!cur->item_open_file.is_on_list()) { + dout(10) << " adding to open file list " << *cur << dendl; + assert(cur->last == CEPH_NOSNAP); + LogSegment *ls = mds->mdlog->get_current_segment(); + EOpen *le = new EOpen(mds->mdlog); + mds->mdlog->start_entry(le); + le->add_clean_inode(cur); + ls->open_files.push_back(&cur->item_open_file); + mds->mdlog->submit_entry(le); + } + } +} + + + +void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last) +{ + dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl; + for (auto p = head_in->client_need_snapflush.begin(); + p != head_in->client_need_snapflush.end() && p->first < last; ) { + snapid_t snapid = p->first; + set& clients = p->second; + ++p; // be careful, q loop below depends on this + + if (clients.count(client)) { + dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl; + CInode *sin = mdcache->get_inode(head_in->ino(), snapid); + if (!sin) { + // hrm, look forward until we find the inode. + // (we can only look it up by the last snapid it is valid for) + dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl; + for (compact_map >::iterator q = p; // p is already at next entry + q != head_in->client_need_snapflush.end(); + ++q) { + dout(10) << " trying snapid " << q->first << dendl; + sin = mdcache->get_inode(head_in->ino(), q->first); + if (sin) { + assert(sin->first <= snapid); + break; + } + dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl; + } + if (!sin && head_in->is_multiversion()) + sin = head_in; + assert(sin); + } + _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL); + head_in->remove_need_snapflush(sin, snapid, client); + } + } +} + + +bool Locker::should_defer_client_cap_frozen(CInode *in) +{ + /* + * This policy needs to be AT LEAST as permissive as allowing a client request + * to go forward, or else a client request can release something, the release + * gets deferred, but the request gets processed and deadlocks because when the + * caps can't get revoked. + * + * Currently, a request wait if anything locked is freezing (can't + * auth_pin), which would avoid any deadlock with cap release. Thus @in + * _MUST_ be in the lock/auth_pin set. + * + * auth_pins==0 implies no unstable lock and not auth pinnned by + * client request, otherwise continue even it's freezing. + */ + return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen(); +} + +/* + * This function DOES put the passed message before returning + */ +void Locker::handle_client_caps(MClientCaps *m) +{ + Session *session = static_cast(m->get_connection()->get_priv()); + client_t client = m->get_source().num(); + + snapid_t follows = m->get_snap_follows(); + dout(7) << "handle_client_caps " + << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async") + << " on " << m->get_ino() + << " tid " << m->get_client_tid() << " follows " << follows + << " op " << ceph_cap_op_name(m->get_op()) << dendl; + + if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) { + if (!session) { + dout(5) << " no session, dropping " << *m << dendl; + m->put(); + return; + } + if (session->is_closed() || + session->is_closing() || + session->is_killing()) { + dout(7) << " session closed|closing|killing, dropping " << *m << dendl; + m->put(); + return; + } + if (mds->is_reconnect() && + m->get_dirty() && m->get_client_tid() > 0 && + !session->have_completed_flush(m->get_client_tid())) { + mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty()); + } + mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); + return; + } + + if (m->get_client_tid() > 0 && session && + session->have_completed_flush(m->get_client_tid())) { + dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid() + << " for client." << client << dendl; + MClientCaps *ack; + if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) { + ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, + m->get_dirty(), 0, mds->get_osd_epoch_barrier()); + } else { + ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), + m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0, + mds->get_osd_epoch_barrier()); + } + ack->set_snap_follows(follows); + ack->set_client_tid(m->get_client_tid()); + mds->send_message_client_counted(ack, m->get_connection()); + if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) { + m->put(); + return; + } else { + // fall-thru because the message may release some caps + m->clear_dirty(); + m->set_op(CEPH_CAP_OP_UPDATE); + } + } + + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (m->get_oldest_flush_tid() > 0 && session) { + if (session->trim_completed_flushes(m->get_oldest_flush_tid())) { + mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name); + + if (session->get_num_trim_flushes_warnings() > 0 && + session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes) + session->reset_num_trim_flushes_warnings(); + } else { + if (session->get_num_completed_flushes() >= + (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) { + session->inc_num_trim_flushes_warnings(); + stringstream ss; + ss << "client." << session->get_client() << " does not advance its oldest_flush_tid (" + << m->get_oldest_flush_tid() << "), " + << session->get_num_completed_flushes() + << " completed flushes recorded in session"; + mds->clog->warn() << ss.str(); + dout(20) << __func__ << " " << ss.str() << dendl; + } + } + } + + CInode *head_in = mdcache->get_inode(m->get_ino()); + if (!head_in) { + if (mds->is_clientreplay()) { + dout(7) << "handle_client_caps on unknown ino " << m->get_ino() + << ", will try again after replayed client requests" << dendl; + mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m)); + return; + } + dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl; + m->put(); + return; + } + + if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) { + // Pause RADOS operations until we see the required epoch + mds->objecter->set_epoch_barrier(m->osd_epoch_barrier); + } + + if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) { + // Record the barrier so that we will retransmit it to clients + mds->set_osd_epoch_barrier(m->osd_epoch_barrier); + } + + CInode *in = head_in; + if (follows > 0) { + in = mdcache->pick_inode_snap(head_in, follows); + if (in != head_in) + dout(10) << " head inode " << *head_in << dendl; + } + dout(10) << " cap inode " << *in << dendl; + + Capability *cap = 0; + cap = in->get_client_cap(client); + if (!cap && in != head_in) + cap = head_in->get_client_cap(client); + if (!cap) { + dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl; + m->put(); + return; + } + assert(cap); + + // freezing|frozen? + if (should_defer_client_cap_frozen(in)) { + dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m)); + return; + } + if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) { + dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq() + << ", dropping" << dendl; + m->put(); + return; + } + + int op = m->get_op(); + + // flushsnap? + if (op == CEPH_CAP_OP_FLUSHSNAP) { + if (!in->is_auth()) { + dout(7) << " not auth, ignoring flushsnap on " << *in << dendl; + goto out; + } + + SnapRealm *realm = in->find_snaprealm(); + snapid_t snap = realm->get_snap_following(follows); + dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl; + + // we can prepare the ack now, since this FLUSHEDSNAP is independent of any + // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst + // case we get a dup response, so whatever.) + MClientCaps *ack = 0; + if (m->get_dirty()) { + ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier()); + ack->set_snap_follows(follows); + ack->set_client_tid(m->get_client_tid()); + ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + } + + if (in == head_in || + (head_in->client_need_snapflush.count(snap) && + head_in->client_need_snapflush[snap].count(client))) { + dout(7) << " flushsnap snap " << snap + << " client." << client << " on " << *in << dendl; + + // this cap now follows a later snap (i.e. the one initiating this flush, or later) + if (in == head_in) + cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq(); + else if (head_in->client_need_snapflush.begin()->first < snap) + _do_null_snapflush(head_in, client, snap); + + _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack); + + if (in != head_in) + head_in->remove_need_snapflush(in, snap, client); + + } else { + dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl; + if (ack) + mds->send_message_client_counted(ack, m->get_connection()); + } + goto out; + } + + if (cap->get_cap_id() != m->get_cap_id()) { + dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl; + } else { + // intermediate snap inodes + while (in != head_in) { + assert(in->last != CEPH_NOSNAP); + if (in->is_auth() && m->get_dirty()) { + dout(10) << " updating intermediate snapped inode " << *in << dendl; + _do_cap_update(in, NULL, m->get_dirty(), follows, m); + } + in = mdcache->pick_inode_snap(head_in, in->last); + } + + // head inode, and cap + MClientCaps *ack = 0; + + int caps = m->get_caps(); + if (caps & ~cap->issued()) { + dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl; + caps &= cap->issued(); + } + + cap->confirm_receipt(m->get_seq(), caps); + dout(10) << " follows " << follows + << " retains " << ccap_string(m->get_caps()) + << " dirty " << ccap_string(m->get_dirty()) + << " on " << *in << dendl; + + + // missing/skipped snapflush? + // The client MAY send a snapflush if it is issued WR/EXCL caps, but + // presently only does so when it has actual dirty metadata. But, we + // set up the need_snapflush stuff based on the issued caps. + // We can infer that the client WONT send a FLUSHSNAP once they have + // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap + // update/release). + if (!head_in->client_need_snapflush.empty()) { + if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) { + _do_null_snapflush(head_in, client); + } else { + dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl; + } + } + + if (m->get_dirty() && in->is_auth()) { + dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty()) + << " seq " << m->get_seq() << " on " << *in << dendl; + ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(), + m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier()); + ack->set_client_tid(m->get_client_tid()); + ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + } + + // filter wanted based on what we could ever give out (given auth/replica status) + bool need_flush = m->flags & CLIENT_CAPS_SYNC; + int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever(); + if (new_wanted != cap->wanted()) { + if (!need_flush && (new_wanted & ~cap->pending())) { + // exapnding caps. make sure we aren't waiting for a log flush + need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending()); + } + + adjust_cap_wanted(cap, new_wanted, m->get_issue_seq()); + } + + if (in->is_auth() && + _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) { + // updated + eval(in, CEPH_CAP_LOCKS); + + if (!need_flush && (cap->wanted() & ~cap->pending())) + need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending()); + } else { + // no update, ack now. + if (ack) + mds->send_message_client_counted(ack, m->get_connection()); + + bool did_issue = eval(in, CEPH_CAP_LOCKS); + if (!did_issue && (cap->wanted() & ~cap->pending())) + issue_caps(in, cap); + + if (cap->get_last_seq() == 0 && + (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) { + cap->issue_norevoke(cap->issued()); + share_inode_max_size(in, cap); + } + } + + if (need_flush) + mds->mdlog->flush(); + } + + out: + m->put(); +} + + +class C_Locker_RetryRequestCapRelease : public LockerContext { + client_t client; + ceph_mds_request_release item; +public: + C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) : + LockerContext(l), client(c), item(it) { } + void finish(int r) override { + string dname; + MDRequestRef null_ref; + locker->process_request_cap_release(null_ref, client, item, dname); + } +}; + +void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item, + const string &dname) +{ + inodeno_t ino = (uint64_t)item.ino; + uint64_t cap_id = item.cap_id; + int caps = item.caps; + int wanted = item.wanted; + int seq = item.seq; + int issue_seq = item.issue_seq; + int mseq = item.mseq; + + CInode *in = mdcache->get_inode(ino); + if (!in) + return; + + if (dname.length()) { + frag_t fg = in->pick_dirfrag(dname); + CDir *dir = in->get_dirfrag(fg); + if (dir) { + CDentry *dn = dir->lookup(dname); + if (dn) { + ClientLease *l = dn->get_client_lease(client); + if (l) { + dout(10) << "process_cap_release removing lease on " << *dn << dendl; + dn->remove_client_lease(l, this); + } else { + dout(7) << "process_cap_release client." << client + << " doesn't have lease on " << *dn << dendl; + } + } else { + dout(7) << "process_cap_release client." << client << " released lease on dn " + << dir->dirfrag() << "/" << dname << " which dne" << dendl; + } + } + } + + Capability *cap = in->get_client_cap(client); + if (!cap) + return; + + dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in + << (mdr ? "" : " (DEFERRED, no mdr)") + << dendl; + + if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { + dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl; + return; + } + + if (cap->get_cap_id() != cap_id) { + dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl; + return; + } + + if (should_defer_client_cap_frozen(in)) { + dout(7) << " frozen, deferring" << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item)); + return; + } + + if (caps & ~cap->issued()) { + dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl; + caps &= cap->issued(); + } + cap->confirm_receipt(seq, caps); + + if (!in->client_need_snapflush.empty() && + (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) { + _do_null_snapflush(in, client); + } + + adjust_cap_wanted(cap, wanted, issue_seq); + + if (mdr) + cap->inc_suppress(); + eval(in, CEPH_CAP_LOCKS); + if (mdr) + cap->dec_suppress(); + + // take note; we may need to reissue on this cap later + if (mdr) + mdr->cap_releases[in->vino()] = cap->get_last_seq(); +} + +class C_Locker_RetryKickIssueCaps : public LockerContext { + CInode *in; + client_t client; + ceph_seq_t seq; +public: + C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) : + LockerContext(l), in(i), client(c), seq(s) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + locker->kick_issue_caps(in, client, seq); + in->put(CInode::PIN_PTRWAITER); + } +}; + +void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq) +{ + Capability *cap = in->get_client_cap(client); + if (!cap || cap->get_last_sent() != seq) + return; + if (in->is_frozen()) { + dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, + new C_Locker_RetryKickIssueCaps(this, in, client, seq)); + return; + } + dout(10) << "kick_issue_caps released at current seq " << seq + << ", reissuing" << dendl; + issue_caps(in, cap); +} + +void Locker::kick_cap_releases(MDRequestRef& mdr) +{ + client_t client = mdr->get_client(); + for (map::iterator p = mdr->cap_releases.begin(); + p != mdr->cap_releases.end(); + ++p) { + CInode *in = mdcache->get_inode(p->first); + if (!in) + continue; + kick_issue_caps(in, client, p->second); + } +} + +/** + * m and ack might be NULL, so don't dereference them unless dirty != 0 + */ +void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack) +{ + dout(10) << "_do_snap_update dirty " << ccap_string(dirty) + << " follows " << follows << " snap " << snap + << " on " << *in << dendl; + + if (snap == CEPH_NOSNAP) { + // hmm, i guess snap was already deleted? just ack! + dout(10) << " wow, the snap following " << follows + << " was already deleted. nothing to record, just ack." << dendl; + if (ack) + mds->send_message_client_counted(ack, m->get_connection()); + return; + } + + EUpdate *le = new EUpdate(mds->mdlog, "snap flush"); + mds->mdlog->start_entry(le); + MutationRef mut = new MutationImpl(); + mut->ls = mds->mdlog->get_current_segment(); + + // normal metadata updates that we can apply to the head as well. + + // update xattrs? + bool xattrs = false; + map *px = 0; + if ((dirty & CEPH_CAP_XATTR_EXCL) && + m->xattrbl.length() && + m->head.xattr_version > in->get_projected_inode()->xattr_version) + xattrs = true; + + old_inode_t *oi = 0; + if (in->is_multiversion()) { + oi = in->pick_old_inode(snap); + } + + inode_t *pi; + if (oi) { + dout(10) << " writing into old inode" << dendl; + pi = in->project_inode(); + pi->version = in->pre_dirty(); + if (snap > oi->first) + in->split_old_inode(snap); + pi = &oi->inode; + if (xattrs) + px = &oi->xattrs; + } else { + if (xattrs) + px = new map; + pi = in->project_inode(px); + pi->version = in->pre_dirty(); + } + + _update_cap_fields(in, dirty, m, pi); + + // xattr + if (px) { + dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version + << " len " << m->xattrbl.length() << dendl; + pi->xattr_version = m->head.xattr_version; + bufferlist::iterator p = m->xattrbl.begin(); + ::decode(*px, p); + } + + if (pi->client_ranges.count(client)) { + if (in->last == snap) { + dout(10) << " removing client_range entirely" << dendl; + pi->client_ranges.erase(client); + } else { + dout(10) << " client_range now follows " << snap << dendl; + pi->client_ranges[client].follows = snap; + } + } + + mut->auth_pin(in); + mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); + mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows); + + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (ack && ack->get_oldest_flush_tid() > 0) + le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()), + ack->get_oldest_flush_tid()); + + mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false, + client, ack)); +} + +void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi) +{ + if (dirty == 0) + return; + + /* m must be valid if there are dirty caps */ + assert(m); + uint64_t features = m->get_connection()->get_features(); + + if (m->get_ctime() > pi->ctime) { + dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime() + << " for " << *in << dendl; + pi->ctime = m->get_ctime(); + } + + if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) && + m->get_change_attr() > pi->change_attr) { + dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr() + << " for " << *in << dendl; + pi->change_attr = m->get_change_attr(); + } + + // file + if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { + utime_t atime = m->get_atime(); + utime_t mtime = m->get_mtime(); + uint64_t size = m->get_size(); + version_t inline_version = m->inline_version; + + if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) || + ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) { + dout(7) << " mtime " << pi->mtime << " -> " << mtime + << " for " << *in << dendl; + pi->mtime = mtime; + } + if (in->inode.is_file() && // ONLY if regular file + size > pi->size) { + dout(7) << " size " << pi->size << " -> " << size + << " for " << *in << dendl; + pi->size = size; + pi->rstat.rbytes = size; + } + if (in->inode.is_file() && + (dirty & CEPH_CAP_FILE_WR) && + inline_version > pi->inline_data.version) { + pi->inline_data.version = inline_version; + if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0) + pi->inline_data.get_data() = m->inline_data; + else + pi->inline_data.free_data(); + } + if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) { + dout(7) << " atime " << pi->atime << " -> " << atime + << " for " << *in << dendl; + pi->atime = atime; + } + if ((dirty & CEPH_CAP_FILE_EXCL) && + ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) { + dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq() + << " for " << *in << dendl; + pi->time_warp_seq = m->get_time_warp_seq(); + } + } + // auth + if (dirty & CEPH_CAP_AUTH_EXCL) { + if (m->head.uid != pi->uid) { + dout(7) << " uid " << pi->uid + << " -> " << m->head.uid + << " for " << *in << dendl; + pi->uid = m->head.uid; + } + if (m->head.gid != pi->gid) { + dout(7) << " gid " << pi->gid + << " -> " << m->head.gid + << " for " << *in << dendl; + pi->gid = m->head.gid; + } + if (m->head.mode != pi->mode) { + dout(7) << " mode " << oct << pi->mode + << " -> " << m->head.mode << dec + << " for " << *in << dendl; + pi->mode = m->head.mode; + } + if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) { + dout(7) << " btime " << oct << pi->btime + << " -> " << m->get_btime() << dec + << " for " << *in << dendl; + pi->btime = m->get_btime(); + } + } +} + +/* + * update inode based on cap flush|flushsnap|wanted. + * adjust max_size, if needed. + * if we update, return true; otherwise, false (no updated needed). + */ +bool Locker::_do_cap_update(CInode *in, Capability *cap, + int dirty, snapid_t follows, + MClientCaps *m, MClientCaps *ack, + bool *need_flush) +{ + dout(10) << "_do_cap_update dirty " << ccap_string(dirty) + << " issued " << ccap_string(cap ? cap->issued() : 0) + << " wanted " << ccap_string(cap ? cap->wanted() : 0) + << " on " << *in << dendl; + assert(in->is_auth()); + client_t client = m->get_source().num(); + inode_t *latest = in->get_projected_inode(); + + // increase or zero max_size? + uint64_t size = m->get_size(); + bool change_max = false; + uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0; + uint64_t new_max = old_max; + + if (in->is_file()) { + bool forced_change_max = false; + dout(20) << "inode is file" << dendl; + if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) { + dout(20) << "client has write caps; m->get_max_size=" + << m->get_max_size() << "; old_max=" << old_max << dendl; + if (m->get_max_size() > new_max) { + dout(10) << "client requests file_max " << m->get_max_size() + << " > max " << old_max << dendl; + change_max = true; + forced_change_max = true; + new_max = calc_new_max_size(latest, m->get_max_size()); + } else { + new_max = calc_new_max_size(latest, size); + + if (new_max > old_max) + change_max = true; + else + new_max = old_max; + } + } else { + if (old_max) { + change_max = true; + new_max = 0; + } + } + + if (in->last == CEPH_NOSNAP && + change_max && + !in->filelock.can_wrlock(client) && + !in->filelock.can_force_wrlock(client)) { + dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl; + if (in->filelock.is_stable()) { + bool need_issue = false; + if (cap) + cap->inc_suppress(); + if (in->mds_caps_wanted.empty() && + (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) { + if (in->filelock.get_state() != LOCK_EXCL) + file_excl(&in->filelock, &need_issue); + } else + simple_lock(&in->filelock, &need_issue); + if (need_issue) + issue_caps(in); + if (cap) + cap->dec_suppress(); + } + if (!in->filelock.can_wrlock(client) && + !in->filelock.can_force_wrlock(client)) { + C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in, + forced_change_max ? new_max : 0, + 0, utime_t()); + + in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms); + change_max = false; + } + } + } + + if (m->flockbl.length()) { + int32_t num_locks; + bufferlist::iterator bli = m->flockbl.begin(); + ::decode(num_locks, bli); + for ( int i=0; i < num_locks; ++i) { + ceph_filelock decoded_lock; + ::decode(decoded_lock, bli); + in->get_fcntl_lock_state()->held_locks. + insert(pair(decoded_lock.start, decoded_lock)); + ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; + } + ::decode(num_locks, bli); + for ( int i=0; i < num_locks; ++i) { + ceph_filelock decoded_lock; + ::decode(decoded_lock, bli); + in->get_flock_lock_state()->held_locks. + insert(pair(decoded_lock.start, decoded_lock)); + ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; + } + } + + if (!dirty && !change_max) + return false; + + Session *session = static_cast(m->get_connection()->get_priv()); + if (session->check_access(in, MAY_WRITE, + m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) { + session->put(); + dout(10) << "check_access failed, dropping cap update on " << *in << dendl; + return false; + } + session->put(); + + // do the update. + EUpdate *le = new EUpdate(mds->mdlog, "cap update"); + mds->mdlog->start_entry(le); + + // xattrs update? + map *px = 0; + if ((dirty & CEPH_CAP_XATTR_EXCL) && + m->xattrbl.length() && + m->head.xattr_version > in->get_projected_inode()->xattr_version) + px = new map; + + inode_t *pi = in->project_inode(px); + pi->version = in->pre_dirty(); + + MutationRef mut(new MutationImpl()); + mut->ls = mds->mdlog->get_current_segment(); + + _update_cap_fields(in, dirty, m, pi); + + if (change_max) { + dout(7) << " max_size " << old_max << " -> " << new_max + << " for " << *in << dendl; + if (new_max) { + pi->client_ranges[client].range.first = 0; + pi->client_ranges[client].range.last = new_max; + pi->client_ranges[client].follows = in->first - 1; + } else + pi->client_ranges.erase(client); + } + + if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) + wrlock_force(&in->filelock, mut); // wrlock for duration of journal + + // auth + if (dirty & CEPH_CAP_AUTH_EXCL) + wrlock_force(&in->authlock, mut); + + // xattr + if (px) { + dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl; + pi->xattr_version = m->head.xattr_version; + bufferlist::iterator p = m->xattrbl.begin(); + ::decode(*px, p); + + wrlock_force(&in->xattrlock, mut); + } + + mut->auth_pin(in); + mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); + mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows); + + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (ack && ack->get_oldest_flush_tid() > 0) + le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()), + ack->get_oldest_flush_tid()); + + mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, + change_max, !!cap, + client, ack)); + if (need_flush && !*need_flush && + ((change_max && new_max) || // max INCREASE + _need_flush_mdlog(in, dirty))) + *need_flush = true; + + return true; +} + +/* This function DOES put the passed message before returning */ +void Locker::handle_client_cap_release(MClientCapRelease *m) +{ + client_t client = m->get_source().num(); + dout(10) << "handle_client_cap_release " << *m << dendl; + + if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) { + mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); + return; + } + + if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) { + // Pause RADOS operations until we see the required epoch + mds->objecter->set_epoch_barrier(m->osd_epoch_barrier); + } + + if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) { + // Record the barrier so that we will retransmit it to clients + mds->set_osd_epoch_barrier(m->osd_epoch_barrier); + } + + Session *session = static_cast(m->get_connection()->get_priv()); + + for (vector::iterator p = m->caps.begin(); p != m->caps.end(); ++p) { + _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq); + } + + if (session) { + session->notify_cap_release(m->caps.size()); + } + + m->put(); +} + +class C_Locker_RetryCapRelease : public LockerContext { + client_t client; + inodeno_t ino; + uint64_t cap_id; + ceph_seq_t migrate_seq; + ceph_seq_t issue_seq; +public: + C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id, + ceph_seq_t mseq, ceph_seq_t seq) : + LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {} + void finish(int r) override { + locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq); + } +}; + +void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, + ceph_seq_t mseq, ceph_seq_t seq) +{ + CInode *in = mdcache->get_inode(ino); + if (!in) { + dout(7) << "_do_cap_release missing ino " << ino << dendl; + return; + } + Capability *cap = in->get_client_cap(client); + if (!cap) { + dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl; + return; + } + + dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl; + if (cap->get_cap_id() != cap_id) { + dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl; + return; + } + if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { + dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl; + return; + } + if (should_defer_client_cap_frozen(in)) { + dout(7) << " freezing|frozen, deferring" << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, + new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq)); + return; + } + if (seq != cap->get_last_issue()) { + dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl; + // clean out any old revoke history + cap->clean_revoke_from(seq); + eval_cap_gather(in); + return; + } + remove_client_cap(in, client); +} + +/* This function DOES put the passed message before returning */ + +void Locker::remove_client_cap(CInode *in, client_t client) +{ + // clean out any pending snapflush state + if (!in->client_need_snapflush.empty()) + _do_null_snapflush(in, client); + + in->remove_client_cap(client); + + if (in->is_auth()) { + // make sure we clear out the client byte range + if (in->get_projected_inode()->client_ranges.count(client) && + !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray + check_inode_max_size(in); + } else { + request_inode_file_caps(in); + } + + try_eval(in, CEPH_CAP_LOCKS); +} + + +/** + * Return true if any currently revoking caps exceed the + * mds_revoke_cap_timeout threshold. + */ +bool Locker::any_late_revoking_caps(xlist const &revoking) const +{ + xlist::const_iterator p = revoking.begin(); + if (p.end()) { + // No revoking caps at the moment + return false; + } else { + utime_t now = ceph_clock_now(); + utime_t age = now - (*p)->get_last_revoke_stamp(); + if (age <= g_conf->mds_revoke_cap_timeout) { + return false; + } else { + return true; + } + } +} + + +void Locker::get_late_revoking_clients(std::list *result) const +{ + if (!any_late_revoking_caps(revoking_caps)) { + // Fast path: no misbehaving clients, execute in O(1) + return; + } + + // Slow path: execute in O(N_clients) + std::map >::const_iterator client_rc_iter; + for (client_rc_iter = revoking_caps_by_client.begin(); + client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) { + xlist const &client_rc = client_rc_iter->second; + bool any_late = any_late_revoking_caps(client_rc); + if (any_late) { + result->push_back(client_rc_iter->first); + } + } +} + +// Hard-code instead of surfacing a config settings because this is +// really a hack that should go away at some point when we have better +// inspection tools for getting at detailed cap state (#7316) +#define MAX_WARN_CAPS 100 + +void Locker::caps_tick() +{ + utime_t now = ceph_clock_now(); + + dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl; + + int i = 0; + for (xlist::iterator p = revoking_caps.begin(); !p.end(); ++p) { + Capability *cap = *p; + + utime_t age = now - cap->get_last_revoke_stamp(); + dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl; + if (age <= g_conf->mds_revoke_cap_timeout) { + dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl; + break; + } else { + ++i; + if (i > MAX_WARN_CAPS) { + dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late" + << "revoking, ignoring subsequent caps" << dendl; + break; + } + } + // exponential backoff of warning intervals + if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) { + cap->inc_num_revoke_warnings(); + stringstream ss; + ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino " + << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending()) + << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago"; + mds->clog->warn() << ss.str(); + dout(20) << __func__ << " " << ss.str() << dendl; + } else { + dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl; + } + } +} + + +void Locker::handle_client_lease(MClientLease *m) +{ + dout(10) << "handle_client_lease " << *m << dendl; + + assert(m->get_source().is_client()); + client_t client = m->get_source().num(); + + CInode *in = mdcache->get_inode(m->get_ino(), m->get_last()); + if (!in) { + dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl; + m->put(); + return; + } + CDentry *dn = 0; + + frag_t fg = in->pick_dirfrag(m->dname); + CDir *dir = in->get_dirfrag(fg); + if (dir) + dn = dir->lookup(m->dname); + if (!dn) { + dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl; + m->put(); + return; + } + dout(10) << " on " << *dn << dendl; + + // replica and lock + ClientLease *l = dn->get_client_lease(client); + if (!l) { + dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl; + m->put(); + return; + } + + switch (m->get_action()) { + case CEPH_MDS_LEASE_REVOKE_ACK: + case CEPH_MDS_LEASE_RELEASE: + if (l->seq != m->get_seq()) { + dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl; + } else { + dout(7) << "handle_client_lease client." << client + << " on " << *dn << dendl; + dn->remove_client_lease(l, this); + } + m->put(); + break; + + case CEPH_MDS_LEASE_RENEW: + { + dout(7) << "handle_client_lease client." << client << " renew on " << *dn + << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl; + if (dn->lock.can_lease(client)) { + int pool = 1; // fixme.. do something smart! + m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]); + m->h.seq = ++l->seq; + m->clear_payload(); + + utime_t now = ceph_clock_now(); + now += mdcache->client_lease_durations[pool]; + mdcache->touch_client_lease(l, pool, now); + + mds->send_message_client_counted(m, m->get_connection()); + } + } + break; + + default: + ceph_abort(); // implement me + break; + } +} + + +void Locker::issue_client_lease(CDentry *dn, client_t client, + bufferlist &bl, utime_t now, Session *session) +{ + CInode *diri = dn->get_dir()->get_inode(); + if (!diri->is_stray() && // do not issue dn leases in stray dir! + ((!diri->filelock.can_lease(client) && + (diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) && + dn->lock.can_lease(client)) { + int pool = 1; // fixme.. do something smart! + // issue a dentry lease + ClientLease *l = dn->add_client_lease(client, session); + session->touch_lease(l); + + now += mdcache->client_lease_durations[pool]; + mdcache->touch_client_lease(l, pool, now); + + LeaseStat e; + e.mask = 1 | CEPH_LOCK_DN; // old and new bit values + e.seq = ++l->seq; + e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]); + ::encode(e, bl); + dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms " + << " on " << *dn << dendl; + } else { + // null lease + LeaseStat e; + e.mask = 0; + e.seq = 0; + e.duration_ms = 0; + ::encode(e, bl); + dout(20) << "issue_client_lease no/null lease on " << *dn << dendl; + } +} + + +void Locker::revoke_client_leases(SimpleLock *lock) +{ + int n = 0; + CDentry *dn = static_cast(lock->get_parent()); + for (map::iterator p = dn->client_lease_map.begin(); + p != dn->client_lease_map.end(); + ++p) { + ClientLease *l = p->second; + + n++; + assert(lock->get_type() == CEPH_LOCK_DN); + + CDentry *dn = static_cast(lock->get_parent()); + int mask = 1 | CEPH_LOCK_DN; // old and new bits + + // i should also revoke the dir ICONTENT lease, if they have it! + CInode *diri = dn->get_dir()->get_inode(); + mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq, + mask, + diri->ino(), + diri->first, CEPH_NOSNAP, + dn->get_name()), + l->client); + } + assert(n == lock->get_num_client_lease()); +} + + + +// locks ---------------------------------------------------------------- + +SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info) +{ + switch (lock_type) { + case CEPH_LOCK_DN: + { + // be careful; info.dirfrag may have incorrect frag; recalculate based on dname. + CInode *diri = mdcache->get_inode(info.dirfrag.ino); + frag_t fg; + CDir *dir = 0; + CDentry *dn = 0; + if (diri) { + fg = diri->pick_dirfrag(info.dname); + dir = diri->get_dirfrag(fg); + if (dir) + dn = dir->lookup(info.dname, info.snapid); + } + if (!dn) { + dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl; + return 0; + } + return &dn->lock; + } + + case CEPH_LOCK_IAUTH: + case CEPH_LOCK_ILINK: + case CEPH_LOCK_IDFT: + case CEPH_LOCK_IFILE: + case CEPH_LOCK_INEST: + case CEPH_LOCK_IXATTR: + case CEPH_LOCK_ISNAP: + case CEPH_LOCK_IFLOCK: + case CEPH_LOCK_IPOLICY: + { + CInode *in = mdcache->get_inode(info.ino, info.snapid); + if (!in) { + dout(7) << "get_lock don't have ino " << info.ino << dendl; + return 0; + } + switch (lock_type) { + case CEPH_LOCK_IAUTH: return &in->authlock; + case CEPH_LOCK_ILINK: return &in->linklock; + case CEPH_LOCK_IDFT: return &in->dirfragtreelock; + case CEPH_LOCK_IFILE: return &in->filelock; + case CEPH_LOCK_INEST: return &in->nestlock; + case CEPH_LOCK_IXATTR: return &in->xattrlock; + case CEPH_LOCK_ISNAP: return &in->snaplock; + case CEPH_LOCK_IFLOCK: return &in->flocklock; + case CEPH_LOCK_IPOLICY: return &in->policylock; + } + } + + default: + dout(7) << "get_lock don't know lock_type " << lock_type << dendl; + ceph_abort(); + break; + } + + return 0; +} + +/* This function DOES put the passed message before returning */ +void Locker::handle_lock(MLock *m) +{ + // nobody should be talking to us during recovery. + assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping()); + + SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info()); + if (!lock) { + dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl; + m->put(); + return; + } + + switch (lock->get_type()) { + case CEPH_LOCK_DN: + case CEPH_LOCK_IAUTH: + case CEPH_LOCK_ILINK: + case CEPH_LOCK_ISNAP: + case CEPH_LOCK_IXATTR: + case CEPH_LOCK_IFLOCK: + case CEPH_LOCK_IPOLICY: + handle_simple_lock(lock, m); + break; + + case CEPH_LOCK_IDFT: + case CEPH_LOCK_INEST: + //handle_scatter_lock((ScatterLock*)lock, m); + //break; + + case CEPH_LOCK_IFILE: + handle_file_lock(static_cast(lock), m); + break; + + default: + dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl; + ceph_abort(); + break; + } +} + + + + + +// ========================================================================== +// simple lock + +/** This function may take a reference to m if it needs one, but does + * not put references. */ +void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m) +{ + MDSCacheObject *parent = lock->get_parent(); + if (parent->is_auth() && + lock->get_state() != LOCK_SYNC && + !parent->is_frozen()) { + dout(7) << "handle_reqrdlock got rdlock request on " << *lock + << " on " << *parent << dendl; + assert(parent->is_auth()); // replica auth pinned if they're doing this! + if (lock->is_stable()) { + simple_sync(lock); + } else { + dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl; + lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE, + new C_MDS_RetryMessage(mds, m->get())); + } + } else { + dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock + << " on " << *parent << dendl; + // replica should retry + } +} + +/* This function DOES put the passed message before returning */ +void Locker::handle_simple_lock(SimpleLock *lock, MLock *m) +{ + int from = m->get_asker(); + + dout(10) << "handle_simple_lock " << *m + << " on " << *lock << " " << *lock->get_parent() << dendl; + + if (mds->is_rejoin()) { + if (lock->get_parent()->is_rejoining()) { + dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent() + << ", dropping " << *m << dendl; + m->put(); + return; + } + } + + switch (m->get_action()) { + // -- replica -- + case LOCK_AC_SYNC: + assert(lock->get_state() == LOCK_LOCK); + lock->decode_locked_state(m->get_data()); + lock->set_state(LOCK_SYNC); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + break; + + case LOCK_AC_LOCK: + assert(lock->get_state() == LOCK_SYNC); + lock->set_state(LOCK_SYNC_LOCK); + if (lock->is_leased()) + revoke_client_leases(lock); + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) + mds->mdlog->flush(); + break; + + + // -- auth -- + case LOCK_AC_LOCKACK: + assert(lock->get_state() == LOCK_SYNC_LOCK || + lock->get_state() == LOCK_SYNC_EXCL); + assert(lock->is_gathering(from)); + lock->remove_gather(from); + + if (lock->is_gathering()) { + dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + case LOCK_AC_REQRDLOCK: + handle_reqrdlock(lock, m); + break; + + } + + m->put(); +} + +/* unused, currently. + +class C_Locker_SimpleEval : public Context { + Locker *locker; + SimpleLock *lock; +public: + C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {} + void finish(int r) { + locker->try_simple_eval(lock); + } +}; + +void Locker::try_simple_eval(SimpleLock *lock) +{ + // unstable and ambiguous auth? + if (!lock->is_stable() && + lock->get_parent()->is_ambiguous_auth()) { + dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl; + //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock)); + return; + } + + if (!lock->get_parent()->is_auth()) { + dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl; + return; + } + + if (!lock->get_parent()->can_auth_pin()) { + dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl; + //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock)); + return; + } + + if (lock->is_stable()) + simple_eval(lock); +} +*/ + + +void Locker::simple_eval(SimpleLock *lock, bool *need_issue) +{ + dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl; + + assert(lock->get_parent()->is_auth()); + assert(lock->is_stable()); + + if (lock->get_parent()->is_freezing_or_frozen()) { + // dentry lock in unreadable state can block path traverse + if ((lock->get_type() != CEPH_LOCK_DN || + lock->get_state() == LOCK_SYNC || + lock->get_parent()->is_frozen())) + return; + } + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } + return; + } + + CInode *in = 0; + int wanted = 0; + if (lock->get_type() != CEPH_LOCK_DN) { + in = static_cast(lock->get_parent()); + in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift()); + } + + // -> excl? + if (lock->get_state() != LOCK_EXCL && + in && in->get_target_loner() >= 0 && + (wanted & CEPH_CAP_GEXCL)) { + dout(7) << "simple_eval stable, going to excl " << *lock + << " on " << *lock->get_parent() << dendl; + simple_excl(lock, need_issue); + } + + // stable -> sync? + else if (lock->get_state() != LOCK_SYNC && + !lock->is_wrlocked() && + ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) || + (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) { + dout(7) << "simple_eval stable, syncing " << *lock + << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } +} + + +// mid + +bool Locker::simple_sync(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); + assert(lock->is_stable()); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast(lock->get_parent()); + + int old_state = lock->get_state(); + + if (old_state != LOCK_TSYN) { + + switch (lock->get_state()) { + case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break; + case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break; + case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_wrlocked()) + gather++; + + if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) { + send_lock_message(lock, LOCK_AC_SYNC); + lock->init_gather(); + gather++; + } + + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + } + + bool need_recover = false; + if (lock->get_type() == CEPH_LOCK_IFILE) { + assert(in); + if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + } + + if (!gather && lock->is_dirty()) { + lock->get_parent()->auth_pin(lock); + scatter_writebehind(static_cast(lock)); + mds->mdlog->flush(); + return false; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + return false; + } + } + + if (lock->get_parent()->is_replicated()) { // FIXME + bufferlist data; + lock->encode_locked_state(data); + send_lock_message(lock, LOCK_AC_SYNC, data); + } + lock->set_state(LOCK_SYNC); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + if (in && in->is_head()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + return true; +} + +void Locker::simple_excl(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); + assert(lock->is_stable()); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast(lock->get_parent()); + + switch (lock->get_state()) { + case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break; + case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_rdlocked()) + gather++; + if (lock->is_wrlocked()) + gather++; + + if (lock->get_parent()->is_replicated() && + lock->get_state() != LOCK_LOCK_EXCL && + lock->get_state() != LOCK_XSYN_EXCL) { + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + gather++; + } + + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + } else { + lock->set_state(LOCK_EXCL); + lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE); + if (in) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } +} + +void Locker::simple_lock(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); + assert(lock->is_stable()); + assert(lock->get_state() != LOCK_LOCK); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast(lock->get_parent()); + + int old_state = lock->get_state(); + + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break; + case LOCK_XSYN: + file_excl(static_cast(lock), need_issue); + if (lock->get_state() != LOCK_EXCL) + return; + // fall-thru + case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); + (static_cast(lock))->clear_unscatter_wanted(); + break; + case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_leased()) { + gather++; + revoke_client_leases(lock); + } + if (lock->is_rdlocked()) + gather++; + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + } + + bool need_recover = false; + if (lock->get_type() == CEPH_LOCK_IFILE) { + assert(in); + if(in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + } + + if (lock->get_parent()->is_replicated() && + lock->get_state() == LOCK_MIX_LOCK && + gather) { + dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl; + } else { + // move to second stage of gather now, so we don't send the lock action later. + if (lock->get_state() == LOCK_MIX_LOCK) + lock->set_state(LOCK_MIX_LOCK2); + + if (lock->get_parent()->is_replicated() && + lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK + gather++; + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + } + } + + if (!gather && lock->is_dirty()) { + lock->get_parent()->auth_pin(lock); + scatter_writebehind(static_cast(lock)); + mds->mdlog->flush(); + return; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + } else { + lock->set_state(LOCK_LOCK); + lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE); + } +} + + +void Locker::simple_xlock(SimpleLock *lock) +{ + dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); + //assert(lock->is_stable()); + assert(lock->get_state() != LOCK_XLOCK); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast(lock->get_parent()); + + if (lock->is_stable()) + lock->get_parent()->auth_pin(lock); + + switch (lock->get_state()) { + case LOCK_LOCK: + case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_rdlocked()) + gather++; + if (lock->is_wrlocked()) + gather++; + + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + issue_caps(in); + gather++; + } + } + + if (!gather) { + lock->set_state(LOCK_PREXLOCK); + //assert("shouldn't be called if we are already xlockable" == 0); + } +} + + + + + +// ========================================================================== +// scatter lock + +/* + +Some notes on scatterlocks. + + - The scatter/gather is driven by the inode lock. The scatter always + brings in the latest metadata from the fragments. + + - When in a scattered/MIX state, fragments are only allowed to + update/be written to if the accounted stat matches the inode's + current version. + + - That means, on gather, we _only_ assimilate diffs for frag metadata + that match the current version, because those are the only ones + written during this scatter/gather cycle. (Others didn't permit + it.) We increment the version and journal this to disk. + + - When possible, we also simultaneously update our local frag + accounted stats to match. + + - On scatter, the new inode info is broadcast to frags, both local + and remote. If possible (auth and !frozen), the dirfrag auth + should update the accounted state (if it isn't already up to date). + Note that this may occur on both the local inode auth node and + inode replicas, so there are two potential paths. If it is NOT + possible, they need to mark_stale to prevent any possible writes. + + - A scatter can be to MIX (potentially writeable) or to SYNC (read + only). Both are opportunities to update the frag accounted stats, + even though only the MIX case is affected by a stale dirfrag. + + - Because many scatter/gather cycles can potentially go by without a + frag being able to update its accounted stats (due to being frozen + by exports/refragments in progress), the frag may have (even very) + old stat versions. That's fine. If when we do want to update it, + we can update accounted_* and the version first. + +*/ + +class C_Locker_ScatterWB : public LockerLogContext { + ScatterLock *lock; + MutationRef mut; +public: + C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) : + LockerLogContext(l), lock(sl), mut(m) {} + void finish(int r) override { + locker->scatter_writebehind_finish(lock, mut); + } +}; + +void Locker::scatter_writebehind(ScatterLock *lock) +{ + CInode *in = static_cast(lock->get_parent()); + dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl; + + // journal + MutationRef mut(new MutationImpl()); + mut->ls = mds->mdlog->get_current_segment(); + + // forcefully take a wrlock + lock->get_wrlock(true); + mut->wrlocks.insert(lock); + mut->locks.insert(lock); + + in->pre_cow_old_inode(); // avoid cow mayhem + + inode_t *pi = in->project_inode(); + pi->version = in->pre_dirty(); + + in->finish_scatter_gather_update(lock->get_type()); + lock->start_flush(); + + EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind"); + mds->mdlog->start_entry(le); + + mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY); + mdcache->journal_dirty_inode(mut.get(), &le->metablob, in); + + in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob); + + mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut)); +} + +void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut) +{ + CInode *in = static_cast(lock->get_parent()); + dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl; + in->pop_and_dirty_projected_inode(mut->ls); + + lock->finish_flush(); + + // if replicas may have flushed in a mix->lock state, send another + // message so they can finish_flush(). + if (in->is_replicated()) { + switch (lock->get_state()) { + case LOCK_MIX_LOCK: + case LOCK_MIX_LOCK2: + case LOCK_MIX_EXCL: + case LOCK_MIX_TSYN: + send_lock_message(lock, LOCK_AC_LOCKFLUSHED); + } + } + + mut->apply(); + drop_locks(mut.get()); + mut->cleanup(); + + if (lock->is_stable()) + lock->finish_waiters(ScatterLock::WAIT_STABLE); + + //scatter_eval_gather(lock); +} + +void Locker::scatter_eval(ScatterLock *lock, bool *need_issue) +{ + dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl; + + assert(lock->get_parent()->is_auth()); + assert(lock->is_stable()); + + if (lock->get_parent()->is_freezing_or_frozen()) { + dout(20) << " freezing|frozen" << dendl; + return; + } + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } + return; + } + + if (!lock->is_rdlocked() && + lock->get_state() != LOCK_MIX && + lock->get_scatter_wanted()) { + dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock + << " on " << *lock->get_parent() << dendl; + scatter_mix(lock, need_issue); + return; + } + + if (lock->get_type() == CEPH_LOCK_INEST) { + // in general, we want to keep INEST writable at all times. + if (!lock->is_rdlocked()) { + if (lock->get_parent()->is_replicated()) { + if (lock->get_state() != LOCK_MIX) + scatter_mix(lock, need_issue); + } else { + if (lock->get_state() != LOCK_LOCK) + simple_lock(lock, need_issue); + } + } + return; + } + + CInode *in = static_cast(lock->get_parent()); + if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) { + // i _should_ be sync. + if (!lock->is_wrlocked() && + lock->get_state() != LOCK_SYNC) { + dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl; + simple_sync(lock, need_issue); + } + } +} + + +/* + * mark a scatterlock to indicate that the dir fnode has some dirty data + */ +void Locker::mark_updated_scatterlock(ScatterLock *lock) +{ + lock->mark_dirty(); + if (lock->get_updated_item()->is_on_list()) { + dout(10) << "mark_updated_scatterlock " << *lock + << " - already on list since " << lock->get_update_stamp() << dendl; + } else { + updated_scatterlocks.push_back(lock->get_updated_item()); + utime_t now = ceph_clock_now(); + lock->set_update_stamp(now); + dout(10) << "mark_updated_scatterlock " << *lock + << " - added at " << now << dendl; + } +} + +/* + * this is called by scatter_tick and LogSegment::try_to_trim() when + * trying to flush dirty scattered data (i.e. updated fnode) back to + * the inode. + * + * we need to lock|scatter in order to push fnode changes into the + * inode.dirstat. + */ +void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange) +{ + CInode *p = static_cast(lock->get_parent()); + + if (p->is_frozen() || p->is_freezing()) { + dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl; + if (c) + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c); + else + // just requeue. not ideal.. starvation prone.. + updated_scatterlocks.push_back(lock->get_updated_item()); + return; + } + + if (p->is_ambiguous_auth()) { + dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl; + if (c) + p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c); + else + // just requeue. not ideal.. starvation prone.. + updated_scatterlocks.push_back(lock->get_updated_item()); + return; + } + + if (p->is_auth()) { + int count = 0; + while (true) { + if (lock->is_stable()) { + // can we do it now? + // (only if we're not replicated.. if we are, we really do need + // to nudge the lock state!) + /* + actually, even if we're not replicated, we can't stay in MIX, because another mds + could discover and replicate us at any time. if that happens while we're flushing, + they end up in MIX but their inode has the old scatterstat version. + + if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) { + dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl; + scatter_writebehind(lock); + if (c) + lock->add_waiter(SimpleLock::WAIT_STABLE, c); + return; + } + */ + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl; + simple_sync(static_cast(lock)); + } + break; + } + + // adjust lock state + dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl; + switch (lock->get_type()) { + case CEPH_LOCK_IFILE: + if (p->is_replicated() && lock->get_state() != LOCK_MIX) + scatter_mix(static_cast(lock)); + else if (lock->get_state() != LOCK_LOCK) + simple_lock(static_cast(lock)); + else + simple_sync(static_cast(lock)); + break; + + case CEPH_LOCK_IDFT: + case CEPH_LOCK_INEST: + if (p->is_replicated() && lock->get_state() != LOCK_MIX) + scatter_mix(lock); + else if (lock->get_state() != LOCK_LOCK) + simple_lock(lock); + else + simple_sync(lock); + break; + default: + ceph_abort(); + } + ++count; + if (lock->is_stable() && count == 2) { + dout(10) << "scatter_nudge oh, stable after two cycles." << dendl; + // this should only realy happen when called via + // handle_file_lock due to AC_NUDGE, because the rest of the + // time we are replicated or have dirty data and won't get + // called. bailing here avoids an infinite loop. + assert(!c); + break; + } + } else { + dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl; + if (c) + lock->add_waiter(SimpleLock::WAIT_STABLE, c); + return; + } + } + } else { + dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " + << *lock << " on " << *p << dendl; + // request unscatter? + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || + mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) + mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth); + + // wait... + if (c) + lock->add_waiter(SimpleLock::WAIT_STABLE, c); + + // also, requeue, in case we had wrong auth or something + updated_scatterlocks.push_back(lock->get_updated_item()); + } +} + +void Locker::scatter_tick() +{ + dout(10) << "scatter_tick" << dendl; + + // updated + utime_t now = ceph_clock_now(); + int n = updated_scatterlocks.size(); + while (!updated_scatterlocks.empty()) { + ScatterLock *lock = updated_scatterlocks.front(); + + if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping + + if (!lock->is_dirty()) { + updated_scatterlocks.pop_front(); + dout(10) << " removing from updated_scatterlocks " + << *lock << " " << *lock->get_parent() << dendl; + continue; + } + if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval) + break; + updated_scatterlocks.pop_front(); + scatter_nudge(lock, 0); + } + mds->mdlog->flush(); +} + + +void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue) +{ + dout(10) << "scatter_tempsync " << *lock + << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); + assert(lock->is_stable()); + + assert(0 == "not fully implemented, at least not for filelock"); + + CInode *in = static_cast(lock->get_parent()); + + switch (lock->get_state()) { + case LOCK_SYNC: ceph_abort(); // this shouldn't happen + case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_wrlocked()) + gather++; + + if (lock->get_cap_shift() && + in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + + if (lock->get_state() == LOCK_MIX_TSYN && + in->is_replicated()) { + lock->init_gather(); + send_lock_message(lock, LOCK_AC_LOCK); + gather++; + } + + if (gather) { + in->auth_pin(lock); + } else { + // do tempsync + lock->set_state(LOCK_TSYN); + lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE); + if (lock->get_cap_shift()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } +} + + + +// ========================================================================== +// local lock + +void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut) +{ + dout(7) << "local_wrlock_grab on " << *lock + << " on " << *lock->get_parent() << dendl; + + assert(lock->get_parent()->is_auth()); + assert(lock->can_wrlock()); + assert(!mut->wrlocks.count(lock)); + lock->get_wrlock(mut->get_client()); + mut->wrlocks.insert(lock); + mut->locks.insert(lock); +} + +bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut) +{ + dout(7) << "local_wrlock_start on " << *lock + << " on " << *lock->get_parent() << dendl; + + assert(lock->get_parent()->is_auth()); + if (lock->can_wrlock()) { + assert(!mut->wrlocks.count(lock)); + lock->get_wrlock(mut->get_client()); + mut->wrlocks.insert(lock); + mut->locks.insert(lock); + return true; + } else { + lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + return false; + } +} + +void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut) +{ + dout(7) << "local_wrlock_finish on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->put_wrlock(); + mut->wrlocks.erase(lock); + mut->locks.erase(lock); + if (lock->get_num_wrlocks() == 0) { + lock->finish_waiters(SimpleLock::WAIT_STABLE | + SimpleLock::WAIT_WR | + SimpleLock::WAIT_RD); + } +} + +bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut) +{ + dout(7) << "local_xlock_start on " << *lock + << " on " << *lock->get_parent() << dendl; + + assert(lock->get_parent()->is_auth()); + if (!lock->can_xlock_local()) { + lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + return false; + } + + lock->get_xlock(mut, mut->get_client()); + mut->xlocks.insert(lock); + mut->locks.insert(lock); + return true; +} + +void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut) +{ + dout(7) << "local_xlock_finish on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->put_xlock(); + mut->xlocks.erase(lock); + mut->locks.erase(lock); + + lock->finish_waiters(SimpleLock::WAIT_STABLE | + SimpleLock::WAIT_WR | + SimpleLock::WAIT_RD); +} + + + +// ========================================================================== +// file lock + + +void Locker::file_eval(ScatterLock *lock, bool *need_issue) +{ + CInode *in = static_cast(lock->get_parent()); + int loner_wanted, other_wanted; + int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE); + dout(7) << "file_eval wanted=" << gcap_string(wanted) + << " loner_wanted=" << gcap_string(loner_wanted) + << " other_wanted=" << gcap_string(other_wanted) + << " filelock=" << *lock << " on " << *lock->get_parent() + << dendl; + + assert(lock->get_parent()->is_auth()); + assert(lock->is_stable()); + + if (lock->get_parent()->is_freezing_or_frozen()) + return; + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } + return; + } + + // excl -> *? + if (lock->get_state() == LOCK_EXCL) { + dout(20) << " is excl" << dendl; + int loner_issued, other_issued, xlocker_issued; + in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE); + dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued) + << " other_issued=" << gcap_string(other_issued) + << " xlocker_issued=" << gcap_string(xlocker_issued) + << dendl; + if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) || + (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) || + (in->inode.is_dir() && in->multiple_nonstale_caps())) { // FIXME.. :/ + dout(20) << " should lose it" << dendl; + // we should lose it. + // loner other want + // R R SYNC + // R R|W MIX + // R W MIX + // R|W R MIX + // R|W R|W MIX + // R|W W MIX + // W R MIX + // W R|W MIX + // W W MIX + // -> any writer means MIX; RD doesn't matter. + if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) || + lock->is_waiter_for(SimpleLock::WAIT_WR)) + scatter_mix(lock, need_issue); + else if (!lock->is_wrlocked()) // let excl wrlocks drain first + simple_sync(lock, need_issue); + else + dout(10) << " waiting for wrlock to drain" << dendl; + } + } + + // * -> excl? + else if (lock->get_state() != LOCK_EXCL && + !lock->is_rdlocked() && + //!lock->is_waiter_for(SimpleLock::WAIT_WR) && + ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) || + (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) && + in->get_target_loner() >= 0) { + dout(7) << "file_eval stable, bump to loner " << *lock + << " on " << *lock->get_parent() << dendl; + file_excl(lock, need_issue); + } + + // * -> mixed? + else if (lock->get_state() != LOCK_MIX && + !lock->is_rdlocked() && + //!lock->is_waiter_for(SimpleLock::WAIT_WR) && + (lock->get_scatter_wanted() || + (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) { + dout(7) << "file_eval stable, bump to mixed " << *lock + << " on " << *lock->get_parent() << dendl; + scatter_mix(lock, need_issue); + } + + // * -> sync? + else if (lock->get_state() != LOCK_SYNC && + !lock->is_wrlocked() && // drain wrlocks first! + !lock->is_waiter_for(SimpleLock::WAIT_WR) && + !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) && + !((lock->get_state() == LOCK_MIX) && + in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are + //((wanted & CEPH_CAP_RD) || + //in->is_replicated() || + //lock->get_num_client_lease() || + //(!loner && lock->get_state() == LOCK_EXCL)) && + ) { + dout(7) << "file_eval stable, bump to sync " << *lock + << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } +} + + + +void Locker::scatter_mix(ScatterLock *lock, bool *need_issue) +{ + dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl; + + CInode *in = static_cast(lock->get_parent()); + assert(in->is_auth()); + assert(lock->is_stable()); + + if (lock->get_state() == LOCK_LOCK) { + in->start_scatter(lock); + if (in->is_replicated()) { + // data + bufferlist softdata; + lock->encode_locked_state(softdata); + + // bcast to replicas + send_lock_message(lock, LOCK_AC_MIX, softdata); + } + + // change lock + lock->set_state(LOCK_MIX); + lock->clear_scatter_wanted(); + if (lock->get_cap_shift()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } else { + // gather? + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break; + case LOCK_XSYN: + file_excl(lock, need_issue); + if (lock->get_state() != LOCK_EXCL) + return; + // fall-thru + case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break; + case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_rdlocked()) + gather++; + if (in->is_replicated()) { + if (lock->get_state() != LOCK_EXCL_MIX && // EXCL replica is already LOCK + lock->get_state() != LOCK_XSYN_EXCL) { // XSYN replica is already LOCK; ** FIXME here too! + send_lock_message(lock, LOCK_AC_MIX); + lock->init_gather(); + gather++; + } + } + if (lock->is_leased()) { + revoke_client_leases(lock); + gather++; + } + if (lock->get_cap_shift() && + in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + bool need_recover = false; + if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + } else { + in->start_scatter(lock); + lock->set_state(LOCK_MIX); + lock->clear_scatter_wanted(); + if (in->is_replicated()) { + bufferlist softdata; + lock->encode_locked_state(softdata); + send_lock_message(lock, LOCK_AC_MIX, softdata); + } + if (lock->get_cap_shift()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } + } +} + + +void Locker::file_excl(ScatterLock *lock, bool *need_issue) +{ + CInode *in = static_cast(lock->get_parent()); + dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl; + + assert(in->is_auth()); + assert(lock->is_stable()); + + assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) || + (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> + + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break; + case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break; + default: ceph_abort(); + } + int gather = 0; + + if (lock->is_rdlocked()) + gather++; + if (lock->is_wrlocked()) + gather++; + + if (in->is_replicated() && + lock->get_state() != LOCK_LOCK_EXCL && + lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock. + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + gather++; + } + if (lock->is_leased()) { + revoke_client_leases(lock); + gather++; + } + if (in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + bool need_recover = false; + if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + } else { + lock->set_state(LOCK_EXCL); + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } +} + +void Locker::file_xsyn(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl; + CInode *in = static_cast(lock->get_parent()); + assert(in->is_auth()); + assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty()); + + switch (lock->get_state()) { + case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_wrlocked()) + gather++; + + if (in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + } else { + lock->set_state(LOCK_XSYN); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } +} + +void Locker::file_recover(ScatterLock *lock) +{ + CInode *in = static_cast(lock->get_parent()); + dout(7) << "file_recover " << *lock << " on " << *in << dendl; + + assert(in->is_auth()); + //assert(lock->is_stable()); + assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover() + + int gather = 0; + + /* + if (in->is_replicated() + lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) { + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + gather++; + } + */ + if (in->is_head() && + in->issued_caps_need_gather(lock)) { + issue_caps(in); + gather++; + } + + lock->set_state(LOCK_SCAN); + if (gather) + in->state_set(CInode::STATE_NEEDSRECOVER); + else + mds->mdcache->queue_file_recover(in); +} + + +// messenger +/* This function DOES put the passed message before returning */ +void Locker::handle_file_lock(ScatterLock *lock, MLock *m) +{ + CInode *in = static_cast(lock->get_parent()); + int from = m->get_asker(); + + if (mds->is_rejoin()) { + if (in->is_rejoining()) { + dout(7) << "handle_file_lock still rejoining " << *in + << ", dropping " << *m << dendl; + m->put(); + return; + } + } + + dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action()) + << " on " << *lock + << " from mds." << from << " " + << *in << dendl; + + bool caps = lock->get_cap_shift(); + + switch (m->get_action()) { + // -- replica -- + case LOCK_AC_SYNC: + assert(lock->get_state() == LOCK_LOCK || + lock->get_state() == LOCK_MIX || + lock->get_state() == LOCK_MIX_SYNC2); + + if (lock->get_state() == LOCK_MIX) { + lock->set_state(LOCK_MIX_SYNC); + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) + mds->mdlog->flush(); + break; + } + + (static_cast(lock))->finish_flush(); + (static_cast(lock))->clear_flushed(); + + // ok + lock->decode_locked_state(m->get_data()); + lock->set_state(LOCK_SYNC); + + lock->get_rdlock(); + if (caps) + issue_caps(in); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + lock->put_rdlock(); + break; + + case LOCK_AC_LOCK: + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break; + default: ceph_abort(); + } + + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) + mds->mdlog->flush(); + + break; + + case LOCK_AC_LOCKFLUSHED: + (static_cast(lock))->finish_flush(); + (static_cast(lock))->clear_flushed(); + // wake up scatter_nudge waiters + if (lock->is_stable()) + lock->finish_waiters(SimpleLock::WAIT_STABLE); + break; + + case LOCK_AC_MIX: + assert(lock->get_state() == LOCK_SYNC || + lock->get_state() == LOCK_LOCK || + lock->get_state() == LOCK_SYNC_MIX2); + + if (lock->get_state() == LOCK_SYNC) { + // MIXED + lock->set_state(LOCK_SYNC_MIX); + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) + mds->mdlog->flush(); + break; + } + + // ok + lock->set_state(LOCK_MIX); + lock->decode_locked_state(m->get_data()); + + if (caps) + issue_caps(in); + + lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE); + break; + + + // -- auth -- + case LOCK_AC_LOCKACK: + assert(lock->get_state() == LOCK_SYNC_LOCK || + lock->get_state() == LOCK_MIX_LOCK || + lock->get_state() == LOCK_MIX_LOCK2 || + lock->get_state() == LOCK_MIX_EXCL || + lock->get_state() == LOCK_SYNC_EXCL || + lock->get_state() == LOCK_SYNC_MIX || + lock->get_state() == LOCK_MIX_TSYN); + assert(lock->is_gathering(from)); + lock->remove_gather(from); + + if (lock->get_state() == LOCK_MIX_LOCK || + lock->get_state() == LOCK_MIX_LOCK2 || + lock->get_state() == LOCK_MIX_EXCL || + lock->get_state() == LOCK_MIX_TSYN) { + lock->decode_locked_state(m->get_data()); + // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not + // delay calling scatter_writebehind(). + lock->clear_flushed(); + } + + if (lock->is_gathering()) { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + case LOCK_AC_SYNCACK: + assert(lock->get_state() == LOCK_MIX_SYNC); + assert(lock->is_gathering(from)); + lock->remove_gather(from); + + lock->decode_locked_state(m->get_data()); + + if (lock->is_gathering()) { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + case LOCK_AC_MIXACK: + assert(lock->get_state() == LOCK_SYNC_MIX); + assert(lock->is_gathering(from)); + lock->remove_gather(from); + + if (lock->is_gathering()) { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + + // requests.... + case LOCK_AC_REQSCATTER: + if (lock->is_stable()) { + /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing) + * because the replica should be holding an auth_pin if they're + * doing this (and thus, we are freezing, not frozen, and indefinite + * starvation isn't an issue). + */ + dout(7) << "handle_file_lock got scatter request on " << *lock + << " on " << *lock->get_parent() << dendl; + if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter + scatter_mix(lock); + } else { + dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->set_scatter_wanted(); + } + break; + + case LOCK_AC_REQUNSCATTER: + if (lock->is_stable()) { + /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing) + * because the replica should be holding an auth_pin if they're + * doing this (and thus, we are freezing, not frozen, and indefinite + * starvation isn't an issue). + */ + dout(7) << "handle_file_lock got unscatter request on " << *lock + << " on " << *lock->get_parent() << dendl; + if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter + simple_lock(lock); // FIXME tempsync? + } else { + dout(7) << "handle_file_lock ignoring unscatter request on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->set_unscatter_wanted(); + } + break; + + case LOCK_AC_REQRDLOCK: + handle_reqrdlock(lock, m); + break; + + case LOCK_AC_NUDGE: + if (!lock->get_parent()->is_auth()) { + dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock + << " on " << *lock->get_parent() << dendl; + } else if (!lock->get_parent()->is_replicated()) { + dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock + << " on " << *lock->get_parent() << dendl; + } else { + dout(7) << "handle_file_lock trying nudge on " << *lock + << " on " << *lock->get_parent() << dendl; + scatter_nudge(lock, 0, true); + mds->mdlog->flush(); + } + break; + + default: + ceph_abort(); + } + + m->put(); +} + + + + + +