// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include "MDSRank.h" #include "MDCache.h" #include "Locker.h" #include "CInode.h" #include "CDir.h" #include "CDentry.h" #include "Mutation.h" #include "MDSContext.h" #include "MDLog.h" #include "MDSMap.h" #include "events/EUpdate.h" #include "events/EOpen.h" #include "msg/Messenger.h" #include "osdc/Objecter.h" #include "messages/MInodeFileCaps.h" #include "messages/MLock.h" #include "messages/MClientLease.h" #include "messages/MClientReply.h" #include "messages/MClientCaps.h" #include "messages/MClientCapRelease.h" #include "messages/MMDSSlaveRequest.h" #include #include "common/config.h" #define dout_subsys ceph_subsys_mds #undef dout_prefix #define dout_context g_ceph_context #define dout_prefix _prefix(_dout, mds) static ostream& _prefix(std::ostream *_dout, MDSRank *mds) { return *_dout << "mds." 
<< mds->get_nodeid() << ".locker ";
}

// Internal-context helper that resolves the owning MDSRank through the Locker.
class LockerContext : public MDSInternalContextBase {
protected:
  Locker *locker;
  MDSRank *get_mds() override { return locker->mds; }
public:
  explicit LockerContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};

// Log-context variant of the above, for journal-completion callbacks.
class LockerLogContext : public MDSLogContextBase {
protected:
  Locker *locker;
  MDSRank *get_mds() override { return locker->mds; }
public:
  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};

/* This function DOES put the passed message before returning */
// Route an incoming message to the matching handler; aborts on unknown types.
// NOTE(review): the template arguments of these static_casts were stripped
// (each should cast `m` to the handler's concrete message type).
void Locker::dispatch(Message *m)
{
  switch (m->get_type()) {
    // inter-mds locking
  case MSG_MDS_LOCK:
    handle_lock(static_cast(m));
    break;
    // inter-mds caps
  case MSG_MDS_INODEFILECAPS:
    handle_inode_file_caps(static_cast(m));
    break;
    // client sync
  case CEPH_MSG_CLIENT_CAPS:
    handle_client_caps(static_cast(m));
    break;
  case CEPH_MSG_CLIENT_CAPRELEASE:
    handle_client_cap_release(static_cast(m));
    break;
  case CEPH_MSG_CLIENT_LEASE:
    handle_client_lease(static_cast(m));
    break;
  default:
    derr << "locker unknown message " << m->get_type() << dendl;
    assert(0 == "locker unknown message");
  }
}

// Periodic upkeep: drives scatter-lock and cap housekeeping.
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}

/*
 * locks vs rejoin
 *
 *
 *
 */

// Broadcast an MLock message for `lock` to every replica whose MDS has
// reached at least STATE_REJOIN (skips replicas on degraded/earlier peers).
void Locker::send_lock_message(SimpleLock *lock, int msg)
{
  for (const auto &it : lock->get_parent()->get_replicas()) {
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
      continue;
    MLock *m = new MLock(lock, msg, mds->get_nodeid());
    mds->send_message_mds(m, it.first);
  }
}

// Same as above, but attaches `data` (lock state payload) to each message.
void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
{
  for (const auto &it : lock->get_parent()->get_replicas()) {
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
      continue;
    MLock *m = new MLock(lock, msg, mds->get_nodeid());
    m->set_data(data);
    mds->send_message_mds(m, it.first);
  }
}

// Insert the snaplock of `in` and of every projected ancestor into rdlocks.
void Locker::include_snap_rdlocks(set& rdlocks, CInode *in)
{
  // rdlock ancestor snaps
  CInode *t =
in;
  rdlocks.insert(&in->snaplock);
  while (t->get_projected_parent_dn()) {
    t = t->get_projected_parent_dn()->get_dir()->get_inode();
    rdlocks.insert(&t->snaplock);
  }
}

// Like include_snap_rdlocks, but also rdlocks policylocks up the ancestry
// until the first projected inode that has a file layout; *layout is set to
// that layout.
void Locker::include_snap_rdlocks_wlayout(set& rdlocks, CInode *in,
                                          file_layout_t **layout)
{
  //rdlock ancestor snaps
  CInode *t = in;
  rdlocks.insert(&in->snaplock);
  rdlocks.insert(&in->policylock);
  bool found_layout = false;
  while (t) {
    rdlocks.insert(&t->snaplock);
    if (!found_layout) {
      rdlocks.insert(&t->policylock);
      if (t->get_projected_inode()->has_layout()) {
        *layout = &t->get_projected_inode()->layout;
        found_layout = true;
      }
    }
    if (t->get_projected_parent_dn() &&
        t->get_projected_parent_dn()->get_dir())
      t = t->get_projected_parent_dn()->get_dir()->get_inode();
    else
      t = NULL;
  }
}

// RAII guard: on destruction, records `message` as an event on the mdr
// unless mark_event was cleared. Used to report how lock acquisition ended.
struct MarkEventOnDestruct {
  MDRequestRef& mdr;
  const char* message;
  bool mark_event;
  MarkEventOnDestruct(MDRequestRef& _mdr, const char *_message)
    : mdr(_mdr),
      message(_message),
      mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};

/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
// Acquire every rdlock/wrlock/xlock (and optional remote wrlocks) needed by
// the request, auth-pinning objects first. NOTE(review): the container
// element types (set/map template arguments) were stripped by extraction.
bool Locker::acquire_locks(MDRequestRef& mdr,
                           set &rdlocks,
                           set &wrlocks,
                           set &xlocks,
                           map *remote_wrlocks,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true; // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set sorted; // sort everything we will lock
  set mustpin; // items to authpin

  // xlocks
  for (set::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
    sorted.insert(*p);
    mustpin.insert((*p)->get_parent());

    // augment xlock with a versionlock?
if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)(*p)->get_parent();
      if (!dn->is_auth())
        continue;
      if (xlocks.count(&dn->versionlock))
        continue; // we're xlocking the versionlock too; don't wrlock it!
      if (mdr->is_master()) {
        // master. wrlock versionlock so we can pipeline dentry updates to journal.
        wrlocks.insert(&dn->versionlock);
      } else {
        // slave. exclusively lock the dentry version (i.e. block other journal updates).
        // this makes rollback safe.
        xlocks.insert(&dn->versionlock);
        sorted.insert(&dn->versionlock);
      }
    }
    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)(*p)->get_parent();
      if (!in->is_auth())
        continue;
      if (mdr->is_master()) {
        // master. wrlock versionlock so we can pipeline inode updates to journal.
        wrlocks.insert(&in->versionlock);
      } else {
        // slave. exclusively lock the inode version (i.e. block other journal updates).
        // this makes rollback safe.
        xlocks.insert(&in->versionlock);
        sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
             !(*p)->can_wrlock(client) && // we might have to request a scatter
             !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
               << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map::iterator p = remote_wrlocks->begin();
         p != remote_wrlocks->end();
         ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds."
<< p->second << " " << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set::iterator p = rdlocks.begin(); p != rdlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
             !(*p)->can_rdlock(client)) { // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
               << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }

  // AUTH PINS
  map > mustpin_remote; // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set::iterator p = mustpin.begin(); p != mustpin.end(); ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
        continue;
      if (mdr->more()->is_remote_frozen_authpin) {
        if (mdr->more()->rename_inode == auth_pin_freeze)
          continue;
        // unfreeze auth pin for the wrong inode
        // (creates an empty entry for that mds so an unfreeze request is sent below)
        mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
        drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
        // wait
        dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
        object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
        mdr->drop_local_auth_pins();
        return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    if (!object->can_auth_pin()) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
        dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
        mdr->aborted = true;
        return false;
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
      if (!mdr->remote_auth_pins.empty())
        notify_freeze_waiter(object);
      return false;
    }
  }

  // ok, grab local auth pins
  for (set::iterator p = mustpin.begin(); p != mustpin.end(); ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    for (map::iterator p = mdr->remote_auth_pins.begin();
         p != mdr->remote_auth_pins.end();
         ++p) {
      if (mustpin.count(p->first)) {
        assert(p->second == p->first->authority().first);
        map >::iterator q = mustpin_remote.find(p->second);
        if (q != mustpin_remote.end())
          q->second.insert(p->first);
      }
    }
    for (map >::iterator p = mustpin_remote.begin();
         p != mustpin_remote.end();
         ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
          !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
        dout(10) << " mds."
<< p->first << " is not active" << dendl;
        if (mdr->more()->waiting_on_slave.empty())
          mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
        return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
                                                   MMDSSlaveRequest::OP_AUTHPIN);
      for (set::iterator q = p->second.begin();
           q != p->second.end();
           ++q) {
        dout(10) << " req remote auth_pin of " << **q << dendl;
        MDSCacheObjectInfo info;
        (*q)->set_object_info(info);
        req->get_authpins().push_back(info);
        if (*q == auth_pin_freeze)
          (*q)->set_object_info(req->get_authpin_freeze());
        mdr->pin(*q);
      }
      if (auth_pin_nonblock)
        req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  set::iterator existing = mdr->locks.begin();
  for (set::iterator p = sorted.begin(); p != sorted.end(); ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
        dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
        continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
        if (!need_remote_wrlock ||
            mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
          dout(10) << " unlocking remote_wrlock on wrong mds."
<< mdr->remote_wrlocks[have] << " " << *have << " "
                   << *have->get_parent() << dendl;
          remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
        }
      }
      if (need_wrlock || need_remote_wrlock) {
        if (need_wrlock == !!mdr->wrlocks.count(have) &&
            need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
          if (need_wrlock)
            dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
          if (need_remote_wrlock)
            dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
          continue;
        }
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
        dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
        continue;
      }
    }

    // hose any stray locks
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
        if (!need_wrlock)
          dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
        else if (need_remote_wrlock) // acquire remote_wrlock first
          dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
        bool need_issue = false;
        wrlock_finish(lock, mdr.get(), &need_issue);
        if (need_issue)
          issue_set.insert(static_cast(lock->get_parent()));
      }
      ++existing;
    }
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
        xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
        rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
        // may have acquired both wrlock and remore wrlock
        if (mdr->wrlocks.count(stray))
          wrlock_finish(stray, mdr.get(), &need_issue);
        if (mdr->remote_wrlocks.count(stray))
          remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
        issue_set.insert(static_cast(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
        goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
        marker.message = "waiting for remote wrlocks";
        remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
        goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
        marker.message = "failed to wrlock, waiting";
        if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
          marker.message = "failed to wrlock, dropping remote wrlock and waiting";
          // can't take the wrlock because the scatter lock is gathering. need to
          // release the remote wrlock, so that the gathering process can finish.
          remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
          remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
          goto out;
        }
        // nowait if we have already gotten remote wrlock
        if (!wrlock_start(*p, mdr, need_remote_wrlock))
          goto out;
        dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->is_scatterlock()) {
        ScatterLock *slock = static_cast(*p);
        if (slock->is_rejoin_mix()) {
          // If there is a recovering mds who replcated an object when it failed
          // and scatterlock in the object was in MIX state, It's possible that
          // the recovering mds needs to take wrlock on the scatterlock when it
          // replays unsafe requests. So this mds should delay taking rdlock on
          // the scatterlock until the recovering mds finishes replaying unsafe.
          // Otherwise unsafe requests may get replayed after current request.
          //
          // For example:
          // The recovering mds is auth mds of a dirfrag, this mds is auth mds
          // of correspinding inode. when 'rm -rf' the direcotry, this mds should
          // delay the rmdir request until the recovering mds has replayed unlink
          // requests.
if (mds->is_cluster_degraded()) {
            if (!mdr->is_replay()) {
              drop_locks(mdr.get());
              mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
              dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
                       << ", waiting for cluster recovered" << dendl;
              marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
              return false;
            }
          } else {
            slock->clear_rejoin_mix();
          }
        }
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
        goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remore wrlock
      if (mdr->wrlocks.count(stray))
        wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
        remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  // issue any caps deferred while locks were being shuffled above
  issue_caps_set(issue_set);
  return result;
}

// Bump the freeze-waiter count on the dirfrag (or freezing-tree root) that
// contains `o`, so the freeze logic knows someone is blocked on it.
void Locker::notify_freeze_waiter(MDSCacheObject *o)
{
  CDir *dir = NULL;
  if (CInode *in = dynamic_cast(o)) {
    if (!in->is_root())
      dir = in->get_parent_dir();
  } else if (CDentry *dn = dynamic_cast(o)) {
    dir = dn->get_dir();
  } else {
    dir = dynamic_cast(o);
    assert(dir);
  }
  if (dir) {
    if (dir->is_freezing_dir())
      mdcache->fragment_freeze_inc_num_waiters(dir);
    if (dir->is_freezing_tree()) {
      while (!dir->is_freezing_tree_root())
        dir = dir->get_parent_dir();
      mdcache->migrator->export_freeze_inc_num_waiters(dir);
    }
  }
}

// Mark every held (auth) xlock on the mutation as "xlock done"; optionally
// skip dentry and dentry-version locks.
void Locker::set_xlocks_done(MutationImpl *mut,
bool skip_dentry)
{
  for (set::iterator p = mut->xlocks.begin();
       p != mut->xlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    assert(object->is_auth());
    if (skip_dentry &&
        ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
      continue;
    dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
    (*p)->set_xlock_done();
  }
}

// Release every rdlock held by the mutation, collecting inodes that need
// cap re-issue into *pneed_issue.
void Locker::_drop_rdlocks(MutationImpl *mut, set *pneed_issue)
{
  while (!mut->rdlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast(p));
  }
}

// Release all xlocks, remote wrlocks and wrlocks held by the mutation.
// Remotely-held locks are released by sending OP_DROPLOCKS to each slave
// (see tail of this function).
void Locker::_drop_non_rdlocks(MutationImpl *mut, set *pneed_issue)
{
  set slaves;

  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      // non-auth xlock: drop our ref and remember the slave to notify
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast(p));
  }

  while (!mut->remote_wrlocks.empty()) {
    map::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast(p));
  }

  for (set::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds."
<< *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
                                                        MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}

// Abort an in-progress lock acquisition (mut->locking), rolling the lock's
// state machine back where needed.
void Locker::cancel_locking(MutationImpl *mut, set *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
               lock->get_num_xlocks() == 0) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast(lock->get_parent()));
  }
  mut->finish_locking(lock);
}

// Drop every lock held by the mutation (cancelling any pending acquisition
// first). If the caller did not supply pneed_issue, caps are issued here.
void Locker::drop_locks(MutationImpl *mut, set *pneed_issue)
{
  // leftover locks
  set my_need_issue;
  if (!pneed_issue)
    pneed_issue = &my_need_issue;

  if (mut->locking)
    cancel_locking(mut, pneed_issue);
  _drop_non_rdlocks(mut, pneed_issue);
  _drop_rdlocks(mut, pneed_issue);

  if (pneed_issue == &my_need_issue)
    issue_caps_set(*pneed_issue);
  mut->done_locking = false;
}

// Drop only the xlocks/wrlocks/remote-wrlocks of the mutation.
void Locker::drop_non_rdlocks(MutationImpl *mut, set *pneed_issue)
{
  set my_need_issue;
  if (!pneed_issue)
    pneed_issue = &my_need_issue;

  _drop_non_rdlocks(mut, pneed_issue);

  if (pneed_issue == &my_need_issue)
    issue_caps_set(*pneed_issue);
}

// Drop only the rdlocks of the mutation.
void Locker::drop_rdlocks(MutationImpl *mut, set *pneed_issue)
{
  set my_need_issue;
  if (!pneed_issue)
    pneed_issue = &my_need_issue;

  _drop_rdlocks(mut, pneed_issue);

  if (pneed_issue == &my_need_issue)
    issue_caps_set(*pneed_issue);
}


// generics

// Re-evaluate an unstable lock: if the gather it was waiting on has
// completed, move it to its next state, notify auth/replicas as needed, and
// wake waiters. `first` requests an early cap re-issue check.
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast(lock->get_parent());

  bool need_issue = false;

  int loner_issued
= 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
                        lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
             << " issued/allows loner " << gcap_string(loner_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
             << " xlocker " << gcap_string(xlocker_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
             << " other " << gcap_string(other_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
             << dendl;

    // caps are issued that the next state does not allow -> re-issue needed
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
                  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
                  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // gather is finished only if nothing still blocks the transition to `next`:
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind!
(!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
            << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      if (in->state_test(CInode::STATE_RECOVERING)) {
        dout(7) << "eval_gather finished gather, but still recovering" << dendl;
        return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        dout(7) << "eval_gather finished gather, but need to recover" << dendl;
        mds->mdcache->queue_file_recover(in);
        mds->mdcache->do_file_recover();
        return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
          mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
        dout(7) << "eval_gather finished gather, but still rejoining "
                << *lock->get_parent() << dendl;
        return;
      }

      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        switch (lock->get_state()) {
        case LOCK_SYNC_LOCK:
          mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
          break;

        case LOCK_MIX_SYNC:
          {
            MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
            lock->encode_locked_state(reply->get_data());
            mds->send_message_mds(reply, auth);
            next = LOCK_MIX_SYNC2;
            (static_cast(lock))->start_flush();
          }
          break;

        case LOCK_MIX_SYNC2:
          (static_cast(lock))->finish_flush();
          (static_cast(lock))->clear_flushed();
          // deliberate fall-through: both states have already acked

        case LOCK_SYNC_MIX2:
          // do nothing, we already acked
          break;

        case LOCK_SYNC_MIX:
          {
            MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
            mds->send_message_mds(reply, auth);
            next = LOCK_SYNC_MIX2;
          }
          break;

        case LOCK_MIX_LOCK:
          {
            bufferlist data;
lock->encode_locked_state(data);
            mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
            (static_cast(lock))->start_flush();
            // we'll get an AC_LOCKFLUSHED to complete
          }
          break;

        default:
          ceph_abort();
        }
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
          lock->get_parent()->is_replicated()) {
        dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
        send_lock_message(lock, LOCK_AC_LOCK);
        lock->init_gather();
        lock->set_state(LOCK_MIX_LOCK2);
        return;
      }

      if (lock->is_dirty() && !lock->is_flushed()) {
        // write dirty scatter state back to the journal before proceeding
        scatter_writebehind(static_cast(lock));
        mds->mdlog->flush();
        return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
        // to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
        in->start_scatter(static_cast(lock));
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_MIX, softdata);
        }
        (static_cast(lock))->clear_scatter_wanted();
        break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
        if (next != LOCK_SYNC)
          break;
        // fall-thru

        // to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_SYNC, softdata);
        }
        break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
        in->is_head() &&
        in->is_auth() &&
        in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
        dout(10) << " dropped loner" << dendl;
        need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
                         *pfinishers);
    else
lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}

// Evaluate the inode's locks selected by `mask` (loner choice first, then
// each lock type, then possible loner drop). Returns whether caps need
// (re-)issuing; issues them itself for head inodes.
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    if (in->choose_ideal_loner() >= 0) {
      if (in->try_set_loner()) {
        dout(10) << "eval set loner to client." << in->get_loner() << dendl;
        need_issue = true;
        mask = -1;
      } else
        dout(10) << "eval want loner client." << in->get_wanted_loner()
                 << " but failed to set it" << dendl;
    } else
      dout(10) << "eval doesn't want loner" << dendl;
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    dout(10) << " trying to drop loner" << dendl;
    if (in->try_drop_loner()) {
      dout(10) << " dropped loner" << dendl;
      need_issue = true;

      if (in->get_wanted_loner() >= 0) {
        if (in->try_set_loner()) {
          dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
          mask = -1;
          goto retry;
        } else {
          dout(10) << "eval want loner client."
<< in->get_wanted_loner() << " but failed to set it" << dendl;
        }
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}

// Waiter context: re-runs try_eval(p, mask) when the object becomes eligible.
class C_Locker_Eval : public LockerContext {
  MDSCacheObject *p;
  int mask;
public:
  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
    // We are used as an MDSCacheObject waiter, so should
    // only be invoked by someone already holding the big lock.
    assert(locker->mds->mds_lock.is_locked_by_me());
    p->get(MDSCacheObject::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->try_eval(p, mask);
    p->put(MDSCacheObject::PIN_PTRWAITER);
  }
};

// Evaluate an object's locks, deferring (via waiter) while its auth is
// ambiguous or it is frozen.
void Locker::try_eval(MDSCacheObject *p, int mask)
{
  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (p->is_auth() && p->is_frozen()) {
    dout(7) << "try_eval frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (mask & CEPH_LOCK_DN) {
    assert(mask == CEPH_LOCK_DN);
    bool need_issue = false; // ignore this, no caps on dentries
    CDentry *dn = static_cast(p);
    eval_any(&dn->lock, &need_issue);
  } else {
    CInode *in = static_cast(p);
    eval(in, mask);
  }
}

// Evaluate a single lock, deferring while its parent's auth is ambiguous,
// the parent is frozen, or we are not auth.
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock. Honor
   * scatter_wanted flag here. This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast(lock);
    if (slock->get_scatter_wanted() &&
        slock->get_state() != LOCK_MIX) {
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
        return;
    } else if (slock->get_unscatter_wanted() &&
               slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}

// Kick any unstable cap-related inode locks (file/auth/link/xattr) and
// collect inodes needing cap issue into *issue_set (or issue directly).
void Locker::eval_cap_gather(CInode *in, set *issue_set)
{
  bool need_issue = false;
  list finishers;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock, false, &need_issue, &finishers);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock, false, &need_issue, &finishers);
  if
(!in->xattrlock.is_stable()) eval_gather(&in->xattrlock, false, &need_issue, &finishers); if (need_issue && in->is_head()) { if (issue_set) issue_set->insert(in); else issue_caps(in); } finish_contexts(g_ceph_context, finishers); } void Locker::eval_scatter_gathers(CInode *in) { bool need_issue = false; list finishers; dout(10) << "eval_scatter_gathers " << *in << dendl; // kick locks now if (!in->filelock.is_stable()) eval_gather(&in->filelock, false, &need_issue, &finishers); if (!in->nestlock.is_stable()) eval_gather(&in->nestlock, false, &need_issue, &finishers); if (!in->dirfragtreelock.is_stable()) eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers); if (need_issue && in->is_head()) issue_caps(in); finish_contexts(g_ceph_context, finishers); } void Locker::eval(SimpleLock *lock, bool *need_issue) { switch (lock->get_type()) { case CEPH_LOCK_IFILE: return file_eval(static_cast(lock), need_issue); case CEPH_LOCK_IDFT: case CEPH_LOCK_INEST: return scatter_eval(static_cast(lock), need_issue); default: return simple_eval(lock, need_issue); } } // ------------------ // rdlock bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon) { // kick the lock if (lock->is_stable()) { if (lock->get_parent()->is_auth()) { if (lock->get_sm() == &sm_scatterlock) { // not until tempsync is fully implemented //if (lock->get_parent()->is_replicated()) //scatter_tempsync((ScatterLock*)lock); //else simple_sync(lock); } else if (lock->get_sm() == &sm_filelock) { CInode *in = static_cast(lock->get_parent()); if (lock->get_state() == LOCK_EXCL && in->get_target_loner() >= 0 && !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN file_xsyn(lock); else simple_sync(lock); } else simple_sync(lock); return true; } else { // request rdlock state change from auth mds_rank_t auth = lock->get_parent()->authority().first; if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { dout(10) << "requesting rdlock from auth on " << 
*lock << " on " << *lock->get_parent() << dendl; mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth); } return false; } } if (lock->get_type() == CEPH_LOCK_IFILE) { CInode *in = static_cast(lock->get_parent()); if (in->state_test(CInode::STATE_RECOVERING)) { mds->mdcache->recovery_queue.prioritize(in); } } return false; } bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con) { dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl; // can read? grab ref. if (lock->can_rdlock(client)) return true; _rdlock_kick(lock, false); if (lock->can_rdlock(client)) return true; // wait! if (con) { dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl; lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con); } return false; } bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon) { dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl; // client may be allowed to rdlock the same item it has xlocked. // UNLESS someone passes in as_anon, or we're reading snapped version here. if (mut->snapid != CEPH_NOSNAP) as_anon = true; client_t client = as_anon ? -1 : mut->get_client(); CInode *in = 0; if (lock->get_type() != CEPH_LOCK_DN) in = static_cast(lock->get_parent()); /* if (!lock->get_parent()->is_auth() && lock->fw_rdlock_to_auth()) { mdcache->request_forward(mut, lock->get_parent()->authority().first); return false; } */ while (1) { // can read? grab ref. if (lock->can_rdlock(client)) { lock->get_rdlock(); mut->rdlocks.insert(lock); mut->locks.insert(lock); return true; } // hmm, wait a second. if (in && !in->is_head() && in->is_auth() && lock->get_state() == LOCK_SNAP_SYNC) { // okay, we actually need to kick the head's lock to get ourselves synced up. 
CInode *head = mdcache->get_inode(in->ino()); assert(head); SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE); if (hlock->get_state() == LOCK_SYNC) hlock = head->get_lock(lock->get_type()); if (hlock->get_state() != LOCK_SYNC) { dout(10) << "rdlock_start trying head inode " << *head << dendl; if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL ** return false; // oh, check our lock again then } } if (!_rdlock_kick(lock, as_anon)) break; } // wait! int wait_on; if (lock->get_parent()->is_auth() && lock->is_stable()) wait_on = SimpleLock::WAIT_RD; else wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry. dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut)); nudge_log(lock); return false; } void Locker::nudge_log(SimpleLock *lock) { dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl; if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush mds->mdlog->flush(); } void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue) { // drop ref lock->put_rdlock(); if (mut) { mut->rdlocks.erase(lock); mut->locks.erase(lock); } dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; // last one? 
if (!lock->is_rdlocked()) { if (!lock->is_stable()) eval_gather(lock, false, pneed_issue); else if (lock->get_parent()->is_auth()) try_eval(lock, pneed_issue); } } bool Locker::can_rdlock_set(set& locks) { dout(10) << "can_rdlock_set " << locks << dendl; for (set::iterator p = locks.begin(); p != locks.end(); ++p) if (!(*p)->can_rdlock(-1)) { dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl; return false; } return true; } bool Locker::rdlock_try_set(set& locks) { dout(10) << "rdlock_try_set " << locks << dendl; for (set::iterator p = locks.begin(); p != locks.end(); ++p) if (!rdlock_try(*p, -1, NULL)) { dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl; return false; } return true; } void Locker::rdlock_take_set(set& locks, MutationRef& mut) { dout(10) << "rdlock_take_set " << locks << dendl; for (set::iterator p = locks.begin(); p != locks.end(); ++p) { (*p)->get_rdlock(); mut->rdlocks.insert(*p); mut->locks.insert(*p); } } // ------------------ // wrlock void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut) { if (lock->get_type() == CEPH_LOCK_IVERSION || lock->get_type() == CEPH_LOCK_DVERSION) return local_wrlock_grab(static_cast(lock), mut); dout(7) << "wrlock_force on " << *lock << " on " << *lock->get_parent() << dendl; lock->get_wrlock(true); mut->wrlocks.insert(lock); mut->locks.insert(lock); } bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait) { if (lock->get_type() == CEPH_LOCK_IVERSION || lock->get_type() == CEPH_LOCK_DVERSION) return local_wrlock_start(static_cast(lock), mut); dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl; CInode *in = static_cast(lock->get_parent()); client_t client = mut->get_client(); bool want_scatter = !nowait && lock->get_parent()->is_auth() && (in->has_subtree_or_exporting_dirfrag() || static_cast(lock)->get_scatter_wanted()); while (1) { // wrlock? 
if (lock->can_wrlock(client) && (!want_scatter || lock->get_state() == LOCK_MIX)) { lock->get_wrlock(); mut->wrlocks.insert(lock); mut->locks.insert(lock); return true; } if (lock->get_type() == CEPH_LOCK_IFILE && in->state_test(CInode::STATE_RECOVERING)) { mds->mdcache->recovery_queue.prioritize(in); } if (!lock->is_stable()) break; if (in->is_auth()) { // don't do nested lock state change if we have dirty scatterdata and // may scatter_writebehind or start_scatter, because nowait==true implies // that the caller already has a log entry open! if (nowait && lock->is_dirty()) return false; if (want_scatter) scatter_mix(static_cast(lock)); else simple_lock(lock); if (nowait && !lock->can_wrlock(client)) return false; } else { // replica. // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case). mds_rank_t auth = lock->get_parent()->authority().first; if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { dout(10) << "requesting scatter from auth on " << *lock << " on " << *lock->get_parent() << dendl; mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth); } break; } } if (!nowait) { dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); nudge_log(lock); } return false; } void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue) { if (lock->get_type() == CEPH_LOCK_IVERSION || lock->get_type() == CEPH_LOCK_DVERSION) return local_wrlock_finish(static_cast(lock), mut); dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; lock->put_wrlock(); if (mut) { mut->wrlocks.erase(lock); if (mut->remote_wrlocks.count(lock) == 0) mut->locks.erase(lock); } if (!lock->is_wrlocked()) { if (!lock->is_stable()) eval_gather(lock, false, pneed_issue); else if (lock->get_parent()->is_auth()) try_eval(lock, pneed_issue); } } // remote 
wrlock void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut) { dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl; // wait for active target if (mds->is_cluster_degraded() && !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) { dout(7) << " mds." << target << " is not active" << dendl; if (mut->more()->waiting_on_slave.empty()) mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut)); return; } // send lock request mut->start_locking(lock, target); mut->more()->slaves.insert(target); MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK); r->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(r->get_object_info()); mds->send_message_mds(r, target); assert(mut->more()->waiting_on_slave.count(target) == 0); mut->more()->waiting_on_slave.insert(target); } void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target, MutationImpl *mut) { // drop ref mut->remote_wrlocks.erase(lock); if (mut->wrlocks.count(lock) == 0) mut->locks.erase(lock); dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target << " " << *lock->get_parent() << dendl; if (!mds->is_cluster_degraded() || mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) { MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK); slavereq->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(slavereq->get_object_info()); mds->send_message_mds(slavereq, target); } } // ------------------ // xlock bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut) { if (lock->get_type() == CEPH_LOCK_IVERSION || lock->get_type() == CEPH_LOCK_DVERSION) return local_xlock_start(static_cast(lock), mut); dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl; client_t client = mut->get_client(); // auth? 
if (lock->get_parent()->is_auth()) { // auth while (1) { if (lock->can_xlock(client)) { lock->set_state(LOCK_XLOCK); lock->get_xlock(mut, client); mut->xlocks.insert(lock); mut->locks.insert(lock); mut->finish_locking(lock); return true; } if (lock->get_type() == CEPH_LOCK_IFILE) { CInode *in = static_cast(lock->get_parent()); if (in->state_test(CInode::STATE_RECOVERING)) { mds->mdcache->recovery_queue.prioritize(in); } } if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE || lock->get_xlock_by_client() != client || lock->is_waiter_for(SimpleLock::WAIT_STABLE))) break; if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) { mut->start_locking(lock); simple_xlock(lock); } else { simple_lock(lock); } } lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); nudge_log(lock); return false; } else { // replica assert(lock->get_sm()->can_remote_xlock); assert(!mut->slave_request); // wait for single auth if (lock->get_parent()->is_ambiguous_auth()) { lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mut)); return false; } // wait for active auth mds_rank_t auth = lock->get_parent()->authority().first; if (mds->is_cluster_degraded() && !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { dout(7) << " mds." 
<< auth << " is not active" << dendl; if (mut->more()->waiting_on_slave.empty()) mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut)); return false; } // send lock request mut->more()->slaves.insert(auth); mut->start_locking(lock, auth); MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK); r->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(r->get_object_info()); mds->send_message_mds(r, auth); assert(mut->more()->waiting_on_slave.count(auth) == 0); mut->more()->waiting_on_slave.insert(auth); return false; } } void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue) { assert(!lock->is_stable()); if (lock->get_num_rdlocks() == 0 && lock->get_num_wrlocks() == 0 && lock->get_num_client_lease() == 0 && lock->get_state() != LOCK_XLOCKSNAP && lock->get_type() != CEPH_LOCK_DN) { CInode *in = static_cast(lock->get_parent()); client_t loner = in->get_target_loner(); if (loner >= 0 && (xlocker < 0 || xlocker == loner)) { lock->set_state(LOCK_EXCL); lock->get_parent()->auth_unpin(lock); lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD); if (lock->get_cap_shift()) *pneed_issue = true; if (lock->get_parent()->is_auth() && lock->is_stable()) try_eval(lock, pneed_issue); return; } } // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue); } void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue) { if (lock->get_type() == CEPH_LOCK_IVERSION || lock->get_type() == CEPH_LOCK_DVERSION) return local_xlock_finish(static_cast(lock), mut); dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl; client_t xlocker = lock->get_xlock_by_client(); // drop ref lock->put_xlock(); assert(mut); mut->xlocks.erase(lock); mut->locks.erase(lock); bool do_issue = false; // remote xlock? 
if (!lock->get_parent()->is_auth()) { assert(lock->get_sm()->can_remote_xlock); // tell auth dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl; mds_rank_t auth = lock->get_parent()->authority().first; if (!mds->is_cluster_degraded() || mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK); slavereq->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(slavereq->get_object_info()); mds->send_message_mds(slavereq, auth); } // others waiting? lock->finish_waiters(SimpleLock::WAIT_STABLE | SimpleLock::WAIT_WR | SimpleLock::WAIT_RD, 0); } else { if (lock->get_num_xlocks() == 0) { if (lock->get_state() == LOCK_LOCK_XLOCK) lock->set_state(LOCK_XLOCKDONE); _finish_xlock(lock, xlocker, &do_issue); } } if (do_issue) { CInode *in = static_cast(lock->get_parent()); if (in->is_head()) { if (pneed_issue) *pneed_issue = true; else issue_caps(in); } } } void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut) { dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl; lock->put_xlock(); mut->xlocks.erase(lock); mut->locks.erase(lock); MDSCacheObject *p = lock->get_parent(); assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode) if (!lock->is_stable()) lock->get_parent()->auth_unpin(lock); lock->set_state(LOCK_LOCK); } void Locker::xlock_import(SimpleLock *lock) { dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl; lock->get_parent()->auth_pin(lock); } // file i/o ----------------------------------------- version_t Locker::issue_file_data_version(CInode *in) { dout(7) << "issue_file_data_version on " << *in << dendl; return in->inode.file_data_version; } class C_Locker_FileUpdate_finish : public LockerLogContext { CInode *in; MutationRef mut; bool share_max; bool need_issue; client_t client; MClientCaps *ack; public: 
C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, bool sm=false, bool ni=false, client_t c=-1, MClientCaps *ac = 0) : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni), client(c), ack(ac) { in->get(CInode::PIN_PTRWAITER); } void finish(int r) override { locker->file_update_finish(in, mut, share_max, need_issue, client, ack); in->put(CInode::PIN_PTRWAITER); } }; void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap, client_t client, MClientCaps *ack) { dout(10) << "file_update_finish on " << *in << dendl; in->pop_and_dirty_projected_inode(mut->ls); mut->apply(); if (ack) { Session *session = mds->get_session(client); if (session) { // "oldest flush tid" > 0 means client uses unique TID for each flush if (ack->get_oldest_flush_tid() > 0) session->add_completed_flush(ack->get_client_tid()); mds->send_message_client_counted(ack, session); } else { dout(10) << " no session for client." << client << " " << *ack << dendl; ack->put(); } } set need_issue; drop_locks(mut.get(), &need_issue); if (!in->is_head() && !in->client_snap_caps.empty()) { dout(10) << " client_snap_caps " << in->client_snap_caps << dendl; // check for snap writeback completion bool gather = false; compact_map >::iterator p = in->client_snap_caps.begin(); while (p != in->client_snap_caps.end()) { SimpleLock *lock = in->get_lock(p->first); assert(lock); dout(10) << " completing client_snap_caps for " << ccap_string(p->first) << " lock " << *lock << " on " << *in << dendl; lock->put_wrlock(); p->second.erase(client); if (p->second.empty()) { gather = true; in->client_snap_caps.erase(p++); } else ++p; } if (gather) { if (in->client_snap_caps.empty()) in->item_open_file.remove_myself(); eval_cap_gather(in, &need_issue); } } else { if (issue_client_cap && need_issue.count(in) == 0) { Capability *cap = in->get_client_cap(client); if (cap && (cap->wanted() & ~cap->pending())) issue_caps(in, cap); } if (share_max && in->is_auth() && 
(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER))) share_inode_max_size(in); } issue_caps_set(need_issue); // auth unpin after issuing caps mut->cleanup(); } Capability* Locker::issue_new_caps(CInode *in, int mode, Session *session, SnapRealm *realm, bool is_replay) { dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl; bool is_new; // if replay, try to reconnect cap, and otherwise do nothing. if (is_replay) { mds->mdcache->try_reconnect_cap(in, session); return 0; } // my needs assert(session->info.inst.name.is_client()); client_t my_client = session->info.inst.name.num(); int my_want = ceph_caps_for_mode(mode); // register a capability Capability *cap = in->get_client_cap(my_client); if (!cap) { // new cap cap = in->add_client_cap(my_client, session, realm); cap->set_wanted(my_want); cap->mark_new(); cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply) is_new = true; } else { is_new = false; // make sure it wants sufficient caps if (my_want & ~cap->wanted()) { // augment wanted caps for this client cap->set_wanted(cap->wanted() | my_want); } } if (in->is_auth()) { // [auth] twiddle mode? eval(in, CEPH_CAP_LOCKS); if (_need_flush_mdlog(in, my_want)) mds->mdlog->flush(); } else { // [replica] tell auth about any new caps wanted request_inode_file_caps(in); } // issue caps (pot. incl new one) //issue_caps(in); // note: _eval above may have done this already... // re-issue whatever we can //cap->issue(cap->pending()); if (is_new) cap->dec_suppress(); return cap; } void Locker::issue_caps_set(set& inset) { for (set::iterator p = inset.begin(); p != inset.end(); ++p) issue_caps(*p); } bool Locker::issue_caps(CInode *in, Capability *only_cap) { // allowed caps are determined by the lock mode. 
int all_allowed = in->get_caps_allowed_by_type(CAP_ANY); int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER); int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER); client_t loner = in->get_loner(); if (loner >= 0) { dout(7) << "issue_caps loner client." << loner << " allowed=" << ccap_string(loner_allowed) << ", xlocker allowed=" << ccap_string(xlocker_allowed) << ", others allowed=" << ccap_string(all_allowed) << " on " << *in << dendl; } else { dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) << ", xlocker allowed=" << ccap_string(xlocker_allowed) << " on " << *in << dendl; } assert(in->is_head()); // count conflicts with int nissued = 0; // client caps map::iterator it; if (only_cap) it = in->client_caps.find(only_cap->get_client()); else it = in->client_caps.begin(); for (; it != in->client_caps.end(); ++it) { Capability *cap = it->second; if (cap->is_stale()) continue; // do not issue _new_ bits when size|mtime is projected int allowed; if (loner == it->first) allowed = loner_allowed; else allowed = all_allowed; // add in any xlocker-only caps (for locks this client is the xlocker for) allowed |= xlocker_allowed & in->get_xlocker_mask(it->first); Session *session = mds->get_session(it->first); if (in->inode.inline_data.version != CEPH_INLINE_NONE && !(session && session->connection && session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA))) allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR); int pending = cap->pending(); int wanted = cap->wanted(); dout(20) << " client." << it->first << " pending " << ccap_string(pending) << " allowed " << ccap_string(allowed) << " wanted " << ccap_string(wanted) << dendl; if (!(pending & ~allowed)) { // skip if suppress or new, and not revocation if (cap->is_new() || cap->is_suppress()) { dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl; continue; } } // notify clients about deleted inode, to make sure they release caps ASAP. 
if (in->inode.nlink == 0) wanted |= CEPH_CAP_LINK_SHARED; // are there caps that the client _wants_ and can have, but aren't pending? // or do we need to revoke? if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps (pending & ~allowed)) { // need to revoke ~allowed caps. // issue nissued++; // include caps that clients generally like, while we're at it. int likes = in->get_caps_liked(); int before = pending; long seq; if (pending & ~allowed) seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new. else seq = cap->issue((wanted|likes) & allowed); int after = cap->pending(); if (cap->is_new()) { // haven't send caps to client yet if (before & ~after) cap->confirm_receipt(seq, after); } else { dout(7) << " sending MClientCaps to client." << it->first << " seq " << cap->get_last_seq() << " new pending " << ccap_string(after) << " was " << ccap_string(before) << dendl; int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT; if (op == CEPH_CAP_OP_REVOKE) { revoking_caps.push_back(&cap->item_revoking_caps); revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps); cap->set_last_revoke_stamp(ceph_clock_now()); cap->reset_num_revoke_warnings(); } MClientCaps *m = new MClientCaps(op, in->ino(), in->find_snaprealm()->inode->ino(), cap->get_cap_id(), cap->get_last_seq(), after, wanted, 0, cap->get_mseq(), mds->get_osd_epoch_barrier()); in->encode_cap_message(m, cap); mds->send_message_client_counted(m, it->first); } } if (only_cap) break; } return (nissued == 0); // true if no re-issued, no callbacks } void Locker::issue_truncate(CInode *in) { dout(7) << "issue_truncate on " << *in << dendl; for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); ++it) { Capability *cap = it->second; MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC, in->ino(), in->find_snaprealm()->inode->ino(), cap->get_cap_id(), cap->get_last_seq(), cap->pending(), cap->wanted(), 0, 
cap->get_mseq(), mds->get_osd_epoch_barrier()); in->encode_cap_message(m, cap); mds->send_message_client_counted(m, it->first); } // should we increase max_size? if (in->is_auth() && in->is_file()) check_inode_max_size(in); } void Locker::revoke_stale_caps(Capability *cap) { CInode *in = cap->get_inode(); if (in->state_test(CInode::STATE_EXPORTINGCAPS)) { // if export succeeds, the cap will be removed. if export fails, we need to // revoke the cap if it's still stale. in->state_set(CInode::STATE_EVALSTALECAPS); return; } int issued = cap->issued(); if (issued & ~CEPH_CAP_PIN) { dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl; cap->revoke(); if (in->is_auth() && in->inode.client_ranges.count(cap->get_client())) in->state_set(CInode::STATE_NEEDSRECOVER); if (!in->filelock.is_stable()) eval_gather(&in->filelock); if (!in->linklock.is_stable()) eval_gather(&in->linklock); if (!in->authlock.is_stable()) eval_gather(&in->authlock); if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock); if (in->is_auth()) { try_eval(in, CEPH_CAP_LOCKS); } else { request_inode_file_caps(in); } } } void Locker::revoke_stale_caps(Session *session) { dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl; for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) { Capability *cap = *p; cap->mark_stale(); revoke_stale_caps(cap); } } void Locker::resume_stale_caps(Session *session) { dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl; for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) { Capability *cap = *p; CInode *in = cap->get_inode(); assert(in->is_head()); if (cap->is_stale()) { dout(10) << " clearing stale flag on " << *in << dendl; cap->clear_stale(); if (in->state_test(CInode::STATE_EXPORTINGCAPS)) { // if export succeeds, the cap will be removed. if export fails, // we need to re-issue the cap if it's not stale. 
in->state_set(CInode::STATE_EVALSTALECAPS); continue; } if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS)) issue_caps(in, cap); } } } void Locker::remove_stale_leases(Session *session) { dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl; xlist::iterator p = session->leases.begin(); while (!p.end()) { ClientLease *l = *p; ++p; CDentry *parent = static_cast(l->parent); dout(15) << " removing lease on " << *parent << dendl; parent->remove_client_lease(l, this); } } class C_MDL_RequestInodeFileCaps : public LockerContext { CInode *in; public: C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) { in->get(CInode::PIN_PTRWAITER); } void finish(int r) override { if (!in->is_auth()) locker->request_inode_file_caps(in); in->put(CInode::PIN_PTRWAITER); } }; void Locker::request_inode_file_caps(CInode *in) { assert(!in->is_auth()); int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN; if (wanted != in->replica_caps_wanted) { // wait for single auth if (in->is_ambiguous_auth()) { in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDL_RequestInodeFileCaps(this, in)); return; } mds_rank_t auth = in->authority().first; if (mds->is_cluster_degraded() && mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in)); return; } dout(7) << "request_inode_file_caps " << ccap_string(wanted) << " was " << ccap_string(in->replica_caps_wanted) << " on " << *in << " to mds." << auth << dendl; in->replica_caps_wanted = wanted; if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted), auth); } } /* This function DOES put the passed message before returning */ void Locker::handle_inode_file_caps(MInodeFileCaps *m) { // nobody should be talking to us during recovery. 
assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping()); // ok CInode *in = mdcache->get_inode(m->get_ino()); mds_rank_t from = mds_rank_t(m->get_source().num()); assert(in); assert(in->is_auth()); dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl; if (m->get_caps()) in->mds_caps_wanted[from] = m->get_caps(); else in->mds_caps_wanted.erase(from); try_eval(in, CEPH_CAP_LOCKS); m->put(); } class C_MDL_CheckMaxSize : public LockerContext { CInode *in; uint64_t new_max_size; uint64_t newsize; utime_t mtime; public: C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size, uint64_t _newsize, utime_t _mtime) : LockerContext(l), in(i), new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime) { in->get(CInode::PIN_PTRWAITER); } void finish(int r) override { if (in->is_auth()) locker->check_inode_max_size(in, false, new_max_size, newsize, mtime); in->put(CInode::PIN_PTRWAITER); } }; uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size) { uint64_t new_max = (size + 1) << 1; uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs; if (max_inc > 0) { max_inc *= pi->get_layout_size_increment(); new_max = MIN(new_max, size + max_inc); } return ROUND_UP_TO(new_max, pi->get_layout_size_increment()); } void Locker::calc_new_client_ranges(CInode *in, uint64_t size, map *new_ranges, bool *max_increased) { inode_t *latest = in->get_projected_inode(); uint64_t ms; if(latest->has_layout()) { ms = calc_new_max_size(latest, size); } else { // Layout-less directories like ~mds0/, have zero size ms = 0; } // increase ranges as appropriate. // shrink to 0 if no WR|BUFFER caps issued. 
  // (tail of Locker::calc_new_client_ranges — function header is above this chunk)
  // Give every client that holds or wants write/buffer caps a writeable
  // range [0, ms]; an existing range may only grow, never shrink.
  for (map::iterator p = in->client_caps.begin();
       p != in->client_caps.end();
       ++p) {
    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      client_writeable_range_t& nr = (*new_ranges)[p->first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p->first)) {
	client_writeable_range_t& oldr = latest->client_ranges[p->first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = MAX(ms, oldr.range.last);  // never shrink an existing range
	nr.follows = oldr.follows;
      } else {
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
    }
  }
}

// Re-evaluate the file's size/mtime/per-client writeable ranges and journal
// an update if anything changed.  Returns true iff a journal entry was
// submitted; returns false when it is a no-op, or when we must wait (frozen
// inode, or filelock not wrlockable) — in the wait cases a
// C_MDL_CheckMaxSize retry context is queued.
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  inode_t *latest = in->get_projected_inode();
  map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size/mtime only ever move forward
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (in->is_frozen()) {
    // can't touch a frozen inode; retry after unfreeze
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
						     new_max_size,
						     new_size,
						     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
						       new_max_size,
						       new_size,
						       new_mtime);
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
    pi->client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
    pi->size = new_size;
    pi->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
    pi->mtime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
			   new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}

// Broadcast the (possibly increased) max_size to clients that currently hold
// WR/BUFFER caps; if only_cap is non-NULL, restrict to that one capability.
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      cap->inc_last_seq();
      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
				       in->ino(),
				       in->find_snaprealm()->inode->ino(),
				       cap->get_cap_id(),
				       cap->get_last_seq(),
				       cap->pending(),
				       cap->wanted(), 0,
				       cap->get_mseq(),
				       mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;  // only_cap mode: handle exactly one client
  }
}

// Return true if granting 'wanted' caps is blocked behind an unstable,
// mutation-held lock — i.e. a journal flush would unblock the grant.
bool Locker::_need_flush_mdlog(CInode *in, int wanted)
{
  /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
   * pending mutations. */
  if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
       in->filelock.is_unstable_and_locked()) ||
      ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
       in->authlock.is_unstable_and_locked()) ||
      ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
       in->linklock.is_unstable_and_locked()) ||
      ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
       in->xattrlock.is_unstable_and_locked()))
    return true;
  return false;
}

// Apply the client's new 'wanted' mask to the cap (only widening it when the
// issue_seq is stale), then maintain the open-file list / recovery queue.
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // seq mismatch, but the client is asking for MORE: grow wanted anyway
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale request that would shrink wanted: ignore it
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      // journal an EOpen so the file is replayed as open
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}

// Perform empty (NULL) snapflushes on behalf of a client for every pending
// snapid < 'last' — used when we know the client will never send the real
// FLUSHSNAP (e.g. it no longer holds WR/EXCL caps).
void
Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    set& clients = p->second;
    ++p;  // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
      if (!sin) {
	// hrm, look forward until we find the inode.
	//  (we can only look it up by the last snapid it is valid for)
	dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
	for (compact_map >::iterator q = p;  // p is already at next entry
	     q != head_in->client_need_snapflush.end();
	     ++q) {
	  dout(10) << " trying snapid " << q->first << dendl;
	  sin = mdcache->get_inode(head_in->ino(), q->first);
	  if (sin) {
	    assert(sin->first <= snapid);
	    break;
	  }
	  dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
	}
	if (!sin && head_in->is_multiversion())
	  sin = head_in;
	assert(sin);
      }
      // dirty=0, m=NULL, ack=NULL: pure bookkeeping flush
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}


bool Locker::should_defer_client_cap_frozen(CInode *in)
{
  /*
   * This policy needs to be AT LEAST as permissive as allowing a client request
   * to go forward, or else a client request can release something, the release
   * gets deferred, but the request gets processed and deadlocks because when the
   * caps can't get revoked.
   *
   * Currently, a request waits if anything locked is freezing (can't
   * auth_pin), which would avoid any deadlock with cap release.  Thus @in
   * _MUST_ be in the lock/auth_pin set.
   *
   * auth_pins==0 implies no unstable lock and not auth pinned by
   * client request; otherwise continue even if it's freezing.
   */
  return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
}

/*
 * This function DOES put the passed message before returning
 */
void Locker::handle_client_caps(MClientCaps *m)
{
  // NOTE(review): session priv ref obtained here is never put() on these
  // paths — presumably borrowed for the message lifetime; confirm.
  Session *session = static_cast(m->get_connection()->get_priv());
  client_t client = m->get_source().num();

  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
	  << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  // not ready to process cap messages yet: drop or defer
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (mds->is_reconnect() &&
	m->get_dirty() && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      // remember dirty caps seen during reconnect so recovery accounts for them
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // duplicate flush (client resent after our ack was lost)?  re-ack.
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
			    m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
			    m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
			    mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client isn't advancing oldest_flush_tid; warn with exponential backoff
      if (session->get_num_completed_flushes() >=
	  (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	      << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }
    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  // resolve the snapped inode this message actually applies to
  CInode *in = head_in;
  if (follows > 0) {
    in = mdcache->pick_inode_snap(head_in, follows);
    if (in != head_in)
      dout(10) << " head inode " << *head_in << dendl;
  }
  dout(10) << " cap inode " << *in << dendl;

  Capability *cap = 0;
  cap = in->get_client_cap(client);
  if (!cap && in != head_in)
    cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
    m->put();
    return;
  }
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    // stale migration seq: message predates a cap export/import
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
      goto out;
    }

    SnapRealm *realm = in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0,
			    m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
      else if (head_in->client_need_snapflush.begin()->first < snap)
	_do_null_snapflush(head_in, client, snap);

      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    // intermediate snap inodes
    while (in != head_in) {
      assert(in->last != CEPH_NOSNAP);
      if (in->is_auth() && m->get_dirty()) {
	dout(10) << " updating intermediate snapped inode " << *in << dendl;
	_do_cap_update(in, NULL, m->get_dirty(), follows, m);
      }
      in = mdcache->pick_inode_snap(head_in, in->last);
    }

    // head inode, and cap
    MClientCaps *ack = 0;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(m->get_dirty())
	     << " on " << *in << dendl;


    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    if (m->get_dirty() && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(),
			    m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
			    mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && (new_wanted & ~cap->pending())) {
	// expanding caps.  make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
	_do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	// first issue on a fresh cap with write caps: share max_size now
	cap->issue_norevoke(cap->issued());
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}


// Retry context: re-run process_request_cap_release once the inode unfreezes
// (invoked without an MDRequest, hence the null_ref).
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};

// Apply a cap/lease release embedded in a client request.  mdr may be null
// (deferred retry); dname selects an optional dentry lease to drop.
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 const string &dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  if (dname.length()) {
    // drop the client's lease on the named dentry, if it has one
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ?
	       "" : " (DEFERRED, no mdr)") << dendl;

  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    // stale migration seq; predates a cap export/import
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    // no WR/EXCL left: the client will never send the pending FLUSHSNAPs
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress re-issue while evaluating in the context of a request
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}

// Retry context: re-run kick_issue_caps once the inode unfreezes; pins the
// inode for the lifetime of the waiter.
class C_Locker_RetryKickIssueCaps : public LockerContext {
  CInode *in;
  client_t client;
  ceph_seq_t seq;
public:
  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
    LockerContext(l), in(i), client(c), seq(s) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->kick_issue_caps(in, client, seq);
    in->put(CInode::PIN_PTRWAITER);
  }
};

// Reissue caps to a client if its cap is still at the recorded seq
// (i.e. nothing has been issued since the release we noted).
void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
{
  Capability *cap = in->get_client_cap(client);
  if (!cap || cap->get_last_sent() != seq)
    return;

  if (in->is_frozen()) {
    dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
	new C_Locker_RetryKickIssueCaps(this, in, client, seq));
    return;
  }
  dout(10) << "kick_issue_caps released at current seq " << seq
    << ", reissuing" << dendl;
  issue_caps(in, cap);
}

// Reissue any caps that were released during this request and noted in
// mdr->cap_releases.
void Locker::kick_cap_releases(MDRequestRef& mdr)
{
  client_t client = mdr->get_client();
  for (map::iterator p = mdr->cap_releases.begin();
       p != mdr->cap_releases.end();
       ++p) {
    CInode *in = mdcache->get_inode(p->first);
    if (!in)
      continue;
    kick_issue_caps(in, client, p->second);
  }
}

/**
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  bool xattrs = false;
  map *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    xattrs = true;

  old_inode_t *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  inode_t *pi;
  if (oi) {
    // the flush lands in an old_inode; still project/pre_dirty the head
    dout(10) << " writing into old inode" << dendl;
    pi = in->project_inode();
    pi->version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    pi = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    if (xattrs)
      px = new map;
    pi = in->project_inode(px);
    pi->version = in->pre_dirty();
  }

  _update_cap_fields(in, dirty, m, pi);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  if (pi->client_ranges.count(client)) {
    if (in->last == snap) {
      dout(10) << " removing client_range entirely" << dendl;
      pi->client_ranges.erase(client);
    } else {
      dout(10) << " client_range now follows " << snap << dendl;
      pi->client_ranges[client].follows = snap;
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}

// Copy the dirty fields carried in a client cap flush into the (projected)
// inode.  dirty selects which groups (FILE/AUTH/...) are applied; m must be
// non-NULL whenever dirty != 0.
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR: mtime may only move forward; EXCL: client owns mtime outright
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
	(dirty & CEPH_CAP_FILE_WR) &&
	inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> "
	      << m->get_time_warp_seq() << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << " uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << " gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}

/*
 * update inode based on cap flush|flushsnap|wanted.
 *  adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  inode_t *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client is explicitly asking for a larger max_size
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps held or wanted: retire the client range
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still blocked: retry the max_size change once the lock stabilizes
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
							 forced_change_max ? new_max : 0,
							 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  if (m->flockbl.length()) {
    // decode two lock lists from the client: fcntl locks, then flock locks
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  Session *session = static_cast(m->get_connection()->get_priv());
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    session->put();
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }
  session->put();

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  // xattrs update?
  map *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    px = new map;

  inode_t *pi = in->project_inode(px);
  pi->version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, pi);

  if (change_max) {
    dout(7) << " max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      pi->client_ranges[client].range.first = 0;
      pi->client_ranges[client].range.last = new_max;
      pi->client_ranges[client].follows = in->first - 1;
    } else
      pi->client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);

    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}

/* This function DOES put the passed message before returning */
void Locker::handle_client_cap_release(MClientCapRelease *m)
{
  client_t client = m->get_source().num();
  dout(10) << "handle_client_cap_release " << *m << dendl;

  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  Session *session = static_cast(m->get_connection()->get_priv());

  for (vector::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
    _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
  }

  if (session) {
    session->notify_cap_release(m->caps.size());
  }

  m->put();
}

// Retry context: re-run _do_cap_release once the inode unfreezes.
class C_Locker_RetryCapRelease : public LockerContext {
  client_t client;
  inodeno_t ino;
  uint64_t cap_id;
  ceph_seq_t migrate_seq;
  ceph_seq_t issue_seq;
public:
  C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
			   ceph_seq_t mseq, ceph_seq_t seq) :
    LockerContext(l), client(c), ino(i), cap_id(id),
    migrate_seq(mseq), issue_seq(seq) {}
  void finish(int r) override {
    locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
  }
};

// Process one cap-release item: validate cap identity and sequence numbers,
// then drop the client's cap (or just trim revoke history on a seq mismatch).
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
		   new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}

// Remove a client's capability on an inode and clean up dependent state
// (pending snapflushes, client byte ranges, lock eval).
void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}

/**
 * Return true if any currently revoking caps exceed the
 * mds_revoke_cap_timeout threshold.
 */
bool Locker::any_late_revoking_caps(xlist const &revoking) const
{
    // the list is ordered oldest-first, so checking the head suffices
    xlist::const_iterator p = revoking.begin();
    if (p.end()) {
      // No revoking caps at the moment
      return false;
    } else {
      utime_t now = ceph_clock_now();
      utime_t age = now - (*p)->get_last_revoke_stamp();
      if (age <= g_conf->mds_revoke_cap_timeout) {
	  return false;
      } else {
	  return true;
      }
    }
}

// Append to *result the ids of all clients with a late (past-timeout)
// cap revocation in flight.
void Locker::get_late_revoking_clients(std::list *result) const
{
  if (!any_late_revoking_caps(revoking_caps)) {
    // Fast path: no misbehaving clients, execute in O(1)
    return;
  }

  // Slow path: execute in O(N_clients)
  std::map >::const_iterator client_rc_iter;
  for (client_rc_iter = revoking_caps_by_client.begin();
       client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
    xlist const &client_rc = client_rc_iter->second;
    bool any_late = any_late_revoking_caps(client_rc);
    if (any_late) {
	result->push_back(client_rc_iter->first);
    }
  }
}

// Hard-code instead of surfacing a config setting because this is
// really a hack that should go away at some point when we have better
// inspection tools for getting at detailed cap state (#7316)
#define MAX_WARN_CAPS 100

// Periodic check (from Locker::tick) that warns about clients which have
// not acked a cap revoke within mds_revoke_cap_timeout, with exponential
// warning backoff per cap and an overall cap of MAX_WARN_CAPS.
void Locker::caps_tick()
{
  utime_t now = ceph_clock_now();

  dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;

  int i = 0;
  for (xlist::iterator p = revoking_caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;

    utime_t age = now - cap->get_last_revoke_stamp();
    dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    if (age <= g_conf->mds_revoke_cap_timeout) {
      // list is oldest-first; everything after this is younger still
      dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
      break;
    } else {
      ++i;
      if (i > MAX_WARN_CAPS) {
	dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
	  << "revoking, ignoring subsequent caps" << dendl;
	break;
      }
    }
    // exponential backoff of warning intervals
    if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
      cap->inc_num_revoke_warnings();
      stringstream ss;
      ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
	 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
	 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
      mds->clog->warn() << ss.str();
      dout(20) << __func__ << " " << ss.str() << dendl;
    } else {
      dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    }
  }
}


// (head of Locker::handle_client_lease — the function continues past this chunk)
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client."
<< client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the client refers to the seq we issued
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// reuse the incoming message as the reply: stamp new duration/seq
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}

/*
 * Encode a dentry lease (or a null lease, if one can't be issued) for
 * 'client' into 'bl'.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}

/*
 * Send a LEASE_REVOKE to every client holding a lease on this dentry lock.
 */
void Locker::revoke_client_leases(SimpleLock *lock)
{
  int n = 0;
  CDentry *dn = static_cast(lock->get_parent());
  for (map::iterator p = dn->client_lease_map.begin();
       p != dn->client_lease_map.end();
       ++p) {
    ClientLease *l = p->second;

    n++;
    assert(lock->get_type() == CEPH_LOCK_DN);
    // NOTE(review): this redeclaration shadows the outer 'dn' with the
    // same value — candidate for cleanup.
    CDentry *dn = static_cast(lock->get_parent());
    int mask = 1 | CEPH_LOCK_DN;  // old and new bits

    // i should also revoke the dir ICONTENT lease, if they have it!
    CInode *diri = dn->get_dir()->get_inode();
    mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq, mask,
						      diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name()),
				     l->client);
  }
  assert(n == lock->get_num_client_lease());
}


// locks ----------------------------------------------------------------

// Map a lock-type + cache-object id (as carried in an MLock) back to the
// concrete SimpleLock, or 0 if the object is not in cache.
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // all inode locks: resolve the inode, then pick the member lock
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}

/* This function DOES put the passed message before returning */
// Dispatch an inter-MDS lock message to the per-lock-type handler.
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    // IDFT/INEST fall through here: they share the file-lock handler
    handle_file_lock(static_cast(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}


// ==========================================================================
// simple lock

/** This function may take a reference to m if it needs one, but does
 * not put references. */
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}

/* This function DOES put the passed message before returning */
// Process a replica<->auth state-transition message for a simple lock.
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    // our clients' leases must go away before we can ack the auth
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;

    // -- auth --
  case LOCK_AC_LOCKACK:
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent()
	      << " from " << from << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent()
	      << " from " << from << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}

/* unused, currently.

class C_Locker_SimpleEval : public Context {
  Locker *locker;
  SimpleLock *lock;
public:
  C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
  void finish(int r) {
    locker->try_simple_eval(lock);
  }
};

void Locker::try_simple_eval(SimpleLock *lock)
{
  // unstable and ambiguous auth?
  if (!lock->is_stable() &&
      lock->get_parent()->is_ambiguous_auth()) {
    dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
    return;
  }

  if (!lock->get_parent()->is_auth()) {
    dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
    return;
  }

  if (!lock->get_parent()->can_auth_pin()) {
    dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
    return;
  }

  if (lock->is_stable())
    simple_eval(lock);
}
*/

// Re-evaluate a stable simple lock on the auth MDS and move it toward the
// state best matching current cap wants (SYNC or EXCL).
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}


// mid

// Transition a simple lock toward SYNC; returns true if SYNC was reached
// immediately, false if we must first gather (from replicas, cap holders,
// wrlockers, or a pending scatter flush/recovery).
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      // only MIX replicas hold divergent state that must be collected
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
	need_recover = true;
        gather++;
      }
    }

    if (!gather && lock->is_dirty()) {
      // flush dirty scatter data to the journal before going SYNC
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}

// Transition a simple lock toward EXCL (single loner client).
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}

// Transition a simple lock toward LOCK (exclusively held by the auth MDS).
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK);
break;
  case LOCK_XSYN:
    file_excl(static_cast(lock), need_issue);
    if (lock->get_state() != LOCK_EXCL)
      return;
    // fall-thru
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // wait for the local gather to finish before asking replicas to lock
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  if (!gather && lock->is_dirty()) {
    // flush dirty scatter data before completing the transition
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}

// Begin an xlock: move LOCK/XLOCKDONE toward PREXLOCK, gathering readers,
// writers, and client caps first.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast(lock->get_parent());

  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}


// ==========================================================================
// scatter lock

/*

Some notes on scatterlocks.

 - The scatter/gather is driven by the inode lock.  The scatter always
   brings in the latest metadata from the fragments.

 - When in a scattered/MIX state, fragments are only allowed to
   update/be written to if the accounted stat matches the inode's
   current version.

 - That means, on gather, we _only_ assimilate diffs for frag metadata
   that match the current version, because those are the only ones
   written during this scatter/gather cycle.  (Others didn't permit
   it.)  We increment the version and journal this to disk.

 - When possible, we also simultaneously update our local frag
   accounted stats to match.

 - On scatter, the new inode info is broadcast to frags, both local
   and remote.  If possible (auth and !frozen), the dirfrag auth
   should update the accounted state (if it isn't already up to date).
   Note that this may occur on both the local inode auth node and
   inode replicas, so there are two potential paths.  If it is NOT
   possible, they need to mark_stale to prevent any possible writes.

 - A scatter can be to MIX (potentially writeable) or to SYNC (read
   only).  Both are opportunities to update the frag accounted stats,
   even though only the MIX case is affected by a stale dirfrag.

 - Because many scatter/gather cycles can potentially go by without a
   frag being able to update its accounted stats (due to being frozen
   by exports/refragments in progress), the frag may have (even very)
   old stat versions.  That's fine.  If when we do want to update it,
   we can update accounted_* and the version first.
*/

// Completion for scatter_writebehind(): finalizes the flush after the
// journal entry commits.
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;
  MutationRef mut;
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};

// Journal the gathered (dirty) scatterlock data back into the inode.
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode();  // avoid cow mayhem

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}

// Called when the scatter_writebehind journal entry has been committed.
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
if (!lock->is_wrlocked() &&
        lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}

/*
 * mark a scatterlock to indicate that the dir fnode has some dirty data
 */
void Locker::mark_updated_scatterlock(ScatterLock *lock)
{
  lock->mark_dirty();
  if (lock->get_updated_item()->is_on_list()) {
    dout(10) << "mark_updated_scatterlock " << *lock
	     << " - already on list since " << lock->get_update_stamp() << dendl;
  } else {
    updated_scatterlocks.push_back(lock->get_updated_item());
    utime_t now = ceph_clock_now();
    lock->set_update_stamp(now);
    dout(10) << "mark_updated_scatterlock " << *lock
	     << " - added at " << now << dendl;
  }
}

/*
 * this is called by scatter_tick and LogSegment::try_to_trim() when
 * trying to flush dirty scattered data (i.e. updated fnode) back to
 * the inode.
 *
 * we need to lock|scatter in order to push fnode changes into the
 * inode.dirstat.
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast(lock));
	  }
	  break;
	}

	// adjust lock state
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast(lock));
	  else
	    simple_sync(static_cast(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only realy happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c);
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    updated_scatterlocks.push_back(lock->get_updated_item());
  }
}

// Periodically flush dirty scatterlocks that have been queued long enough.
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}

// Transition a scatterlock toward TSYN (temporary sync).
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}



// ==========================================================================
// local lock

// Take a wrlock that is known to be available (asserts, does not wait).
void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
{
  dout(7) << "local_wrlock_grab  on " << *lock
	  << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->can_wrlock());
  assert(!mut->wrlocks.count(lock));
  lock->get_wrlock(mut->get_client());
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);
}

// Take a wrlock, or queue the request to be retried when it is available.
bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
{
  dout(7) << "local_wrlock_start  on " << *lock
	  << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  if (lock->can_wrlock()) {
    assert(!mut->wrlocks.count(lock));
    lock->get_wrlock(mut->get_client());
    mut->wrlocks.insert(lock);
    mut->locks.insert(lock);
    return true;
  } else {
    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    return false;
  }
}

// Drop a wrlock; wake waiters once the last writer is gone.
void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
{
  dout(7) << "local_wrlock_finish  on " << *lock
	  << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();
  mut->wrlocks.erase(lock);
  mut->locks.erase(lock);
  if (lock->get_num_wrlocks() == 0) {
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD);
  }
}

// Take an xlock, or queue the request to be retried when it is available.
bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
{
  dout(7) << "local_xlock_start  on " << *lock
	  << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  if (!lock->can_xlock_local()) {
    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    return false;
  }

  lock->get_xlock(mut, mut->get_client());
  mut->xlocks.insert(lock);
  mut->locks.insert(lock);
  return true;
}

// Drop an xlock and wake any waiters.
void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
{
  dout(7) << "local_xlock_finish  on " << *lock
	  << " on " << *lock->get_parent() << dendl;
  lock->put_xlock();
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  lock->finish_waiters(SimpleLock::WAIT_STABLE |
		       SimpleLock::WAIT_WR |
		       SimpleLock::WAIT_RD);
}



// ==========================================================================
// file lock

// Re-evaluate a stable file (inode) scatterlock against current client cap
// wants and move it toward EXCL, MIX, or SYNC as appropriate.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << " filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
// (continuing file_eval's EXCL-loss decision)
// any writer anywhere, or a pending WR waiter, forces MIX
if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
    lock->is_waiter_for(SimpleLock::WAIT_WR))
  scatter_mix(lock, need_issue);
else if (!lock->is_wrlocked())   // let excl wrlocks drain first
  simple_sync(lock, need_issue);
else
  dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  // bump to loner-exclusive when a loner candidate exists and either
  // write/buffer caps are wanted or a dir has no delegated dirfrags
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  // scatter when someone asked for it, or writers exist with no loner
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?  (condition continues on the next chunk line)
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
!lock->is_waiter_for(SimpleLock::WAIT_WR) && !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) && !((lock->get_state() == LOCK_MIX) && in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are //((wanted & CEPH_CAP_RD) || //in->is_replicated() || //lock->get_num_client_lease() || //(!loner && lock->get_state() == LOCK_EXCL)) && ) { dout(7) << "file_eval stable, bump to sync " << *lock << " on " << *lock->get_parent() << dendl; simple_sync(lock, need_issue); } } void Locker::scatter_mix(ScatterLock *lock, bool *need_issue) { dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl; CInode *in = static_cast(lock->get_parent()); assert(in->is_auth()); assert(lock->is_stable()); if (lock->get_state() == LOCK_LOCK) { in->start_scatter(lock); if (in->is_replicated()) { // data bufferlist softdata; lock->encode_locked_state(softdata); // bcast to replicas send_lock_message(lock, LOCK_AC_MIX, softdata); } // change lock lock->set_state(LOCK_MIX); lock->clear_scatter_wanted(); if (lock->get_cap_shift()) { if (need_issue) *need_issue = true; else issue_caps(in); } } else { // gather? switch (lock->get_state()) { case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break; case LOCK_XSYN: file_excl(lock, need_issue); if (lock->get_state() != LOCK_EXCL) return; // fall-thru case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break; case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break; default: ceph_abort(); } int gather = 0; if (lock->is_rdlocked()) gather++; if (in->is_replicated()) { if (lock->get_state() != LOCK_EXCL_MIX && // EXCL replica is already LOCK lock->get_state() != LOCK_XSYN_EXCL) { // XSYN replica is already LOCK; ** FIXME here too! 
send_lock_message(lock, LOCK_AC_MIX); lock->init_gather(); gather++; } } if (lock->is_leased()) { revoke_client_leases(lock); gather++; } if (lock->get_cap_shift() && in->is_head() && in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } bool need_recover = false; if (in->state_test(CInode::STATE_NEEDSRECOVER)) { mds->mdcache->queue_file_recover(in); need_recover = true; gather++; } if (gather) { lock->get_parent()->auth_pin(lock); if (need_recover) mds->mdcache->do_file_recover(); } else { in->start_scatter(lock); lock->set_state(LOCK_MIX); lock->clear_scatter_wanted(); if (in->is_replicated()) { bufferlist softdata; lock->encode_locked_state(softdata); send_lock_message(lock, LOCK_AC_MIX, softdata); } if (lock->get_cap_shift()) { if (need_issue) *need_issue = true; else issue_caps(in); } } } } void Locker::file_excl(ScatterLock *lock, bool *need_issue) { CInode *in = static_cast(lock->get_parent()); dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl; assert(in->is_auth()); assert(lock->is_stable()); assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) || (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> switch (lock->get_state()) { case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break; case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break; case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break; case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break; default: ceph_abort(); } int gather = 0; if (lock->is_rdlocked()) gather++; if (lock->is_wrlocked()) gather++; if (in->is_replicated() && lock->get_state() != LOCK_LOCK_EXCL && lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock. 
send_lock_message(lock, LOCK_AC_LOCK); lock->init_gather(); gather++; } if (lock->is_leased()) { revoke_client_leases(lock); gather++; } if (in->is_head() && in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } bool need_recover = false; if (in->state_test(CInode::STATE_NEEDSRECOVER)) { mds->mdcache->queue_file_recover(in); need_recover = true; gather++; } if (gather) { lock->get_parent()->auth_pin(lock); if (need_recover) mds->mdcache->do_file_recover(); } else { lock->set_state(LOCK_EXCL); if (need_issue) *need_issue = true; else issue_caps(in); } } void Locker::file_xsyn(SimpleLock *lock, bool *need_issue) { dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl; CInode *in = static_cast(lock->get_parent()); assert(in->is_auth()); assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty()); switch (lock->get_state()) { case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break; default: ceph_abort(); } int gather = 0; if (lock->is_wrlocked()) gather++; if (in->is_head() && in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } if (gather) { lock->get_parent()->auth_pin(lock); } else { lock->set_state(LOCK_XSYN); lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); if (need_issue) *need_issue = true; else issue_caps(in); } } void Locker::file_recover(ScatterLock *lock) { CInode *in = static_cast(lock->get_parent()); dout(7) << "file_recover " << *lock << " on " << *in << dendl; assert(in->is_auth()); //assert(lock->is_stable()); assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover() int gather = 0; /* if (in->is_replicated() lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) { send_lock_message(lock, LOCK_AC_LOCK); lock->init_gather(); gather++; } */ if (in->is_head() && in->issued_caps_need_gather(lock)) { issue_caps(in); gather++; } lock->set_state(LOCK_SCAN); if (gather) 
in->state_set(CInode::STATE_NEEDSRECOVER); else mds->mdcache->queue_file_recover(in); } // messenger /* This function DOES put the passed message before returning */ void Locker::handle_file_lock(ScatterLock *lock, MLock *m) { CInode *in = static_cast(lock->get_parent()); int from = m->get_asker(); if (mds->is_rejoin()) { if (in->is_rejoining()) { dout(7) << "handle_file_lock still rejoining " << *in << ", dropping " << *m << dendl; m->put(); return; } } dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action()) << " on " << *lock << " from mds." << from << " " << *in << dendl; bool caps = lock->get_cap_shift(); switch (m->get_action()) { // -- replica -- case LOCK_AC_SYNC: assert(lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_MIX || lock->get_state() == LOCK_MIX_SYNC2); if (lock->get_state() == LOCK_MIX) { lock->set_state(LOCK_MIX_SYNC); eval_gather(lock, true); if (lock->is_unstable_and_locked()) mds->mdlog->flush(); break; } (static_cast(lock))->finish_flush(); (static_cast(lock))->clear_flushed(); // ok lock->decode_locked_state(m->get_data()); lock->set_state(LOCK_SYNC); lock->get_rdlock(); if (caps) issue_caps(in); lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); lock->put_rdlock(); break; case LOCK_AC_LOCK: switch (lock->get_state()) { case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break; case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break; default: ceph_abort(); } eval_gather(lock, true); if (lock->is_unstable_and_locked()) mds->mdlog->flush(); break; case LOCK_AC_LOCKFLUSHED: (static_cast(lock))->finish_flush(); (static_cast(lock))->clear_flushed(); // wake up scatter_nudge waiters if (lock->is_stable()) lock->finish_waiters(SimpleLock::WAIT_STABLE); break; case LOCK_AC_MIX: assert(lock->get_state() == LOCK_SYNC || lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_SYNC_MIX2); if (lock->get_state() == LOCK_SYNC) { // MIXED lock->set_state(LOCK_SYNC_MIX); eval_gather(lock, true); if 
(lock->is_unstable_and_locked())
	mds->mdlog->flush();
      break;
    }

    // ok
    lock->set_state(LOCK_MIX);
    lock->decode_locked_state(m->get_data());

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // a replica acknowledged our LOCK push; it must be in our gather set
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK2 ||
	   lock->get_state() == LOCK_MIX_EXCL ||
	   lock->get_state() == LOCK_SYNC_EXCL ||
	   lock->get_state() == LOCK_SYNC_MIX ||
	   lock->get_state() == LOCK_MIX_TSYN);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
      // coming from MIX: the ack carries the replica's scattered state
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    // replica finished moving to SYNC; absorb its state
    assert(lock->get_state() == LOCK_MIX_SYNC);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    // replica finished moving to MIX
    assert(lock->get_state() == LOCK_SYNC_MIX);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one"
<< dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
    } else {
      // not stable now: remember the request for the next eval
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock);  // FIXME tempsync?
} else {
      // unstable: remember the request for the next eval
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    // nudges only make sense on the auth copy of a replicated lock
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }

  // this handler owns the message; release it before returning
  m->put();
}