remove ceph code
[stor4nfv.git] / src / ceph / src / mds / Locker.cc
diff --git a/src/ceph/src/mds/Locker.cc b/src/ceph/src/mds/Locker.cc
deleted file mode 100644 (file)
index a0ccf96..0000000
+++ /dev/null
@@ -1,5316 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#include "MDSRank.h"
-#include "MDCache.h"
-#include "Locker.h"
-#include "CInode.h"
-#include "CDir.h"
-#include "CDentry.h"
-#include "Mutation.h"
-#include "MDSContext.h"
-
-#include "MDLog.h"
-#include "MDSMap.h"
-
-#include "events/EUpdate.h"
-#include "events/EOpen.h"
-
-#include "msg/Messenger.h"
-#include "osdc/Objecter.h"
-
-#include "messages/MInodeFileCaps.h"
-#include "messages/MLock.h"
-#include "messages/MClientLease.h"
-#include "messages/MClientReply.h"
-#include "messages/MClientCaps.h"
-#include "messages/MClientCapRelease.h"
-
-#include "messages/MMDSSlaveRequest.h"
-
-#include <errno.h>
-
-#include "common/config.h"
-
-
-#define dout_subsys ceph_subsys_mds
-#undef dout_prefix
-#define dout_context g_ceph_context
-#define dout_prefix _prefix(_dout, mds)
-static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
-  return *_dout << "mds." << mds->get_nodeid() << ".locker ";
-}
-
-
-class LockerContext : public MDSInternalContextBase {
-protected:
-  Locker *locker;
-  MDSRank *get_mds() override
-  {
-    return locker->mds;
-  }
-
-public:
-  explicit LockerContext(Locker *locker_) : locker(locker_) {
-    assert(locker != NULL);
-  }
-};
-
-class LockerLogContext : public MDSLogContextBase {
-protected:
-  Locker *locker;
-  MDSRank *get_mds() override
-  {
-    return locker->mds;
-  }
-
-public:
-  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
-    assert(locker != NULL);
-  }
-};
-
-/* This function DOES put the passed message before returning */
-void Locker::dispatch(Message *m)
-{
-
-  switch (m->get_type()) {
-
-    // inter-mds locking
-  case MSG_MDS_LOCK:
-    handle_lock(static_cast<MLock*>(m));
-    break;
-    // inter-mds caps
-  case MSG_MDS_INODEFILECAPS:
-    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
-    break;
-
-    // client sync
-  case CEPH_MSG_CLIENT_CAPS:
-    handle_client_caps(static_cast<MClientCaps*>(m));
-
-    break;
-  case CEPH_MSG_CLIENT_CAPRELEASE:
-    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
-    break;
-  case CEPH_MSG_CLIENT_LEASE:
-    handle_client_lease(static_cast<MClientLease*>(m));
-    break;
-    
-  default:
-    derr << "locker unknown message " << m->get_type() << dendl;
-    assert(0 == "locker unknown message");
-  }
-}
-
-void Locker::tick()
-{
-  scatter_tick();
-  caps_tick();
-}
-
-/*
- * locks vs rejoin
- *
- * 
- *
- */
-
-void Locker::send_lock_message(SimpleLock *lock, int msg)
-{
-  for (const auto &it : lock->get_parent()->get_replicas()) {
-    if (mds->is_cluster_degraded() &&
-       mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
-      continue;
-    MLock *m = new MLock(lock, msg, mds->get_nodeid());
-    mds->send_message_mds(m, it.first);
-  }
-}
-
-void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
-{
-  for (const auto &it : lock->get_parent()->get_replicas()) {
-    if (mds->is_cluster_degraded() &&
-       mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
-      continue;
-    MLock *m = new MLock(lock, msg, mds->get_nodeid());
-    m->set_data(data);
-    mds->send_message_mds(m, it.first);
-  }
-}
-
-
-
-
-void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
-{
-  // rdlock ancestor snaps
-  CInode *t = in;
-  rdlocks.insert(&in->snaplock);
-  while (t->get_projected_parent_dn()) {
-    t = t->get_projected_parent_dn()->get_dir()->get_inode();
-    rdlocks.insert(&t->snaplock);
-  }
-}
-
-void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
-                                         file_layout_t **layout)
-{
-  //rdlock ancestor snaps
-  CInode *t = in;
-  rdlocks.insert(&in->snaplock);
-  rdlocks.insert(&in->policylock);
-  bool found_layout = false;
-  while (t) {
-    rdlocks.insert(&t->snaplock);
-    if (!found_layout) {
-      rdlocks.insert(&t->policylock);
-      if (t->get_projected_inode()->has_layout()) {
-        *layout = &t->get_projected_inode()->layout;
-        found_layout = true;
-      }
-    }
-    if (t->get_projected_parent_dn() &&
-        t->get_projected_parent_dn()->get_dir())
-      t = t->get_projected_parent_dn()->get_dir()->get_inode();
-    else t = NULL;
-  }
-}
-
-struct MarkEventOnDestruct {
-  MDRequestRef& mdr;
-  const char* message;
-  bool mark_event;
-  MarkEventOnDestruct(MDRequestRef& _mdr,
-                      const char *_message) : mdr(_mdr),
-                          message(_message),
-                          mark_event(true) {}
-  ~MarkEventOnDestruct() {
-    if (mark_event)
-      mdr->mark_event(message);
-  }
-};
-
-/* If this function returns false, the mdr has been placed
- * on the appropriate wait list */
-bool Locker::acquire_locks(MDRequestRef& mdr,
-                          set<SimpleLock*> &rdlocks,
-                          set<SimpleLock*> &wrlocks,
-                          set<SimpleLock*> &xlocks,
-                          map<SimpleLock*,mds_rank_t> *remote_wrlocks,
-                          CInode *auth_pin_freeze,
-                          bool auth_pin_nonblock)
-{
-  if (mdr->done_locking &&
-      !mdr->is_slave()) {  // not on slaves!  master requests locks piecemeal.
-    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;    
-    return true;  // at least we had better be!
-  }
-  dout(10) << "acquire_locks " << *mdr << dendl;
-
-  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
-
-  client_t client = mdr->get_client();
-
-  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
-  set<MDSCacheObject*> mustpin;            // items to authpin
-
-  // xlocks
-  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
-    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
-    sorted.insert(*p);
-    mustpin.insert((*p)->get_parent());
-
-    // augment xlock with a versionlock?
-    if ((*p)->get_type() == CEPH_LOCK_DN) {
-      CDentry *dn = (CDentry*)(*p)->get_parent();
-      if (!dn->is_auth())
-       continue;
-
-      if (xlocks.count(&dn->versionlock))
-       continue;  // we're xlocking the versionlock too; don't wrlock it!
-
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline dentry updates to journal.
-       wrlocks.insert(&dn->versionlock);
-      } else {
-       // slave.  exclusively lock the dentry version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&dn->versionlock);
-       sorted.insert(&dn->versionlock);
-      }
-    }
-    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
-      // inode version lock?
-      CInode *in = (CInode*)(*p)->get_parent();
-      if (!in->is_auth())
-       continue;
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline inode updates to journal.
-       wrlocks.insert(&in->versionlock);
-      } else {
-       // slave.  exclusively lock the inode version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&in->versionlock);
-       sorted.insert(&in->versionlock);
-      }
-    }
-  }
-
-  // wrlocks
-  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must wrlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_wrlock(client) &&  // we might have to request a scatter
-            !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a scatter" << dendl;
-      mustpin.insert(object);
-    }
-  }
-
-  // remote_wrlocks
-  if (remote_wrlocks) {
-    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
-      MDSCacheObject *object = p->first->get_parent();
-      dout(20) << " must remote_wrlock on mds." << p->second << " "
-              << *p->first << " " << *object << dendl;
-      sorted.insert(p->first);
-      mustpin.insert(object);
-    }
-  }
-
-  // rdlocks
-  for (set<SimpleLock*>::iterator p = rdlocks.begin();
-        p != rdlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must rdlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a rdlock" << dendl;
-      mustpin.insert(object);
-    }
-  }
-
-  // AUTH PINS
-  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)
-  
-  // can i auth pin them all now?
-  marker.message = "failed to authpin local pins";
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
-
-    dout(10) << " must authpin " << *object << dendl;
-
-    if (mdr->is_auth_pinned(object)) {
-      if (object != (MDSCacheObject*)auth_pin_freeze)
-       continue;
-      if (mdr->more()->is_remote_frozen_authpin) {
-       if (mdr->more()->rename_inode == auth_pin_freeze)
-         continue;
-       // unfreeze auth pin for the wrong inode
-       mustpin_remote[mdr->more()->rename_inode->authority().first].size();
-      }
-    }
-    
-    if (!object->is_auth()) {
-      if (!mdr->locks.empty())
-       drop_locks(mdr.get());
-      if (object->is_ambiguous_auth()) {
-       // wait
-       dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
-       object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
-       mdr->drop_local_auth_pins();
-       return false;
-      }
-      mustpin_remote[object->authority().first].insert(object);
-      continue;
-    }
-    if (!object->can_auth_pin()) {
-      // wait
-      drop_locks(mdr.get());
-      mdr->drop_local_auth_pins();
-      if (auth_pin_nonblock) {
-       dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
-       mdr->aborted = true;
-       return false;
-      }
-      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
-      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
-
-      if (!mdr->remote_auth_pins.empty())
-       notify_freeze_waiter(object);
-
-      return false;
-    }
-  }
-
-  // ok, grab local auth pins
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
-    if (mdr->is_auth_pinned(object)) {
-      dout(10) << " already auth_pinned " << *object << dendl;
-    } else if (object->is_auth()) {
-      dout(10) << " auth_pinning " << *object << dendl;
-      mdr->auth_pin(object);
-    }
-  }
-
-  // request remote auth_pins
-  if (!mustpin_remote.empty()) {
-    marker.message = "requesting remote authpins";
-    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
-        p != mdr->remote_auth_pins.end();
-        ++p) {
-      if (mustpin.count(p->first)) {
-       assert(p->second == p->first->authority().first);
-       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
-       if (q != mustpin_remote.end())
-         q->second.insert(p->first);
-      }
-    }
-    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
-        p != mustpin_remote.end();
-        ++p) {
-      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
-
-      // wait for active auth
-      if (mds->is_cluster_degraded() &&
-         !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
-       dout(10) << " mds." << p->first << " is not active" << dendl;
-       if (mdr->more()->waiting_on_slave.empty())
-         mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
-       return false;
-      }
-      
-      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
-                                                  MMDSSlaveRequest::OP_AUTHPIN);
-      for (set<MDSCacheObject*>::iterator q = p->second.begin();
-          q != p->second.end();
-          ++q) {
-       dout(10) << " req remote auth_pin of " << **q << dendl;
-       MDSCacheObjectInfo info;
-       (*q)->set_object_info(info);
-       req->get_authpins().push_back(info);
-       if (*q == auth_pin_freeze)
-         (*q)->set_object_info(req->get_authpin_freeze());
-       mdr->pin(*q);
-      }
-      if (auth_pin_nonblock)
-       req->mark_nonblock();
-      mds->send_message_mds(req, p->first);
-
-      // put in waiting list
-      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
-      mdr->more()->waiting_on_slave.insert(p->first);
-    }
-    return false;
-  }
-
-  // caps i'll need to issue
-  set<CInode*> issue_set;
-  bool result = false;
-
-  // acquire locks.
-  // make sure they match currently acquired locks.
-  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
-  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
-       p != sorted.end();
-       ++p) {
-    bool need_wrlock = !!wrlocks.count(*p);
-    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
-
-    // already locked?
-    if (existing != mdr->locks.end() && *existing == *p) {
-      // right kind?
-      SimpleLock *have = *existing;
-      ++existing;
-      if (xlocks.count(have) && mdr->xlocks.count(have)) {
-       dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
-       continue;
-      }
-      if (mdr->remote_wrlocks.count(have)) {
-       if (!need_remote_wrlock ||
-           mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
-         dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
-                  << " " << *have << " " << *have->get_parent() << dendl;
-         remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
-       }
-      }
-      if (need_wrlock || need_remote_wrlock) {
-       if (need_wrlock == !!mdr->wrlocks.count(have) &&
-           need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
-         if (need_wrlock)
-           dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
-         if (need_remote_wrlock)
-           dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
-         continue;
-       }
-      }
-      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
-       dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
-       continue;
-      }
-    }
-    
-    // hose any stray locks
-    if (existing != mdr->locks.end() && *existing == *p) {
-      assert(need_wrlock || need_remote_wrlock);
-      SimpleLock *lock = *existing;
-      if (mdr->wrlocks.count(lock)) {
-       if (!need_wrlock)
-         dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
-       else if (need_remote_wrlock) // acquire remote_wrlock first
-         dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
-       bool need_issue = false;
-       wrlock_finish(lock, mdr.get(), &need_issue);
-       if (need_issue)
-         issue_set.insert(static_cast<CInode*>(lock->get_parent()));
-      }
-      ++existing;
-    }
-    while (existing != mdr->locks.end()) {
-      SimpleLock *stray = *existing;
-      ++existing;
-      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
-      bool need_issue = false;
-      if (mdr->xlocks.count(stray)) {
-       xlock_finish(stray, mdr.get(), &need_issue);
-      } else if (mdr->rdlocks.count(stray)) {
-       rdlock_finish(stray, mdr.get(), &need_issue);
-      } else {
-       // may have acquired both wrlock and remore wrlock
-       if (mdr->wrlocks.count(stray))
-         wrlock_finish(stray, mdr.get(), &need_issue);
-       if (mdr->remote_wrlocks.count(stray))
-         remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
-      }
-      if (need_issue)
-       issue_set.insert(static_cast<CInode*>(stray->get_parent()));
-    }
-
-    // lock
-    if (mdr->locking && *p != mdr->locking) {
-      cancel_locking(mdr.get(), &issue_set);
-    }
-    if (xlocks.count(*p)) {
-      marker.message = "failed to xlock, waiting";
-      if (!xlock_start(*p, mdr)) 
-       goto out;
-      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
-    } else if (need_wrlock || need_remote_wrlock) {
-      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
-        marker.message = "waiting for remote wrlocks";
-       remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
-       goto out;
-      }
-      if (need_wrlock && !mdr->wrlocks.count(*p)) {
-        marker.message = "failed to wrlock, waiting";
-       if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
-         marker.message = "failed to wrlock, dropping remote wrlock and waiting";
-         // can't take the wrlock because the scatter lock is gathering. need to
-         // release the remote wrlock, so that the gathering process can finish.
-         remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
-         remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
-         goto out;
-       }
-       // nowait if we have already gotten remote wrlock
-       if (!wrlock_start(*p, mdr, need_remote_wrlock))
-         goto out;
-       dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
-      }
-    } else {
-      assert(mdr->is_master());
-      if ((*p)->is_scatterlock()) {
-       ScatterLock *slock = static_cast<ScatterLock *>(*p);
-       if (slock->is_rejoin_mix()) {
-         // If there is a recovering mds who replcated an object when it failed
-         // and scatterlock in the object was in MIX state, It's possible that
-         // the recovering mds needs to take wrlock on the scatterlock when it
-         // replays unsafe requests. So this mds should delay taking rdlock on
-         // the scatterlock until the recovering mds finishes replaying unsafe.
-         // Otherwise unsafe requests may get replayed after current request.
-         //
-         // For example:
-         // The recovering mds is auth mds of a dirfrag, this mds is auth mds
-         // of correspinding inode. when 'rm -rf' the direcotry, this mds should
-         // delay the rmdir request until the recovering mds has replayed unlink
-         // requests.
-         if (mds->is_cluster_degraded()) {
-           if (!mdr->is_replay()) {
-             drop_locks(mdr.get());
-             mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
-             dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
-                      << ", waiting for cluster recovered" << dendl;
-             marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
-             return false;
-           }
-         } else {
-           slock->clear_rejoin_mix();
-         }
-       }
-      }
-
-      marker.message = "failed to rdlock, waiting";
-      if (!rdlock_start(*p, mdr)) 
-       goto out;
-      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
-    }
-  }
-    
-  // any extra unneeded locks?
-  while (existing != mdr->locks.end()) {
-    SimpleLock *stray = *existing;
-    ++existing;
-    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
-    bool need_issue = false;
-    if (mdr->xlocks.count(stray)) {
-      xlock_finish(stray, mdr.get(), &need_issue);
-    } else if (mdr->rdlocks.count(stray)) {
-      rdlock_finish(stray, mdr.get(), &need_issue);
-    } else {
-      // may have acquired both wrlock and remore wrlock
-      if (mdr->wrlocks.count(stray))
-       wrlock_finish(stray, mdr.get(), &need_issue);
-      if (mdr->remote_wrlocks.count(stray))
-       remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
-    }
-    if (need_issue)
-      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
-  }
-
-  mdr->done_locking = true;
-  mdr->set_mds_stamp(ceph_clock_now());
-  result = true;
-  marker.message = "acquired locks";
-
- out:
-  issue_caps_set(issue_set);
-  return result;
-}
-
-void Locker::notify_freeze_waiter(MDSCacheObject *o)
-{
-  CDir *dir = NULL;
-  if (CInode *in = dynamic_cast<CInode*>(o)) {
-    if (!in->is_root())
-      dir = in->get_parent_dir();
-  } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
-    dir = dn->get_dir();
-  } else {
-    dir = dynamic_cast<CDir*>(o);
-    assert(dir);
-  }
-  if (dir) {
-    if (dir->is_freezing_dir())
-      mdcache->fragment_freeze_inc_num_waiters(dir);
-    if (dir->is_freezing_tree()) {
-      while (!dir->is_freezing_tree_root())
-       dir = dir->get_parent_dir();
-      mdcache->migrator->export_freeze_inc_num_waiters(dir);
-    }
-  }
-}
-
-void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
-{
-  for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
-       p != mut->xlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    assert(object->is_auth());
-    if (skip_dentry &&
-       ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
-      continue;
-    dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
-    (*p)->set_xlock_done();
-  }
-}
-
-void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  while (!mut->rdlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
-    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
-  }
-}
-
-void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  set<mds_rank_t> slaves;
-
-  while (!mut->xlocks.empty()) {
-    SimpleLock *lock = *mut->xlocks.begin();
-    MDSCacheObject *p = lock->get_parent();
-    if (!p->is_auth()) {
-      assert(lock->get_sm()->can_remote_xlock);
-      slaves.insert(p->authority().first);
-      lock->put_xlock();
-      mut->locks.erase(lock);
-      mut->xlocks.erase(lock);
-      continue;
-    }
-    bool ni = false;
-    xlock_finish(lock, mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
-  }
-
-  while (!mut->remote_wrlocks.empty()) {
-    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
-    slaves.insert(p->second);
-    if (mut->wrlocks.count(p->first) == 0)
-      mut->locks.erase(p->first);
-    mut->remote_wrlocks.erase(p);
-  }
-
-  while (!mut->wrlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
-    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
-  }
-
-  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
-    if (!mds->is_cluster_degraded() ||
-       mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
-      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
-      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                       MMDSSlaveRequest::OP_DROPLOCKS);
-      mds->send_message_mds(slavereq, *p);
-    }
-  }
-}
-
-void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  SimpleLock *lock = mut->locking;
-  assert(lock);
-  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
-
-  if (lock->get_parent()->is_auth()) {
-    bool need_issue = false;
-    if (lock->get_state() == LOCK_PREXLOCK) {
-      _finish_xlock(lock, -1, &need_issue);
-    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
-              lock->get_num_xlocks() == 0) {
-      lock->set_state(LOCK_XLOCKDONE);
-      eval_gather(lock, true, &need_issue);
-    }
-    if (need_issue)
-      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
-  }
-  mut->finish_locking(lock);
-}
-
-void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  // leftover locks
-  set<CInode*> my_need_issue;
-  if (!pneed_issue)
-    pneed_issue = &my_need_issue;
-
-  if (mut->locking)
-    cancel_locking(mut, pneed_issue);
-  _drop_non_rdlocks(mut, pneed_issue);
-  _drop_rdlocks(mut, pneed_issue);
-
-  if (pneed_issue == &my_need_issue)
-    issue_caps_set(*pneed_issue);
-  mut->done_locking = false;
-}
-
-void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  set<CInode*> my_need_issue;
-  if (!pneed_issue)
-    pneed_issue = &my_need_issue;
-
-  _drop_non_rdlocks(mut, pneed_issue);
-
-  if (pneed_issue == &my_need_issue)
-    issue_caps_set(*pneed_issue);
-}
-
-void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  set<CInode*> my_need_issue;
-  if (!pneed_issue)
-    pneed_issue = &my_need_issue;
-
-  _drop_rdlocks(mut, pneed_issue);
-
-  if (pneed_issue == &my_need_issue)
-    issue_caps_set(*pneed_issue);
-}
-
-
-// generics
-
-void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
-{
-  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(!lock->is_stable());
-
-  int next = lock->get_next_state();
-
-  CInode *in = 0;
-  bool caps = lock->get_cap_shift();
-  if (lock->get_type() != CEPH_LOCK_DN)
-    in = static_cast<CInode *>(lock->get_parent());
-
-  bool need_issue = false;
-
-  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
-  assert(!caps || in != NULL);
-  if (caps && in->is_head()) {
-    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
-                       lock->get_cap_shift(), lock->get_cap_mask());
-    dout(10) << " next state is " << lock->get_state_name(next) 
-            << " issued/allows loner " << gcap_string(loner_issued)
-            << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
-            << " xlocker " << gcap_string(xlocker_issued)
-            << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
-            << " other " << gcap_string(other_issued)
-            << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
-            << dendl;
-
-    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
-                 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
-                 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
-      need_issue = true;
-  }
-
-#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
-  bool auth = lock->get_parent()->is_auth();
-  if (!lock->is_gathering() &&
-      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
-      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
-      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
-      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
-      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
-      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
-                (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
-                (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
-      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
-      lock->get_state() != LOCK_MIX_SYNC2
-      ) {
-    dout(7) << "eval_gather finished gather on " << *lock
-           << " on " << *lock->get_parent() << dendl;
-
-    if (lock->get_sm() == &sm_filelock) {
-      assert(in);
-      if (in->state_test(CInode::STATE_RECOVERING)) {
-       dout(7) << "eval_gather finished gather, but still recovering" << dendl;
-       return;
-      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
-       dout(7) << "eval_gather finished gather, but need to recover" << dendl;
-       mds->mdcache->queue_file_recover(in);
-       mds->mdcache->do_file_recover();
-       return;
-      }
-    }
-
-    if (!lock->get_parent()->is_auth()) {
-      // replica: tell auth
-      mds_rank_t auth = lock->get_parent()->authority().first;
-
-      if (lock->get_parent()->is_rejoining() &&
-         mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
-       dout(7) << "eval_gather finished gather, but still rejoining "
-               << *lock->get_parent() << dendl;
-       return;
-      }
-
-      if (!mds->is_cluster_degraded() ||
-         mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
-       switch (lock->get_state()) {
-       case LOCK_SYNC_LOCK:
-         mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
-                               auth);
-         break;
-
-       case LOCK_MIX_SYNC:
-         {
-           MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
-           lock->encode_locked_state(reply->get_data());
-           mds->send_message_mds(reply, auth);
-           next = LOCK_MIX_SYNC2;
-           (static_cast<ScatterLock *>(lock))->start_flush();
-         }
-         break;
-
-       case LOCK_MIX_SYNC2:
-         (static_cast<ScatterLock *>(lock))->finish_flush();
-         (static_cast<ScatterLock *>(lock))->clear_flushed();
-
-       case LOCK_SYNC_MIX2:
-         // do nothing, we already acked
-         break;
-         
-       case LOCK_SYNC_MIX:
-         { 
-           MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
-           mds->send_message_mds(reply, auth);
-           next = LOCK_SYNC_MIX2;
-         }
-         break;
-
-       case LOCK_MIX_LOCK:
-         {
-           bufferlist data;
-           lock->encode_locked_state(data);
-           mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
-           (static_cast<ScatterLock *>(lock))->start_flush();
-           // we'll get an AC_LOCKFLUSHED to complete
-         }
-         break;
-
-       default:
-         ceph_abort();
-       }
-      }
-    } else {
-      // auth
-
-      // once the first (local) stage of mix->lock gather complete we can
-      // gather from replicas
-      if (lock->get_state() == LOCK_MIX_LOCK &&
-         lock->get_parent()->is_replicated()) {
-       dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
-       send_lock_message(lock, LOCK_AC_LOCK);
-       lock->init_gather();
-       lock->set_state(LOCK_MIX_LOCK2);
-       return;
-      }
-
-      if (lock->is_dirty() && !lock->is_flushed()) {
-       scatter_writebehind(static_cast<ScatterLock *>(lock));
-       mds->mdlog->flush();
-       return;
-      }
-      lock->clear_flushed();
-      
-      switch (lock->get_state()) {
-       // to mixed
-      case LOCK_TSYN_MIX:
-      case LOCK_SYNC_MIX:
-      case LOCK_EXCL_MIX:
-       in->start_scatter(static_cast<ScatterLock *>(lock));
-       if (lock->get_parent()->is_replicated()) {
-         bufferlist softdata;
-         lock->encode_locked_state(softdata);
-         send_lock_message(lock, LOCK_AC_MIX, softdata);
-       }
-       (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
-       break;
-
-      case LOCK_XLOCK:
-      case LOCK_XLOCKDONE:
-       if (next != LOCK_SYNC)
-         break;
-       // fall-thru
-
-       // to sync
-      case LOCK_EXCL_SYNC:
-      case LOCK_LOCK_SYNC:
-      case LOCK_MIX_SYNC:
-      case LOCK_XSYN_SYNC:
-       if (lock->get_parent()->is_replicated()) {
-         bufferlist softdata;
-         lock->encode_locked_state(softdata);
-         send_lock_message(lock, LOCK_AC_SYNC, softdata);
-       }
-       break;
-      }
-
-    }
-
-    lock->set_state(next);
-    
-    if (lock->get_parent()->is_auth() &&
-       lock->is_stable())
-      lock->get_parent()->auth_unpin(lock);
-
-    // drop loner before doing waiters
-    if (caps &&
-       in->is_head() &&
-       in->is_auth() &&
-       in->get_wanted_loner() != in->get_loner()) {
-      dout(10) << "  trying to drop loner" << dendl;
-      if (in->try_drop_loner()) {
-       dout(10) << "  dropped loner" << dendl;
-       need_issue = true;
-      }
-    }
-
-    if (pfinishers)
-      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
-                        *pfinishers);
-    else
-      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
-    
-    if (caps && in->is_head())
-      need_issue = true;
-
-    if (lock->get_parent()->is_auth() &&
-       lock->is_stable())
-      try_eval(lock, &need_issue);
-  }
-
-  if (need_issue) {
-    if (pneed_issue)
-      *pneed_issue = true;
-    else if (in->is_head())
-      issue_caps(in);
-  }
-
-}
-
-bool Locker::eval(CInode *in, int mask, bool caps_imported)
-{
-  bool need_issue = caps_imported;
-  list<MDSInternalContextBase*> finishers;
-  
-  dout(10) << "eval " << mask << " " << *in << dendl;
-
-  // choose loner?
-  if (in->is_auth() && in->is_head()) {
-    if (in->choose_ideal_loner() >= 0) {
-      if (in->try_set_loner()) {
-       dout(10) << "eval set loner to client." << in->get_loner() << dendl;
-       need_issue = true;
-       mask = -1;
-      } else
-       dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
-    } else
-      dout(10) << "eval doesn't want loner" << dendl;
-  }
-
- retry:
-  if (mask & CEPH_LOCK_IFILE)
-    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
-  if (mask & CEPH_LOCK_IAUTH)
-    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
-  if (mask & CEPH_LOCK_ILINK)
-    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
-  if (mask & CEPH_LOCK_IXATTR)
-    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
-  if (mask & CEPH_LOCK_INEST)
-    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
-  if (mask & CEPH_LOCK_IFLOCK)
-    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
-  if (mask & CEPH_LOCK_IPOLICY)
-    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);
-
-  // drop loner?
-  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
-    dout(10) << "  trying to drop loner" << dendl;
-    if (in->try_drop_loner()) {
-      dout(10) << "  dropped loner" << dendl;
-      need_issue = true;
-
-      if (in->get_wanted_loner() >= 0) {
-       if (in->try_set_loner()) {
-         dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
-         mask = -1;
-         goto retry;
-       } else {
-         dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
-       }
-      }
-    }
-  }
-
-  finish_contexts(g_ceph_context, finishers);
-
-  if (need_issue && in->is_head())
-    issue_caps(in);
-
-  dout(10) << "eval done" << dendl;
-  return need_issue;
-}
-
-class C_Locker_Eval : public LockerContext {
-  MDSCacheObject *p;
-  int mask;
-public:
-  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
-    // We are used as an MDSCacheObject waiter, so should
-    // only be invoked by someone already holding the big lock.
-    assert(locker->mds->mds_lock.is_locked_by_me());
-    p->get(MDSCacheObject::PIN_PTRWAITER);    
-  }
-  void finish(int r) override {
-    locker->try_eval(p, mask);
-    p->put(MDSCacheObject::PIN_PTRWAITER);
-  }
-};
-
-void Locker::try_eval(MDSCacheObject *p, int mask)
-{
-  // unstable and ambiguous auth?
-  if (p->is_ambiguous_auth()) {
-    dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
-    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
-    return;
-  }
-
-  if (p->is_auth() && p->is_frozen()) {
-    dout(7) << "try_eval frozen, waiting on " << *p << dendl;
-    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
-    return;
-  }
-
-  if (mask & CEPH_LOCK_DN) {
-    assert(mask == CEPH_LOCK_DN);
-    bool need_issue = false;  // ignore this, no caps on dentries
-    CDentry *dn = static_cast<CDentry *>(p);
-    eval_any(&dn->lock, &need_issue);
-  } else {
-    CInode *in = static_cast<CInode *>(p);
-    eval(in, mask);
-  }
-}
-
-void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
-{
-  MDSCacheObject *p = lock->get_parent();
-
-  // unstable and ambiguous auth?
-  if (p->is_ambiguous_auth()) {
-    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
-    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
-    return;
-  }
-  
-  if (!p->is_auth()) {
-    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
-    return;
-  }
-
-  if (p->is_frozen()) {
-    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
-    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
-    return;
-  }
-
-  /*
-   * We could have a situation like:
-   *
-   * - mds A authpins item on mds B
-   * - mds B starts to freeze tree containing item
-   * - mds A tries wrlock_start on A, sends REQSCATTER to B
-   * - mds B lock is unstable, sets scatter_wanted
-   * - mds B lock stabilizes, calls try_eval.
-   *
-   * We can defer while freezing without causing a deadlock.  Honor
-   * scatter_wanted flag here.  This will never get deferred by the
-   * checks above due to the auth_pin held by the master.
-   */
-  if (lock->is_scatterlock()) {
-    ScatterLock *slock = static_cast<ScatterLock *>(lock);
-    if (slock->get_scatter_wanted() &&
-       slock->get_state() != LOCK_MIX) {
-      scatter_mix(slock, pneed_issue);
-      if (!lock->is_stable())
-       return;
-    } else if (slock->get_unscatter_wanted() &&
-        slock->get_state() != LOCK_LOCK) {
-      simple_lock(slock, pneed_issue);
-      if (!lock->is_stable()) {
-        return;
-      }
-    }
-  }
-
-  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
-    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
-    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
-    return;
-  }
-
-  eval(lock, pneed_issue);
-}
-
-void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
-{
-  bool need_issue = false;
-  list<MDSInternalContextBase*> finishers;
-
-  // kick locks now
-  if (!in->filelock.is_stable())
-    eval_gather(&in->filelock, false, &need_issue, &finishers);
-  if (!in->authlock.is_stable())
-    eval_gather(&in->authlock, false, &need_issue, &finishers);
-  if (!in->linklock.is_stable())
-    eval_gather(&in->linklock, false, &need_issue, &finishers);
-  if (!in->xattrlock.is_stable())
-    eval_gather(&in->xattrlock, false, &need_issue, &finishers);
-
-  if (need_issue && in->is_head()) {
-    if (issue_set)
-      issue_set->insert(in);
-    else
-      issue_caps(in);
-  }
-
-  finish_contexts(g_ceph_context, finishers);
-}
-
-void Locker::eval_scatter_gathers(CInode *in)
-{
-  bool need_issue = false;
-  list<MDSInternalContextBase*> finishers;
-
-  dout(10) << "eval_scatter_gathers " << *in << dendl;
-
-  // kick locks now
-  if (!in->filelock.is_stable())
-    eval_gather(&in->filelock, false, &need_issue, &finishers);
-  if (!in->nestlock.is_stable())
-    eval_gather(&in->nestlock, false, &need_issue, &finishers);
-  if (!in->dirfragtreelock.is_stable())
-    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
-  
-  if (need_issue && in->is_head())
-    issue_caps(in);
-  
-  finish_contexts(g_ceph_context, finishers);
-}
-
-void Locker::eval(SimpleLock *lock, bool *need_issue)
-{
-  switch (lock->get_type()) {
-  case CEPH_LOCK_IFILE:
-    return file_eval(static_cast<ScatterLock*>(lock), need_issue);
-  case CEPH_LOCK_IDFT:
-  case CEPH_LOCK_INEST:
-    return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
-  default:
-    return simple_eval(lock, need_issue);
-  }
-}
-
-
-// ------------------
-// rdlock
-
-bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
-{
-  // kick the lock
-  if (lock->is_stable()) {
-    if (lock->get_parent()->is_auth()) {
-      if (lock->get_sm() == &sm_scatterlock) {
-       // not until tempsync is fully implemented
-       //if (lock->get_parent()->is_replicated())
-       //scatter_tempsync((ScatterLock*)lock);
-       //else
-       simple_sync(lock);
-      } else if (lock->get_sm() == &sm_filelock) {
-       CInode *in = static_cast<CInode*>(lock->get_parent());
-       if (lock->get_state() == LOCK_EXCL &&
-           in->get_target_loner() >= 0 &&
-           !in->is_dir() && !as_anon)   // as_anon => caller wants SYNC, not XSYN
-         file_xsyn(lock);
-       else
-         simple_sync(lock);
-      } else
-       simple_sync(lock);
-      return true;
-    } else {
-      // request rdlock state change from auth
-      mds_rank_t auth = lock->get_parent()->authority().first;
-      if (!mds->is_cluster_degraded() ||
-         mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
-       dout(10) << "requesting rdlock from auth on "
-                << *lock << " on " << *lock->get_parent() << dendl;
-       mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
-      }
-      return false;
-    }
-  }
-  if (lock->get_type() == CEPH_LOCK_IFILE) {
-    CInode *in = static_cast<CInode *>(lock->get_parent());
-    if (in->state_test(CInode::STATE_RECOVERING)) {
-      mds->mdcache->recovery_queue.prioritize(in);
-    }
-  }
-
-  return false;
-}
-
-bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
-{
-  dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;  
-
-  // can read?  grab ref.
-  if (lock->can_rdlock(client)) 
-    return true;
-  
-  _rdlock_kick(lock, false);
-
-  if (lock->can_rdlock(client)) 
-    return true;
-
-  // wait!
-  if (con) {
-    dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
-    lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
-  }
-  return false;
-}
-
-bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
-{
-  dout(7) << "rdlock_start  on " << *lock << " on " << *lock->get_parent() << dendl;  
-
-  // client may be allowed to rdlock the same item it has xlocked.
-  //  UNLESS someone passes in as_anon, or we're reading snapped version here.
-  if (mut->snapid != CEPH_NOSNAP)
-    as_anon = true;
-  client_t client = as_anon ? -1 : mut->get_client();
-
-  CInode *in = 0;
-  if (lock->get_type() != CEPH_LOCK_DN)
-    in = static_cast<CInode *>(lock->get_parent());
-
-  /*
-  if (!lock->get_parent()->is_auth() &&
-      lock->fw_rdlock_to_auth()) {
-    mdcache->request_forward(mut, lock->get_parent()->authority().first);
-    return false;
-  }
-  */
-
-  while (1) {
-    // can read?  grab ref.
-    if (lock->can_rdlock(client)) {
-      lock->get_rdlock();
-      mut->rdlocks.insert(lock);
-      mut->locks.insert(lock);
-      return true;
-    }
-
-    // hmm, wait a second.
-    if (in && !in->is_head() && in->is_auth() &&
-       lock->get_state() == LOCK_SNAP_SYNC) {
-      // okay, we actually need to kick the head's lock to get ourselves synced up.
-      CInode *head = mdcache->get_inode(in->ino());
-      assert(head);
-      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
-      if (hlock->get_state() == LOCK_SYNC)
-       hlock = head->get_lock(lock->get_type());
-
-      if (hlock->get_state() != LOCK_SYNC) {
-       dout(10) << "rdlock_start trying head inode " << *head << dendl;
-       if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
-         return false;
-       // oh, check our lock again then
-      }
-    }
-
-    if (!_rdlock_kick(lock, as_anon))
-      break;
-  }
-
-  // wait!
-  int wait_on;
-  if (lock->get_parent()->is_auth() && lock->is_stable())
-    wait_on = SimpleLock::WAIT_RD;
-  else
-    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
-  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
-  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
-  nudge_log(lock);
-  return false;
-}
-
-void Locker::nudge_log(SimpleLock *lock)
-{
-  dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
-  if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked())    // as with xlockdone, or cap flush
-    mds->mdlog->flush();
-}
-
-void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
-{
-  // drop ref
-  lock->put_rdlock();
-  if (mut) {
-    mut->rdlocks.erase(lock);
-    mut->locks.erase(lock);
-  }
-
-  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
-  
-  // last one?
-  if (!lock->is_rdlocked()) {
-    if (!lock->is_stable())
-      eval_gather(lock, false, pneed_issue);
-    else if (lock->get_parent()->is_auth())
-      try_eval(lock, pneed_issue);
-  }
-}
-
-
-bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
-{
-  dout(10) << "can_rdlock_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!(*p)->can_rdlock(-1)) {
-      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
-      return false;
-    }
-  return true;
-}
-
-bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
-{
-  dout(10) << "rdlock_try_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!rdlock_try(*p, -1, NULL)) {
-      dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
-      return false;
-    }
-  return true;
-}
-
-void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
-{
-  dout(10) << "rdlock_take_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
-    (*p)->get_rdlock();
-    mut->rdlocks.insert(*p);
-    mut->locks.insert(*p);
-  }
-}
-
-// ------------------
-// wrlock
-
-void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
-{
-  if (lock->get_type() == CEPH_LOCK_IVERSION ||
-      lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
-
-  dout(7) << "wrlock_force  on " << *lock
-         << " on " << *lock->get_parent() << dendl;  
-  lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
-}
-
-bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
-{
-  if (lock->get_type() == CEPH_LOCK_IVERSION ||
-      lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);
-
-  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
-
-  CInode *in = static_cast<CInode *>(lock->get_parent());
-  client_t client = mut->get_client();
-  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
-                     (in->has_subtree_or_exporting_dirfrag() ||
-                      static_cast<ScatterLock*>(lock)->get_scatter_wanted());
-
-  while (1) {
-    // wrlock?
-    if (lock->can_wrlock(client) &&
-       (!want_scatter || lock->get_state() == LOCK_MIX)) {
-      lock->get_wrlock();
-      mut->wrlocks.insert(lock);
-      mut->locks.insert(lock);
-      return true;
-    }
-
-    if (lock->get_type() == CEPH_LOCK_IFILE &&
-       in->state_test(CInode::STATE_RECOVERING)) {
-      mds->mdcache->recovery_queue.prioritize(in);
-    }
-
-    if (!lock->is_stable())
-      break;
-
-    if (in->is_auth()) {
-      // don't do nested lock state change if we have dirty scatterdata and
-      // may scatter_writebehind or start_scatter, because nowait==true implies
-      // that the caller already has a log entry open!
-      if (nowait && lock->is_dirty())
-       return false;
-
-      if (want_scatter)
-       scatter_mix(static_cast<ScatterLock*>(lock));
-      else
-       simple_lock(lock);
-
-      if (nowait && !lock->can_wrlock(client))
-       return false;
-      
-    } else {
-      // replica.
-      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
-      mds_rank_t auth = lock->get_parent()->authority().first;
-      if (!mds->is_cluster_degraded() ||
-         mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
-       dout(10) << "requesting scatter from auth on "
-                << *lock << " on " << *lock->get_parent() << dendl;
-       mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
-      }
-      break;
-    }
-  }
-
-  if (!nowait) {
-    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
-    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
-    nudge_log(lock);
-  }
-    
-  return false;
-}
-
-void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
-{
-  if (lock->get_type() == CEPH_LOCK_IVERSION ||
-      lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
-
-  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
-  lock->put_wrlock();
-  if (mut) {
-    mut->wrlocks.erase(lock);
-    if (mut->remote_wrlocks.count(lock) == 0)
-      mut->locks.erase(lock);
-  }
-
-  if (!lock->is_wrlocked()) {
-    if (!lock->is_stable())
-      eval_gather(lock, false, pneed_issue);
-    else if (lock->get_parent()->is_auth())
-      try_eval(lock, pneed_issue);
-  }
-}
-
-
-// remote wrlock
-
-void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
-{
-  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
-
-  // wait for active target
-  if (mds->is_cluster_degraded() &&
-      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
-    dout(7) << " mds." << target << " is not active" << dendl;
-    if (mut->more()->waiting_on_slave.empty())
-      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
-    return;
-  }
-
-  // send lock request
-  mut->start_locking(lock, target);
-  mut->more()->slaves.insert(target);
-  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                            MMDSSlaveRequest::OP_WRLOCK);
-  r->set_lock_type(lock->get_type());
-  lock->get_parent()->set_object_info(r->get_object_info());
-  mds->send_message_mds(r, target);
-
-  assert(mut->more()->waiting_on_slave.count(target) == 0);
-  mut->more()->waiting_on_slave.insert(target);
-}
-
-void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
-                                  MutationImpl *mut)
-{
-  // drop ref
-  mut->remote_wrlocks.erase(lock);
-  if (mut->wrlocks.count(lock) == 0)
-    mut->locks.erase(lock);
-  
-  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
-         << " " << *lock->get_parent()  << dendl;
-  if (!mds->is_cluster_degraded() ||
-      mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
-    MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                     MMDSSlaveRequest::OP_UNWRLOCK);
-    slavereq->set_lock_type(lock->get_type());
-    lock->get_parent()->set_object_info(slavereq->get_object_info());
-    mds->send_message_mds(slavereq, target);
-  }
-}
-
-
-// ------------------
-// xlock
-
-bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
-{
-  if (lock->get_type() == CEPH_LOCK_IVERSION ||
-      lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_xlock_start(static_cast<LocalLock*>(lock), mut);
-
-  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
-  client_t client = mut->get_client();
-
-  // auth?
-  if (lock->get_parent()->is_auth()) {
-    // auth
-    while (1) {
-      if (lock->can_xlock(client)) {
-       lock->set_state(LOCK_XLOCK);
-       lock->get_xlock(mut, client);
-       mut->xlocks.insert(lock);
-       mut->locks.insert(lock);
-       mut->finish_locking(lock);
-       return true;
-      }
-      
-      if (lock->get_type() == CEPH_LOCK_IFILE) {
-       CInode *in = static_cast<CInode*>(lock->get_parent());
-       if (in->state_test(CInode::STATE_RECOVERING)) {
-         mds->mdcache->recovery_queue.prioritize(in);
-       }
-      }
-
-      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
-                                lock->get_xlock_by_client() != client ||
-                                lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
-       break;
-
-      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
-       mut->start_locking(lock);
-       simple_xlock(lock);
-      } else {
-       simple_lock(lock);
-      }
-    }
-    
-    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
-    nudge_log(lock);
-    return false;
-  } else {
-    // replica
-    assert(lock->get_sm()->can_remote_xlock);
-    assert(!mut->slave_request);
-    
-    // wait for single auth
-    if (lock->get_parent()->is_ambiguous_auth()) {
-      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
-                                    new C_MDS_RetryRequest(mdcache, mut));
-      return false;
-    }
-    
-    // wait for active auth
-    mds_rank_t auth = lock->get_parent()->authority().first;
-    if (mds->is_cluster_degraded() &&
-       !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
-      dout(7) << " mds." << auth << " is not active" << dendl;
-      if (mut->more()->waiting_on_slave.empty())
-       mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
-      return false;
-    }
-
-    // send lock request
-    mut->more()->slaves.insert(auth);
-    mut->start_locking(lock, auth);
-    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                              MMDSSlaveRequest::OP_XLOCK);
-    r->set_lock_type(lock->get_type());
-    lock->get_parent()->set_object_info(r->get_object_info());
-    mds->send_message_mds(r, auth);
-
-    assert(mut->more()->waiting_on_slave.count(auth) == 0);
-    mut->more()->waiting_on_slave.insert(auth);
-
-    return false;
-  }
-}
-
-void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
-{
-  assert(!lock->is_stable());
-  if (lock->get_num_rdlocks() == 0 &&
-      lock->get_num_wrlocks() == 0 &&
-      lock->get_num_client_lease() == 0 &&
-      lock->get_state() != LOCK_XLOCKSNAP &&
-      lock->get_type() != CEPH_LOCK_DN) {
-    CInode *in = static_cast<CInode*>(lock->get_parent());
-    client_t loner = in->get_target_loner();
-    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
-      lock->set_state(LOCK_EXCL);
-      lock->get_parent()->auth_unpin(lock);
-      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
-      if (lock->get_cap_shift())
-       *pneed_issue = true;
-      if (lock->get_parent()->is_auth() &&
-         lock->is_stable())
-       try_eval(lock, pneed_issue);
-      return;
-    }
-  }
-  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
-  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
-}
-
-void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
-{
-  if (lock->get_type() == CEPH_LOCK_IVERSION ||
-      lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
-
-  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
-
-  client_t xlocker = lock->get_xlock_by_client();
-
-  // drop ref
-  lock->put_xlock();
-  assert(mut);
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
-  
-  bool do_issue = false;
-
-  // remote xlock?
-  if (!lock->get_parent()->is_auth()) {
-    assert(lock->get_sm()->can_remote_xlock);
-
-    // tell auth
-    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
-    mds_rank_t auth = lock->get_parent()->authority().first;
-    if (!mds->is_cluster_degraded() ||
-       mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
-      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                       MMDSSlaveRequest::OP_UNXLOCK);
-      slavereq->set_lock_type(lock->get_type());
-      lock->get_parent()->set_object_info(slavereq->get_object_info());
-      mds->send_message_mds(slavereq, auth);
-    }
-    // others waiting?
-    lock->finish_waiters(SimpleLock::WAIT_STABLE |
-                        SimpleLock::WAIT_WR | 
-                        SimpleLock::WAIT_RD, 0); 
-  } else {
-    if (lock->get_num_xlocks() == 0) {
-      if (lock->get_state() == LOCK_LOCK_XLOCK)
-       lock->set_state(LOCK_XLOCKDONE);
-      _finish_xlock(lock, xlocker, &do_issue);
-    }
-  }
-  
-  if (do_issue) {
-    CInode *in = static_cast<CInode*>(lock->get_parent());
-    if (in->is_head()) {
-      if (pneed_issue)
-       *pneed_issue = true;
-      else
-       issue_caps(in);
-    }
-  }
-}
-
-void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
-{
-  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
-
-  lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
-
-  MDSCacheObject *p = lock->get_parent();
-  assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
-
-  if (!lock->is_stable())
-    lock->get_parent()->auth_unpin(lock);
-
-  lock->set_state(LOCK_LOCK);
-}
-
-void Locker::xlock_import(SimpleLock *lock)
-{
-  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
-  lock->get_parent()->auth_pin(lock);
-}
-
-
-
-// file i/o -----------------------------------------
-
-version_t Locker::issue_file_data_version(CInode *in)
-{
-  dout(7) << "issue_file_data_version on " << *in << dendl;
-  return in->inode.file_data_version;
-}
-
-class C_Locker_FileUpdate_finish : public LockerLogContext {
-  CInode *in;
-  MutationRef mut;
-  bool share_max;
-  bool need_issue;
-  client_t client;
-  MClientCaps *ack;
-public:
-  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
-                               bool sm=false, bool ni=false, client_t c=-1,
-                               MClientCaps *ac = 0)
-    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
-      client(c), ack(ac) {
-    in->get(CInode::PIN_PTRWAITER);
-  }
-  void finish(int r) override {
-    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
-    in->put(CInode::PIN_PTRWAITER);
-  }
-};
-
-void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
-                               client_t client, MClientCaps *ack)
-{
-  dout(10) << "file_update_finish on " << *in << dendl;
-  in->pop_and_dirty_projected_inode(mut->ls);
-
-  mut->apply();
-  
-  if (ack) {
-    Session *session = mds->get_session(client);
-    if (session) {
-      // "oldest flush tid" > 0 means client uses unique TID for each flush
-      if (ack->get_oldest_flush_tid() > 0)
-       session->add_completed_flush(ack->get_client_tid());
-      mds->send_message_client_counted(ack, session);
-    } else {
-      dout(10) << " no session for client." << client << " " << *ack << dendl;
-      ack->put();
-    }
-  }
-
-  set<CInode*> need_issue;
-  drop_locks(mut.get(), &need_issue);
-
-  if (!in->is_head() && !in->client_snap_caps.empty()) {
-    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
-    // check for snap writeback completion
-    bool gather = false;
-    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
-    while (p != in->client_snap_caps.end()) {
-      SimpleLock *lock = in->get_lock(p->first);
-      assert(lock);
-      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
-              << " lock " << *lock << " on " << *in << dendl;
-      lock->put_wrlock();
-
-      p->second.erase(client);
-      if (p->second.empty()) {
-       gather = true;
-       in->client_snap_caps.erase(p++);
-      } else
-       ++p;
-    }
-    if (gather) {
-      if (in->client_snap_caps.empty())
-       in->item_open_file.remove_myself();
-      eval_cap_gather(in, &need_issue);
-    }
-  } else {
-    if (issue_client_cap && need_issue.count(in) == 0) {
-      Capability *cap = in->get_client_cap(client);
-      if (cap && (cap->wanted() & ~cap->pending()))
-       issue_caps(in, cap);
-    }
-  
-    if (share_max && in->is_auth() &&
-       (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
-      share_inode_max_size(in);
-  }
-  issue_caps_set(need_issue);
-
-  // auth unpin after issuing caps
-  mut->cleanup();
-}
-
-Capability* Locker::issue_new_caps(CInode *in,
-                                  int mode,
-                                  Session *session,
-                                  SnapRealm *realm,
-                                  bool is_replay)
-{
-  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
-  bool is_new;
-
-  // if replay, try to reconnect cap, and otherwise do nothing.
-  if (is_replay) {
-    mds->mdcache->try_reconnect_cap(in, session);
-    return 0;
-  }
-
-  // my needs
-  assert(session->info.inst.name.is_client());
-  client_t my_client = session->info.inst.name.num();
-  int my_want = ceph_caps_for_mode(mode);
-
-  // register a capability
-  Capability *cap = in->get_client_cap(my_client);
-  if (!cap) {
-    // new cap
-    cap = in->add_client_cap(my_client, session, realm);
-    cap->set_wanted(my_want);
-    cap->mark_new();
-    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
-    is_new = true;
-  } else {
-    is_new = false;
-    // make sure it wants sufficient caps
-    if (my_want & ~cap->wanted()) {
-      // augment wanted caps for this client
-      cap->set_wanted(cap->wanted() | my_want);
-    }
-  }
-
-  if (in->is_auth()) {
-    // [auth] twiddle mode?
-    eval(in, CEPH_CAP_LOCKS);
-
-    if (_need_flush_mdlog(in, my_want))
-      mds->mdlog->flush();
-
-  } else {
-    // [replica] tell auth about any new caps wanted
-    request_inode_file_caps(in);
-  }
-
-  // issue caps (pot. incl new one)
-  //issue_caps(in);  // note: _eval above may have done this already...
-
-  // re-issue whatever we can
-  //cap->issue(cap->pending());
-
-  if (is_new)
-    cap->dec_suppress();
-
-  return cap;
-}
-
-
-void Locker::issue_caps_set(set<CInode*>& inset)
-{
-  for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
-    issue_caps(*p);
-}
-
/**
 * Issue and/or revoke client capabilities on @in so that each client's
 * pending caps match what the current lock states allow.
 *
 * @param in inode whose client caps are (re)issued
 * @param only_cap if non-NULL, restrict processing to this single cap
 * @return true if no cap was (re)issued
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed) 
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) 
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  assert(in->is_head());

  // count conflicts with
  int nissued = 0;        

  // client caps
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    // stale caps are handled by revoke_stale_caps(); skip them here
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients that don't understand inline data must not get FILE_RD/WR
    // while the inode still carries inline data
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
	!(session && session->connection &&
	  session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending) 
	     << " allowed " << ccap_string(allowed) 
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << "  !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
	(pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();      
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't send caps to client yet
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
        dout(7) << "   sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before) 
		<< dendl;

	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
		// track the revocation so we can warn about clients that
		// fail to release in a timely manner
		revoking_caps.push_back(&cap->item_revoking_caps);
		revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
		cap->set_last_revoke_stamp(ceph_clock_now());
		cap->reset_num_revoke_warnings();
	}

	MClientCaps *m = new MClientCaps(op, in->ino(),
					 in->find_snaprealm()->inode->ino(),
					 cap->get_cap_id(), cap->get_last_seq(),
					 after, wanted, 0,
					 cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0);  // true if no re-issued, no callbacks
}
-
-void Locker::issue_truncate(CInode *in)
-{
-  dout(7) << "issue_truncate on " << *in << dendl;
-  
-  for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
-       it != in->client_caps.end();
-       ++it) {
-    Capability *cap = it->second;
-    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
-                                    in->ino(),
-                                    in->find_snaprealm()->inode->ino(),
-                                    cap->get_cap_id(), cap->get_last_seq(),
-                                    cap->pending(), cap->wanted(), 0,
-                                    cap->get_mseq(),
-                                     mds->get_osd_epoch_barrier());
-    in->encode_cap_message(m, cap);                         
-    mds->send_message_client_counted(m, it->first);
-  }
-
-  // should we increase max_size?
-  if (in->is_auth() && in->is_file())
-    check_inode_max_size(in);
-}
-
-
-void Locker::revoke_stale_caps(Capability *cap)
-{
-  CInode *in = cap->get_inode();
-  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
-    // if export succeeds, the cap will be removed. if export fails, we need to
-    // revoke the cap if it's still stale.
-    in->state_set(CInode::STATE_EVALSTALECAPS);
-    return;
-  }
-
-  int issued = cap->issued();
-  if (issued & ~CEPH_CAP_PIN) {
-    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
-    cap->revoke();
-
-    if (in->is_auth() &&
-       in->inode.client_ranges.count(cap->get_client()))
-      in->state_set(CInode::STATE_NEEDSRECOVER);
-
-    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
-    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
-    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
-    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
-
-    if (in->is_auth()) {
-      try_eval(in, CEPH_CAP_LOCKS);
-    } else {
-      request_inode_file_caps(in);
-    }
-  }
-}
-
-void Locker::revoke_stale_caps(Session *session)
-{
-  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
-
-  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
-    Capability *cap = *p;
-    cap->mark_stale();
-    revoke_stale_caps(cap);
-  }
-}
-
-void Locker::resume_stale_caps(Session *session)
-{
-  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
-
-  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
-    Capability *cap = *p;
-    CInode *in = cap->get_inode();
-    assert(in->is_head());
-    if (cap->is_stale()) {
-      dout(10) << " clearing stale flag on " << *in << dendl;
-      cap->clear_stale();
-
-      if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
-       // if export succeeds, the cap will be removed. if export fails,
-       // we need to re-issue the cap if it's not stale.
-       in->state_set(CInode::STATE_EVALSTALECAPS);
-       continue;
-      }
-
-      if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
-       issue_caps(in, cap);
-    }
-  }
-}
-
-void Locker::remove_stale_leases(Session *session)
-{
-  dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
-  xlist<ClientLease*>::iterator p = session->leases.begin();
-  while (!p.end()) {
-    ClientLease *l = *p;
-    ++p;
-    CDentry *parent = static_cast<CDentry*>(l->parent);
-    dout(15) << " removing lease on " << *parent << dendl;
-    parent->remove_client_lease(l, this);
-  }
-}
-
-
/**
 * Waiter context: retry request_inode_file_caps() on an inode once the
 * condition we queued on (single auth / active peer) resolves.  The
 * inode is pinned for the lifetime of the context so it cannot be
 * trimmed while we wait.
 */
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only retry if we are still a replica; an auth inode has no one to ask
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
-
/**
 * [replica] tell the auth mds what caps our clients want on @in, if the
 * wanted set changed since we last told it.  Waits (and retries via
 * C_MDL_RequestInodeFileCaps) while authority is ambiguous or the auth
 * mds is still rejoining.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    // a rejoining auth can't process cap messages yet; retry when active
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted) 
            << " on " << *in << " to mds." << auth << dendl;

    // record what we asked for, even if the send below is skipped; the
    // auth will learn our wanted set during its rejoin/recovery instead
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
			    auth);
  }
}
-
-/* This function DOES put the passed message before returning */
-void Locker::handle_inode_file_caps(MInodeFileCaps *m)
-{
-  // nobody should be talking to us during recovery.
-  assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
-
-  // ok
-  CInode *in = mdcache->get_inode(m->get_ino());
-  mds_rank_t from = mds_rank_t(m->get_source().num());
-
-  assert(in);
-  assert(in->is_auth());
-
-  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
-
-  if (m->get_caps())
-    in->mds_caps_wanted[from] = m->get_caps();
-  else
-    in->mds_caps_wanted.erase(from);
-
-  try_eval(in, CEPH_CAP_LOCKS);
-  m->put();
-}
-
-
/**
 * Waiter context: re-run check_inode_max_size() with the same requested
 * max_size/size/mtime once the blocking condition (frozen inode or
 * unstable filelock) clears.  Pins the inode while queued.
 */
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;  // requested new max_size (0 = recompute)
  uint64_t newsize;       // requested new size (0 = keep)
  utime_t mtime;          // mtime accompanying the size update

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only meaningful on the auth mds; auth may have changed while waiting
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
-
-uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
-{
-  uint64_t new_max = (size + 1) << 1;
-  uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
-  if (max_inc > 0) {
-    max_inc *= pi->get_layout_size_increment();
-    new_max = MIN(new_max, size + max_inc);
-  }
-  return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
-}
-
-void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
-                                   map<client_t,client_writeable_range_t> *new_ranges,
-                                   bool *max_increased)
-{
-  inode_t *latest = in->get_projected_inode();
-  uint64_t ms;
-  if(latest->has_layout()) {
-    ms = calc_new_max_size(latest, size);
-  } else {
-    // Layout-less directories like ~mds0/, have zero size
-    ms = 0;
-  }
-
-  // increase ranges as appropriate.
-  // shrink to 0 if no WR|BUFFER caps issued.
-  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
-       p != in->client_caps.end();
-       ++p) {
-    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
-      client_writeable_range_t& nr = (*new_ranges)[p->first];
-      nr.range.first = 0;
-      if (latest->client_ranges.count(p->first)) {
-       client_writeable_range_t& oldr = latest->client_ranges[p->first];
-       if (ms > oldr.range.last)
-         *max_increased = true;
-       nr.range.last = MAX(ms, oldr.range.last);
-       nr.follows = oldr.follows;
-      } else {
-       *max_increased = true;
-       nr.range.last = ms;
-       nr.follows = in->first - 1;
-      }
-    }
-  }
-}
-
/**
 * [auth] Reconsider (and if needed journal) the file's size, mtime and
 * per-client writeable ranges.
 *
 * @param in auth file inode
 * @param force_wrlock skip the can_wrlock check (caller already holds it)
 * @param new_max_size requested minimum new max_size (0 = just recompute)
 * @param new_size new on-disk size reported/requested (0 = keep)
 * @param new_mtime mtime accompanying the size change
 * @return true if an update was journaled, false if it was a no-op or
 *         had to be deferred (frozen inode / unwritable filelock)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  inode_t *latest = in->get_projected_inode();
  map<client_t, client_writeable_range_t> new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // sizes/mtimes only ever move forward
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (in->is_frozen()) {
    // can't project/journal now; retry with the same args after unfreeze
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // kick the filelock into a wrlockable state
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;    
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();
    
  // project the inode and stage the new values
  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
    pi->client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
    pi->size = new_size;
    pi->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
    pi->mtime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {   
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
          new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
-
-
-void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
-{
-  /*
-   * only share if currently issued a WR cap.  if client doesn't have it,
-   * file_max doesn't matter, and the client will get it if/when they get
-   * the cap later.
-   */
-  dout(10) << "share_inode_max_size on " << *in << dendl;
-  map<client_t, Capability*>::iterator it;
-  if (only_cap)
-    it = in->client_caps.find(only_cap->get_client());
-  else
-    it = in->client_caps.begin();
-  for (; it != in->client_caps.end(); ++it) {
-    const client_t client = it->first;
-    Capability *cap = it->second;
-    if (cap->is_suppress())
-      continue;
-    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
-      dout(10) << "share_inode_max_size with client." << client << dendl;
-      cap->inc_last_seq();
-      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
-                                      in->ino(),
-                                      in->find_snaprealm()->inode->ino(),
-                                      cap->get_cap_id(), cap->get_last_seq(),
-                                      cap->pending(), cap->wanted(), 0,
-                                       cap->get_mseq(),
-                                       mds->get_osd_epoch_barrier());
-      in->encode_cap_message(m, cap);
-      mds->send_message_client_counted(m, client);
-    }
-    if (only_cap)
-      break;
-  }
-}
-
-bool Locker::_need_flush_mdlog(CInode *in, int wanted)
-{
-  /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
-   * pending mutations. */
-  if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
-       in->filelock.is_unstable_and_locked()) ||
-      ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
-       in->authlock.is_unstable_and_locked()) ||
-      ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
-       in->linklock.is_unstable_and_locked()) ||
-      ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
-       in->xattrlock.is_unstable_and_locked()))
-    return true;
-  return false;
-}
-
/**
 * Update a cap's "wanted" mask from a client message, then maintain the
 * open-file list / recovery queue accordingly.
 *
 * @param cap the client's cap
 * @param wanted new wanted mask reported by the client
 * @param issue_seq client's view of the issue seq; on a mismatch we only
 *        allow wanted bits to be *added*, never dropped (the client may
 *        be responding to a stale issue)
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the (possibly changed) wanted set to the auth
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    // no client wants this file any more; drop it from the open file list
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    // a client actively wants RD/WR: bump it up the recovery queue
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      // journal an EOpen so replay knows this file is open
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
-
-
-
/**
 * Perform empty ("null") snapflushes on behalf of @client for every
 * snapid < @last that is still awaiting a flush from it.  Used when we
 * can infer the client has no dirty data for those snaps (it released
 * its write caps, or flushed a later snap first).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    set<client_t>& clients = p->second;
    ++p;  // be careful, q loop below depends on this
    // (also: remove_need_snapflush() below may erase the current entry)

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
      if (!sin) {
	// hrm, look forward until we find the inode. 
	//  (we can only look it up by the last snapid it is valid for)
	dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
	for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
	     q != head_in->client_need_snapflush.end();
	     ++q) {
	  dout(10) << " trying snapid " << q->first << dendl;
	  sin = mdcache->get_inode(head_in->ino(), q->first);
	  if (sin) {
	    assert(sin->first <= snapid);
	    break;
	  }
	  dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
	}
	// multiversion head inode covers the snap itself
	if (!sin && head_in->is_multiversion())
	  sin = head_in;
	assert(sin);
      }
      // dirty=0: journal a snap update with no actual client data
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
-
-
-bool Locker::should_defer_client_cap_frozen(CInode *in)
-{
-  /*
-   * This policy needs to be AT LEAST as permissive as allowing a client request
-   * to go forward, or else a client request can release something, the release
-   * gets deferred, but the request gets processed and deadlocks because when the
-   * caps can't get revoked.
-   *
-   * Currently, a request wait if anything locked is freezing (can't
-   * auth_pin), which would avoid any deadlock with cap release.  Thus @in
-   * _MUST_ be in the lock/auth_pin set.
-   *
-   * auth_pins==0 implies no unstable lock and not auth pinnned by
-   * client request, otherwise continue even it's freezing.
-   */
-  return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
-}
-
-/*
- * This function DOES put the passed message before returning
- */
-void Locker::handle_client_caps(MClientCaps *m)
-{
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  client_t client = m->get_source().num();
-
-  snapid_t follows = m->get_snap_follows();
-  dout(7) << "handle_client_caps "
-         << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
-         << " on " << m->get_ino()
-         << " tid " << m->get_client_tid() << " follows " << follows
-         << " op " << ceph_cap_op_name(m->get_op()) << dendl;
-
-  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
-    if (!session) {
-      dout(5) << " no session, dropping " << *m << dendl;
-      m->put();
-      return;
-    }
-    if (session->is_closed() ||
-       session->is_closing() ||
-       session->is_killing()) {
-      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
-      m->put();
-      return;
-    }
-    if (mds->is_reconnect() &&
-       m->get_dirty() && m->get_client_tid() > 0 &&
-       !session->have_completed_flush(m->get_client_tid())) {
-      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
-    }
-    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
-    return;
-  }
-
-  if (m->get_client_tid() > 0 && session &&
-      session->have_completed_flush(m->get_client_tid())) {
-    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
-           << " for client." << client << dendl;
-    MClientCaps *ack;
-    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
-                           m->get_dirty(), 0, mds->get_osd_epoch_barrier());
-    } else {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
-                           m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
-                           mds->get_osd_epoch_barrier());
-    }
-    ack->set_snap_follows(follows);
-    ack->set_client_tid(m->get_client_tid());
-    mds->send_message_client_counted(ack, m->get_connection());
-    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
-      m->put();
-      return;
-    } else {
-      // fall-thru because the message may release some caps
-      m->clear_dirty();
-      m->set_op(CEPH_CAP_OP_UPDATE);
-    }
-  }
-
-  // "oldest flush tid" > 0 means client uses unique TID for each flush
-  if (m->get_oldest_flush_tid() > 0 && session) {
-    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
-      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
-
-      if (session->get_num_trim_flushes_warnings() > 0 &&
-         session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
-       session->reset_num_trim_flushes_warnings();
-    } else {
-      if (session->get_num_completed_flushes() >=
-         (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
-       session->inc_num_trim_flushes_warnings();
-       stringstream ss;
-       ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
-          << m->get_oldest_flush_tid() << "), "
-          << session->get_num_completed_flushes()
-          << " completed flushes recorded in session";
-       mds->clog->warn() << ss.str();
-       dout(20) << __func__ << " " << ss.str() << dendl;
-      }
-    }
-  }
-
-  CInode *head_in = mdcache->get_inode(m->get_ino());
-  if (!head_in) {
-    if (mds->is_clientreplay()) {
-      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
-       << ", will try again after replayed client requests" << dendl;
-      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
-      return;
-    }
-    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
-    m->put();
-    return;
-  }
-
-  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
-    // Pause RADOS operations until we see the required epoch
-    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
-  }
-
-  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
-    // Record the barrier so that we will retransmit it to clients
-    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
-  }
-
-  CInode *in = head_in;
-  if (follows > 0) {
-    in = mdcache->pick_inode_snap(head_in, follows);
-    if (in != head_in)
-      dout(10) << " head inode " << *head_in << dendl;
-  }
-  dout(10) << "  cap inode " << *in << dendl;
-
-  Capability *cap = 0;
-  cap = in->get_client_cap(client);
-  if (!cap && in != head_in)
-    cap = head_in->get_client_cap(client);
-  if (!cap) {
-    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
-    m->put();
-    return;
-  }  
-  assert(cap);
-
-  // freezing|frozen?
-  if (should_defer_client_cap_frozen(in)) {
-    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
-    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
-    return;
-  }
-  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
-    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
-           << ", dropping" << dendl;
-    m->put();
-    return;
-  }
-
-  int op = m->get_op();
-
-  // flushsnap?
-  if (op == CEPH_CAP_OP_FLUSHSNAP) {
-    if (!in->is_auth()) {
-      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
-      goto out;
-    }
-
-    SnapRealm *realm = in->find_snaprealm();
-    snapid_t snap = realm->get_snap_following(follows);
-    dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;
-
-    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
-    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
-    // case we get a dup response, so whatever.)
-    MClientCaps *ack = 0;
-    if (m->get_dirty()) {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
-      ack->set_snap_follows(follows);
-      ack->set_client_tid(m->get_client_tid());
-      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
-    }
-
-    if (in == head_in ||
-       (head_in->client_need_snapflush.count(snap) &&
-        head_in->client_need_snapflush[snap].count(client))) {
-      dout(7) << " flushsnap snap " << snap
-             << " client." << client << " on " << *in << dendl;
-
-      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
-      if (in == head_in)
-       cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
-      else if (head_in->client_need_snapflush.begin()->first < snap)
-       _do_null_snapflush(head_in, client, snap);
-   
-      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
-
-      if (in != head_in)
-       head_in->remove_need_snapflush(in, snap, client);
-      
-    } else {
-      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
-      if (ack)
-       mds->send_message_client_counted(ack, m->get_connection());
-    }
-    goto out;
-  }
-
-  if (cap->get_cap_id() != m->get_cap_id()) {
-    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
-  } else {
-    // intermediate snap inodes
-    while (in != head_in) {
-      assert(in->last != CEPH_NOSNAP);
-      if (in->is_auth() && m->get_dirty()) {
-       dout(10) << " updating intermediate snapped inode " << *in << dendl;
-       _do_cap_update(in, NULL, m->get_dirty(), follows, m);
-      }
-      in = mdcache->pick_inode_snap(head_in, in->last);
-    }
-    // head inode, and cap
-    MClientCaps *ack = 0;
-
-    int caps = m->get_caps();
-    if (caps & ~cap->issued()) {
-      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
-      caps &= cap->issued();
-    }
-    
-    cap->confirm_receipt(m->get_seq(), caps);
-    dout(10) << " follows " << follows
-            << " retains " << ccap_string(m->get_caps())
-            << " dirty " << ccap_string(m->get_dirty())
-            << " on " << *in << dendl;
-
-
-    // missing/skipped snapflush?
-    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
-    //  presently only does so when it has actual dirty metadata.  But, we
-    //  set up the need_snapflush stuff based on the issued caps.
-    //  We can infer that the client WONT send a FLUSHSNAP once they have
-    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
-    //  update/release).
-    if (!head_in->client_need_snapflush.empty()) {
-      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
-       _do_null_snapflush(head_in, client);
-      } else {
-       dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
-      }
-    }
-    
-    if (m->get_dirty() && in->is_auth()) {
-      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty()) 
-             << " seq " << m->get_seq() << " on " << *in << dendl;
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
-                           m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
-      ack->set_client_tid(m->get_client_tid());
-      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
-    }
-
-    // filter wanted based on what we could ever give out (given auth/replica status)
-    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
-    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
-    if (new_wanted != cap->wanted()) {
-      if (!need_flush && (new_wanted & ~cap->pending())) {
-       // exapnding caps.  make sure we aren't waiting for a log flush
-       need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
-      }
-
-      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
-    }
-      
-    if (in->is_auth() &&
-       _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
-      // updated
-      eval(in, CEPH_CAP_LOCKS);
-
-      if (!need_flush && (cap->wanted() & ~cap->pending()))
-       need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
-    } else {
-      // no update, ack now.
-      if (ack)
-       mds->send_message_client_counted(ack, m->get_connection());
-      
-      bool did_issue = eval(in, CEPH_CAP_LOCKS);
-      if (!did_issue && (cap->wanted() & ~cap->pending()))
-       issue_caps(in, cap);
-
-      if (cap->get_last_seq() == 0 &&
-         (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
-       cap->issue_norevoke(cap->issued());
-       share_inode_max_size(in, cap);
-      }
-    }
-
-    if (need_flush)
-      mds->mdlog->flush();
-  }
-
- out:
-  m->put();
-}
-
-
-class C_Locker_RetryRequestCapRelease : public LockerContext {
-  client_t client;
-  ceph_mds_request_release item;
-public:
-  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
-    LockerContext(l), client(c), item(it) { }
-  void finish(int r) override {
-    string dname;
-    MDRequestRef null_ref;
-    locker->process_request_cap_release(null_ref, client, item, dname);
-  }
-};
-
-void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
-                                        const string &dname)
-{
-  inodeno_t ino = (uint64_t)item.ino;
-  uint64_t cap_id = item.cap_id;
-  int caps = item.caps;
-  int wanted = item.wanted;
-  int seq = item.seq;
-  int issue_seq = item.issue_seq;
-  int mseq = item.mseq;
-
-  CInode *in = mdcache->get_inode(ino);
-  if (!in)
-    return;
-
-  if (dname.length()) {
-    frag_t fg = in->pick_dirfrag(dname);
-    CDir *dir = in->get_dirfrag(fg);
-    if (dir) {
-      CDentry *dn = dir->lookup(dname);
-      if (dn) {
-       ClientLease *l = dn->get_client_lease(client);
-       if (l) {
-         dout(10) << "process_cap_release removing lease on " << *dn << dendl;
-         dn->remove_client_lease(l, this);
-       } else {
-         dout(7) << "process_cap_release client." << client
-                 << " doesn't have lease on " << *dn << dendl;
-       }
-      } else {
-       dout(7) << "process_cap_release client." << client << " released lease on dn "
-               << dir->dirfrag() << "/" << dname << " which dne" << dendl;
-      }
-    }
-  }
-
-  Capability *cap = in->get_client_cap(client);
-  if (!cap)
-    return;
-
-  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
-          << (mdr ? "" : " (DEFERRED, no mdr)")
-          << dendl;
-    
-  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
-    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
-    return;
-  }
-
-  if (cap->get_cap_id() != cap_id) {
-    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
-    return;
-  }
-
-  if (should_defer_client_cap_frozen(in)) {
-    dout(7) << " frozen, deferring" << dendl;
-    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
-    return;
-  }
-    
-  if (caps & ~cap->issued()) {
-    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
-    caps &= cap->issued();
-  }
-  cap->confirm_receipt(seq, caps);
-
-  if (!in->client_need_snapflush.empty() &&
-      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
-    _do_null_snapflush(in, client);
-  }
-
-  adjust_cap_wanted(cap, wanted, issue_seq);
-  
-  if (mdr)
-    cap->inc_suppress();
-  eval(in, CEPH_CAP_LOCKS);
-  if (mdr)
-    cap->dec_suppress();
-  
-  // take note; we may need to reissue on this cap later
-  if (mdr)
-    mdr->cap_releases[in->vino()] = cap->get_last_seq();
-}
-
-class C_Locker_RetryKickIssueCaps : public LockerContext {
-  CInode *in;
-  client_t client;
-  ceph_seq_t seq;
-public:
-  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
-    LockerContext(l), in(i), client(c), seq(s) {
-    in->get(CInode::PIN_PTRWAITER);
-  }
-  void finish(int r) override {
-    locker->kick_issue_caps(in, client, seq);
-    in->put(CInode::PIN_PTRWAITER);
-  }
-};
-
-void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
-{
-  Capability *cap = in->get_client_cap(client);
-  if (!cap || cap->get_last_sent() != seq)
-    return;
-  if (in->is_frozen()) {
-    dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
-    in->add_waiter(CInode::WAIT_UNFREEZE,
-       new C_Locker_RetryKickIssueCaps(this, in, client, seq));
-    return;
-  }
-  dout(10) << "kick_issue_caps released at current seq " << seq
-    << ", reissuing" << dendl;
-  issue_caps(in, cap);
-}
-
-void Locker::kick_cap_releases(MDRequestRef& mdr)
-{
-  client_t client = mdr->get_client();
-  for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
-       p != mdr->cap_releases.end();
-       ++p) {
-    CInode *in = mdcache->get_inode(p->first);
-    if (!in)
-      continue;
-    kick_issue_caps(in, client, p->second);
-  }
-}
-
-/**
- * m and ack might be NULL, so don't dereference them unless dirty != 0
- */
-void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
-{
-  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
-          << " follows " << follows << " snap " << snap
-          << " on " << *in << dendl;
-
-  if (snap == CEPH_NOSNAP) {
-    // hmm, i guess snap was already deleted?  just ack!
-    dout(10) << " wow, the snap following " << follows
-            << " was already deleted.  nothing to record, just ack." << dendl;
-    if (ack)
-      mds->send_message_client_counted(ack, m->get_connection());
-    return;
-  }
-
-  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
-  mds->mdlog->start_entry(le);
-  MutationRef mut = new MutationImpl();
-  mut->ls = mds->mdlog->get_current_segment();
-
-  // normal metadata updates that we can apply to the head as well.
-
-  // update xattrs?
-  bool xattrs = false;
-  map<string,bufferptr> *px = 0;
-  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
-      m->xattrbl.length() &&
-      m->head.xattr_version > in->get_projected_inode()->xattr_version)
-    xattrs = true;
-
-  old_inode_t *oi = 0;
-  if (in->is_multiversion()) {
-    oi = in->pick_old_inode(snap);
-  }
-
-  inode_t *pi;
-  if (oi) {
-    dout(10) << " writing into old inode" << dendl;
-    pi = in->project_inode();
-    pi->version = in->pre_dirty();
-    if (snap > oi->first)
-      in->split_old_inode(snap);
-    pi = &oi->inode;
-    if (xattrs)
-      px = &oi->xattrs;
-  } else {
-    if (xattrs)
-      px = new map<string,bufferptr>;
-    pi = in->project_inode(px);
-    pi->version = in->pre_dirty();
-  }
-
-  _update_cap_fields(in, dirty, m, pi);
-
-  // xattr
-  if (px) {
-    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
-           << " len " << m->xattrbl.length() << dendl;
-    pi->xattr_version = m->head.xattr_version;
-    bufferlist::iterator p = m->xattrbl.begin();
-    ::decode(*px, p);
-  }
-
-  if (pi->client_ranges.count(client)) {
-    if (in->last == snap) {
-      dout(10) << "  removing client_range entirely" << dendl;
-      pi->client_ranges.erase(client);
-    } else {
-      dout(10) << "  client_range now follows " << snap << dendl;
-      pi->client_ranges[client].follows = snap;
-    }
-  }
-
-  mut->auth_pin(in);
-  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
-  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
-
-  // "oldest flush tid" > 0 means client uses unique TID for each flush
-  if (ack && ack->get_oldest_flush_tid() > 0)
-    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
-                                 ack->get_oldest_flush_tid());
-
-  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
-                                                             client, ack));
-}
-
-void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
-{
-  if (dirty == 0)
-    return;
-
-  /* m must be valid if there are dirty caps */
-  assert(m);
-  uint64_t features = m->get_connection()->get_features();
-
-  if (m->get_ctime() > pi->ctime) {
-    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
-           << " for " << *in << dendl;
-    pi->ctime = m->get_ctime();
-  }
-
-  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
-      m->get_change_attr() > pi->change_attr) {
-    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
-           << " for " << *in << dendl;
-    pi->change_attr = m->get_change_attr();
-  }
-
-  // file
-  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
-    utime_t atime = m->get_atime();
-    utime_t mtime = m->get_mtime();
-    uint64_t size = m->get_size();
-    version_t inline_version = m->inline_version;
-    
-    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
-       ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
-      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
-             << " for " << *in << dendl;
-      pi->mtime = mtime;
-    }
-    if (in->inode.is_file() &&   // ONLY if regular file
-       size > pi->size) {
-      dout(7) << "  size " << pi->size << " -> " << size
-             << " for " << *in << dendl;
-      pi->size = size;
-      pi->rstat.rbytes = size;
-    }
-    if (in->inode.is_file() &&
-        (dirty & CEPH_CAP_FILE_WR) &&
-        inline_version > pi->inline_data.version) {
-      pi->inline_data.version = inline_version;
-      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
-       pi->inline_data.get_data() = m->inline_data;
-      else
-       pi->inline_data.free_data();
-    }
-    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
-      dout(7) << "  atime " << pi->atime << " -> " << atime
-             << " for " << *in << dendl;
-      pi->atime = atime;
-    }
-    if ((dirty & CEPH_CAP_FILE_EXCL) &&
-       ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
-      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
-             << " for " << *in << dendl;
-      pi->time_warp_seq = m->get_time_warp_seq();
-    }
-  }
-  // auth
-  if (dirty & CEPH_CAP_AUTH_EXCL) {
-    if (m->head.uid != pi->uid) {
-      dout(7) << "  uid " << pi->uid
-             << " -> " << m->head.uid
-             << " for " << *in << dendl;
-      pi->uid = m->head.uid;
-    }
-    if (m->head.gid != pi->gid) {
-      dout(7) << "  gid " << pi->gid
-             << " -> " << m->head.gid
-             << " for " << *in << dendl;
-      pi->gid = m->head.gid;
-    }
-    if (m->head.mode != pi->mode) {
-      dout(7) << "  mode " << oct << pi->mode
-             << " -> " << m->head.mode << dec
-             << " for " << *in << dendl;
-      pi->mode = m->head.mode;
-    }
-    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
-      dout(7) << "  btime " << oct << pi->btime
-             << " -> " << m->get_btime() << dec
-             << " for " << *in << dendl;
-      pi->btime = m->get_btime();
-    }
-  }
-}
-
-/*
- * update inode based on cap flush|flushsnap|wanted.
- *  adjust max_size, if needed.
- * if we update, return true; otherwise, false (no updated needed).
- */
-bool Locker::_do_cap_update(CInode *in, Capability *cap,
-                           int dirty, snapid_t follows,
-                           MClientCaps *m, MClientCaps *ack,
-                           bool *need_flush)
-{
-  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
-          << " issued " << ccap_string(cap ? cap->issued() : 0)
-          << " wanted " << ccap_string(cap ? cap->wanted() : 0)
-          << " on " << *in << dendl;
-  assert(in->is_auth());
-  client_t client = m->get_source().num();
-  inode_t *latest = in->get_projected_inode();
-
-  // increase or zero max_size?
-  uint64_t size = m->get_size();
-  bool change_max = false;
-  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
-  uint64_t new_max = old_max;
-  
-  if (in->is_file()) {
-    bool forced_change_max = false;
-    dout(20) << "inode is file" << dendl;
-    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
-      dout(20) << "client has write caps; m->get_max_size="
-               << m->get_max_size() << "; old_max=" << old_max << dendl;
-      if (m->get_max_size() > new_max) {
-       dout(10) << "client requests file_max " << m->get_max_size()
-                << " > max " << old_max << dendl;
-       change_max = true;
-       forced_change_max = true;
-       new_max = calc_new_max_size(latest, m->get_max_size());
-      } else {
-       new_max = calc_new_max_size(latest, size);
-
-       if (new_max > old_max)
-         change_max = true;
-       else
-         new_max = old_max;
-      }
-    } else {
-      if (old_max) {
-       change_max = true;
-       new_max = 0;
-      }
-    }
-
-    if (in->last == CEPH_NOSNAP &&
-       change_max &&
-       !in->filelock.can_wrlock(client) &&
-       !in->filelock.can_force_wrlock(client)) {
-      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
-      if (in->filelock.is_stable()) {
-       bool need_issue = false;
-       if (cap)
-         cap->inc_suppress();
-       if (in->mds_caps_wanted.empty() &&
-           (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
-         if (in->filelock.get_state() != LOCK_EXCL)
-           file_excl(&in->filelock, &need_issue);
-       } else
-         simple_lock(&in->filelock, &need_issue);
-       if (need_issue)
-         issue_caps(in);
-       if (cap)
-         cap->dec_suppress();
-      }
-      if (!in->filelock.can_wrlock(client) &&
-         !in->filelock.can_force_wrlock(client)) {
-       C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
-                                                        forced_change_max ? new_max : 0,
-                                                        0, utime_t());
-
-       in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
-       change_max = false;
-      }
-    }
-  }
-
-  if (m->flockbl.length()) {
-    int32_t num_locks;
-    bufferlist::iterator bli = m->flockbl.begin();
-    ::decode(num_locks, bli);
-    for ( int i=0; i < num_locks; ++i) {
-      ceph_filelock decoded_lock;
-      ::decode(decoded_lock, bli);
-      in->get_fcntl_lock_state()->held_locks.
-       insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
-      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
-    }
-    ::decode(num_locks, bli);
-    for ( int i=0; i < num_locks; ++i) {
-      ceph_filelock decoded_lock;
-      ::decode(decoded_lock, bli);
-      in->get_flock_lock_state()->held_locks.
-       insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
-      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
-    }
-  }
-
-  if (!dirty && !change_max)
-    return false;
-
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  if (session->check_access(in, MAY_WRITE,
-                           m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
-    session->put();
-    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
-    return false;
-  }
-  session->put();
-
-  // do the update.
-  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
-  mds->mdlog->start_entry(le);
-
-  // xattrs update?
-  map<string,bufferptr> *px = 0;
-  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
-      m->xattrbl.length() &&
-      m->head.xattr_version > in->get_projected_inode()->xattr_version)
-    px = new map<string,bufferptr>;
-
-  inode_t *pi = in->project_inode(px);
-  pi->version = in->pre_dirty();
-
-  MutationRef mut(new MutationImpl());
-  mut->ls = mds->mdlog->get_current_segment();
-
-  _update_cap_fields(in, dirty, m, pi);
-
-  if (change_max) {
-    dout(7) << "  max_size " << old_max << " -> " << new_max
-           << " for " << *in << dendl;
-    if (new_max) {
-      pi->client_ranges[client].range.first = 0;
-      pi->client_ranges[client].range.last = new_max;
-      pi->client_ranges[client].follows = in->first - 1;
-    } else 
-      pi->client_ranges.erase(client);
-  }
-    
-  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) 
-    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
-
-  // auth
-  if (dirty & CEPH_CAP_AUTH_EXCL)
-    wrlock_force(&in->authlock, mut);
-
-  // xattr
-  if (px) {
-    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
-    pi->xattr_version = m->head.xattr_version;
-    bufferlist::iterator p = m->xattrbl.begin();
-    ::decode(*px, p);
-
-    wrlock_force(&in->xattrlock, mut);
-  }
-  
-  mut->auth_pin(in);
-  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
-  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
-
-  // "oldest flush tid" > 0 means client uses unique TID for each flush
-  if (ack && ack->get_oldest_flush_tid() > 0)
-    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
-                                 ack->get_oldest_flush_tid());
-
-  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
-                                                             change_max, !!cap,
-                                                             client, ack));
-  if (need_flush && !*need_flush &&
-      ((change_max && new_max) || // max INCREASE
-       _need_flush_mdlog(in, dirty)))
-    *need_flush = true;
-
-  return true;
-}
-
-/* This function DOES put the passed message before returning */
-void Locker::handle_client_cap_release(MClientCapRelease *m)
-{
-  client_t client = m->get_source().num();
-  dout(10) << "handle_client_cap_release " << *m << dendl;
-
-  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
-    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
-    return;
-  }
-
-  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
-    // Pause RADOS operations until we see the required epoch
-    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
-  }
-
-  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
-    // Record the barrier so that we will retransmit it to clients
-    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
-  }
-
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-
-  for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
-    _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
-  }
-
-  if (session) {
-    session->notify_cap_release(m->caps.size());
-  }
-
-  m->put();
-}
-
-class C_Locker_RetryCapRelease : public LockerContext {
-  client_t client;
-  inodeno_t ino;
-  uint64_t cap_id;
-  ceph_seq_t migrate_seq;
-  ceph_seq_t issue_seq;
-public:
-  C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
-                          ceph_seq_t mseq, ceph_seq_t seq) :
-    LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
-  void finish(int r) override {
-    locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
-  }
-};
-
-void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
-                            ceph_seq_t mseq, ceph_seq_t seq)
-{
-  CInode *in = mdcache->get_inode(ino);
-  if (!in) {
-    dout(7) << "_do_cap_release missing ino " << ino << dendl;
-    return;
-  }
-  Capability *cap = in->get_client_cap(client);
-  if (!cap) {
-    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
-    return;
-  }
-
-  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
-  if (cap->get_cap_id() != cap_id) {
-    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
-    return;
-  }
-  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
-    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
-    return;
-  }
-  if (should_defer_client_cap_frozen(in)) {
-    dout(7) << " freezing|frozen, deferring" << dendl;
-    in->add_waiter(CInode::WAIT_UNFREEZE,
-                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
-    return;
-  }
-  if (seq != cap->get_last_issue()) {
-    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
-    // clean out any old revoke history
-    cap->clean_revoke_from(seq);
-    eval_cap_gather(in);
-    return;
-  }
-  remove_client_cap(in, client);
-}
-
-/* This function DOES put the passed message before returning */
-
-void Locker::remove_client_cap(CInode *in, client_t client)
-{
-  // clean out any pending snapflush state
-  if (!in->client_need_snapflush.empty())
-    _do_null_snapflush(in, client);
-
-  in->remove_client_cap(client);
-
-  if (in->is_auth()) {
-    // make sure we clear out the client byte range
-    if (in->get_projected_inode()->client_ranges.count(client) &&
-       !(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
-      check_inode_max_size(in);
-  } else {
-    request_inode_file_caps(in);
-  }
-  
-  try_eval(in, CEPH_CAP_LOCKS);
-}
-
-
-/**
- * Return true if any currently revoking caps exceed the
- * mds_revoke_cap_timeout threshold.
- */
-bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
-{
-    xlist<Capability*>::const_iterator p = revoking.begin();
-    if (p.end()) {
-      // No revoking caps at the moment
-      return false;
-    } else {
-      utime_t now = ceph_clock_now();
-      utime_t age = now - (*p)->get_last_revoke_stamp();
-      if (age <= g_conf->mds_revoke_cap_timeout) {
-          return false;
-      } else {
-          return true;
-      }
-    }
-}
-
-
-void Locker::get_late_revoking_clients(std::list<client_t> *result) const
-{
-  if (!any_late_revoking_caps(revoking_caps)) {
-    // Fast path: no misbehaving clients, execute in O(1)
-    return;
-  }
-
-  // Slow path: execute in O(N_clients)
-  std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
-  for (client_rc_iter = revoking_caps_by_client.begin();
-       client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
-    xlist<Capability*> const &client_rc = client_rc_iter->second;
-    bool any_late = any_late_revoking_caps(client_rc);
-    if (any_late) {
-        result->push_back(client_rc_iter->first);
-    }
-  }
-}
-
-// Hard-code instead of surfacing a config settings because this is
-// really a hack that should go away at some point when we have better
-// inspection tools for getting at detailed cap state (#7316)
-#define MAX_WARN_CAPS 100
-
-void Locker::caps_tick()
-{
-  utime_t now = ceph_clock_now();
-
-  dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
-
-  int i = 0;
-  for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
-    Capability *cap = *p;
-
-    utime_t age = now - cap->get_last_revoke_stamp();
-    dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
-    if (age <= g_conf->mds_revoke_cap_timeout) {
-      dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
-      break;
-    } else {
-      ++i;
-      if (i > MAX_WARN_CAPS) {
-        dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
-          << "revoking, ignoring subsequent caps" << dendl;
-        break;
-      }
-    }
-    // exponential backoff of warning intervals
-    if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
-      cap->inc_num_revoke_warnings();
-      stringstream ss;
-      ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
-        << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
-        << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
-      mds->clog->warn() << ss.str();
-      dout(20) << __func__ << " " << ss.str() << dendl;
-    } else {
-      dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
-    }
-  }
-}
-
-
-void Locker::handle_client_lease(MClientLease *m)
-{
-  dout(10) << "handle_client_lease " << *m << dendl;
-
-  assert(m->get_source().is_client());
-  client_t client = m->get_source().num();
-
-  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
-  if (!in) {
-    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
-    m->put();
-    return;
-  }
-  CDentry *dn = 0;
-
-  frag_t fg = in->pick_dirfrag(m->dname);
-  CDir *dir = in->get_dirfrag(fg);
-  if (dir) 
-    dn = dir->lookup(m->dname);
-  if (!dn) {
-    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
-    m->put();
-    return;
-  }
-  dout(10) << " on " << *dn << dendl;
-
-  // replica and lock
-  ClientLease *l = dn->get_client_lease(client);
-  if (!l) {
-    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
-    m->put();
-    return;
-  } 
-
-  switch (m->get_action()) {
-  case CEPH_MDS_LEASE_REVOKE_ACK:
-  case CEPH_MDS_LEASE_RELEASE:
-    if (l->seq != m->get_seq()) {
-      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
-    } else {
-      dout(7) << "handle_client_lease client." << client
-             << " on " << *dn << dendl;
-      dn->remove_client_lease(l, this);
-    }
-    m->put();
-    break;
-
-  case CEPH_MDS_LEASE_RENEW:
-    {
-      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
-             << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
-      if (dn->lock.can_lease(client)) {
-       int pool = 1;   // fixme.. do something smart!
-       m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
-       m->h.seq = ++l->seq;
-       m->clear_payload();
-
-       utime_t now = ceph_clock_now();
-       now += mdcache->client_lease_durations[pool];
-       mdcache->touch_client_lease(l, pool, now);
-
-       mds->send_message_client_counted(m, m->get_connection());
-      }
-    }
-    break;
-
-  default:
-    ceph_abort(); // implement me
-    break;
-  }
-}
-
-
-void Locker::issue_client_lease(CDentry *dn, client_t client,
-                              bufferlist &bl, utime_t now, Session *session)
-{
-  CInode *diri = dn->get_dir()->get_inode();
-  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
-      ((!diri->filelock.can_lease(client) &&
-       (diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
-      dn->lock.can_lease(client)) {
-    int pool = 1;   // fixme.. do something smart!
-    // issue a dentry lease
-    ClientLease *l = dn->add_client_lease(client, session);
-    session->touch_lease(l);
-    
-    now += mdcache->client_lease_durations[pool];
-    mdcache->touch_client_lease(l, pool, now);
-
-    LeaseStat e;
-    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
-    e.seq = ++l->seq;
-    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
-    ::encode(e, bl);
-    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
-            << " on " << *dn << dendl;
-  } else {
-    // null lease
-    LeaseStat e;
-    e.mask = 0;
-    e.seq = 0;
-    e.duration_ms = 0;
-    ::encode(e, bl);
-    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
-  }
-}
-
-
-void Locker::revoke_client_leases(SimpleLock *lock)
-{
-  int n = 0;
-  CDentry *dn = static_cast<CDentry*>(lock->get_parent());
-  for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
-       p != dn->client_lease_map.end();
-       ++p) {
-    ClientLease *l = p->second;
-    
-    n++;
-    assert(lock->get_type() == CEPH_LOCK_DN);
-
-    CDentry *dn = static_cast<CDentry*>(lock->get_parent());
-    int mask = 1 | CEPH_LOCK_DN; // old and new bits
-    
-    // i should also revoke the dir ICONTENT lease, if they have it!
-    CInode *diri = dn->get_dir()->get_inode();
-    mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
-                                             mask,
-                                             diri->ino(),
-                                             diri->first, CEPH_NOSNAP,
-                                             dn->get_name()),
-                            l->client);
-  }
-  assert(n == lock->get_num_client_lease());
-}
-
-
-
-// locks ----------------------------------------------------------------
-
-SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info) 
-{
-  switch (lock_type) {
-  case CEPH_LOCK_DN:
-    {
-      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
-      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
-      frag_t fg;
-      CDir *dir = 0;
-      CDentry *dn = 0;
-      if (diri) {
-       fg = diri->pick_dirfrag(info.dname);
-       dir = diri->get_dirfrag(fg);
-       if (dir) 
-         dn = dir->lookup(info.dname, info.snapid);
-      }
-      if (!dn) {
-       dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
-       return 0;
-      }
-      return &dn->lock;
-    }
-
-  case CEPH_LOCK_IAUTH:
-  case CEPH_LOCK_ILINK:
-  case CEPH_LOCK_IDFT:
-  case CEPH_LOCK_IFILE:
-  case CEPH_LOCK_INEST:
-  case CEPH_LOCK_IXATTR:
-  case CEPH_LOCK_ISNAP:
-  case CEPH_LOCK_IFLOCK:
-  case CEPH_LOCK_IPOLICY:
-    {
-      CInode *in = mdcache->get_inode(info.ino, info.snapid);
-      if (!in) {
-       dout(7) << "get_lock don't have ino " << info.ino << dendl;
-       return 0;
-      }
-      switch (lock_type) {
-      case CEPH_LOCK_IAUTH: return &in->authlock;
-      case CEPH_LOCK_ILINK: return &in->linklock;
-      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
-      case CEPH_LOCK_IFILE: return &in->filelock;
-      case CEPH_LOCK_INEST: return &in->nestlock;
-      case CEPH_LOCK_IXATTR: return &in->xattrlock;
-      case CEPH_LOCK_ISNAP: return &in->snaplock;
-      case CEPH_LOCK_IFLOCK: return &in->flocklock;
-      case CEPH_LOCK_IPOLICY: return &in->policylock;
-      }
-    }
-
-  default:
-    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
-    ceph_abort();
-    break;
-  }
-
-  return 0;  
-}
-
-/* This function DOES put the passed message before returning */
-void Locker::handle_lock(MLock *m)
-{
-  // nobody should be talking to us during recovery.
-  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
-
-  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
-  if (!lock) {
-    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
-    m->put();
-    return;
-  }
-
-  switch (lock->get_type()) {
-  case CEPH_LOCK_DN:
-  case CEPH_LOCK_IAUTH:
-  case CEPH_LOCK_ILINK:
-  case CEPH_LOCK_ISNAP:
-  case CEPH_LOCK_IXATTR:
-  case CEPH_LOCK_IFLOCK:
-  case CEPH_LOCK_IPOLICY:
-    handle_simple_lock(lock, m);
-    break;
-    
-  case CEPH_LOCK_IDFT:
-  case CEPH_LOCK_INEST:
-    //handle_scatter_lock((ScatterLock*)lock, m);
-    //break;
-
-  case CEPH_LOCK_IFILE:
-    handle_file_lock(static_cast<ScatterLock*>(lock), m);
-    break;
-    
-  default:
-    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
-    ceph_abort();
-    break;
-  }
-}
-
-
-
-
-// ==========================================================================
-// simple lock
-
-/** This function may take a reference to m if it needs one, but does
- * not put references. */
-void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
-{
-  MDSCacheObject *parent = lock->get_parent();
-  if (parent->is_auth() &&
-      lock->get_state() != LOCK_SYNC &&
-      !parent->is_frozen()) {
-    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
-           << " on " << *parent << dendl;
-    assert(parent->is_auth()); // replica auth pinned if they're doing this!
-    if (lock->is_stable()) {
-      simple_sync(lock);
-    } else {
-      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
-      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
-                       new C_MDS_RetryMessage(mds, m->get()));
-    }
-  } else {
-    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
-           << " on " << *parent << dendl;
-    // replica should retry
-  }
-}
-
-/* This function DOES put the passed message before returning */
-void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
-{
-  int from = m->get_asker();
-  
-  dout(10) << "handle_simple_lock " << *m
-          << " on " << *lock << " " << *lock->get_parent() << dendl;
-
-  if (mds->is_rejoin()) {
-    if (lock->get_parent()->is_rejoining()) {
-      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
-             << ", dropping " << *m << dendl;
-      m->put();
-      return;
-    }
-  }
-
-  switch (m->get_action()) {
-    // -- replica --
-  case LOCK_AC_SYNC:
-    assert(lock->get_state() == LOCK_LOCK);
-    lock->decode_locked_state(m->get_data());
-    lock->set_state(LOCK_SYNC);
-    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
-    break;
-    
-  case LOCK_AC_LOCK:
-    assert(lock->get_state() == LOCK_SYNC);
-    lock->set_state(LOCK_SYNC_LOCK);
-    if (lock->is_leased())
-      revoke_client_leases(lock);
-    eval_gather(lock, true);
-    if (lock->is_unstable_and_locked())
-      mds->mdlog->flush();
-    break;
-
-
-    // -- auth --
-  case LOCK_AC_LOCKACK:
-    assert(lock->get_state() == LOCK_SYNC_LOCK ||
-          lock->get_state() == LOCK_SYNC_EXCL);
-    assert(lock->is_gathering(from));
-    lock->remove_gather(from);
-    
-    if (lock->is_gathering()) {
-      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
-             << ", still gathering " << lock->get_gather_set() << dendl;
-    } else {
-      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
-             << ", last one" << dendl;
-      eval_gather(lock);
-    }
-    break;
-
-  case LOCK_AC_REQRDLOCK:
-    handle_reqrdlock(lock, m);
-    break;
-
-  }
-
-  m->put();
-}
-
-/* unused, currently.
-
-class C_Locker_SimpleEval : public Context {
-  Locker *locker;
-  SimpleLock *lock;
-public:
-  C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
-  void finish(int r) {
-    locker->try_simple_eval(lock);
-  }
-};
-
-void Locker::try_simple_eval(SimpleLock *lock)
-{
-  // unstable and ambiguous auth?
-  if (!lock->is_stable() &&
-      lock->get_parent()->is_ambiguous_auth()) {
-    dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
-    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
-    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
-    return;
-  }
-
-  if (!lock->get_parent()->is_auth()) {
-    dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
-    return;
-  }
-
-  if (!lock->get_parent()->can_auth_pin()) {
-    dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
-    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
-    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
-    return;
-  }
-
-  if (lock->is_stable())
-    simple_eval(lock);
-}
-*/
-
-
-void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
-{
-  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
-
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-
-  if (lock->get_parent()->is_freezing_or_frozen()) {
-    // dentry lock in unreadable state can block path traverse
-    if ((lock->get_type() != CEPH_LOCK_DN ||
-        lock->get_state() == LOCK_SYNC ||
-        lock->get_parent()->is_frozen()))
-      return;
-  }
-
-  if (mdcache->is_readonly()) {
-    if (lock->get_state() != LOCK_SYNC) {
-      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
-      simple_sync(lock, need_issue);
-    }
-    return;
-  }
-
-  CInode *in = 0;
-  int wanted = 0;
-  if (lock->get_type() != CEPH_LOCK_DN) {
-    in = static_cast<CInode*>(lock->get_parent());
-    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
-  }
-  
-  // -> excl?
-  if (lock->get_state() != LOCK_EXCL &&
-      in && in->get_target_loner() >= 0 &&
-      (wanted & CEPH_CAP_GEXCL)) {
-    dout(7) << "simple_eval stable, going to excl " << *lock 
-           << " on " << *lock->get_parent() << dendl;
-    simple_excl(lock, need_issue);
-  }
-
-  // stable -> sync?
-  else if (lock->get_state() != LOCK_SYNC &&
-          !lock->is_wrlocked() &&
-          ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
-           (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
-    dout(7) << "simple_eval stable, syncing " << *lock 
-           << " on " << *lock->get_parent() << dendl;
-    simple_sync(lock, need_issue);
-  }
-}
-
-
-// mid
-
-bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
-{
-  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-
-  CInode *in = 0;
-  if (lock->get_cap_shift())
-    in = static_cast<CInode *>(lock->get_parent());
-
-  int old_state = lock->get_state();
-
-  if (old_state != LOCK_TSYN) {
-
-    switch (lock->get_state()) {
-    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
-    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
-    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
-    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
-    default: ceph_abort();
-    }
-
-    int gather = 0;
-    if (lock->is_wrlocked())
-      gather++;
-    
-    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
-      send_lock_message(lock, LOCK_AC_SYNC);
-      lock->init_gather();
-      gather++;
-    }
-    
-    if (in && in->is_head()) {
-      if (in->issued_caps_need_gather(lock)) {
-       if (need_issue)
-         *need_issue = true;
-       else
-         issue_caps(in);
-       gather++;
-      }
-    }
-    
-    bool need_recover = false;
-    if (lock->get_type() == CEPH_LOCK_IFILE) {
-      assert(in);
-      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
-        mds->mdcache->queue_file_recover(in);
-       need_recover = true;
-        gather++;
-      }
-    }
-    
-    if (!gather && lock->is_dirty()) {
-      lock->get_parent()->auth_pin(lock);
-      scatter_writebehind(static_cast<ScatterLock*>(lock));
-      mds->mdlog->flush();
-      return false;
-    }
-
-    if (gather) {
-      lock->get_parent()->auth_pin(lock);
-      if (need_recover)
-       mds->mdcache->do_file_recover();
-      return false;
-    }
-  }
-
-  if (lock->get_parent()->is_replicated()) {    // FIXME
-    bufferlist data;
-    lock->encode_locked_state(data);
-    send_lock_message(lock, LOCK_AC_SYNC, data);
-  }
-  lock->set_state(LOCK_SYNC);
-  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
-  if (in && in->is_head()) {
-    if (need_issue)
-      *need_issue = true;
-    else
-      issue_caps(in);
-  }
-  return true;
-}
-
-void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
-{
-  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-
-  CInode *in = 0;
-  if (lock->get_cap_shift())
-    in = static_cast<CInode *>(lock->get_parent());
-
-  switch (lock->get_state()) {
-  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
-  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
-  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
-  default: ceph_abort();
-  }
-  
-  int gather = 0;
-  if (lock->is_rdlocked())
-    gather++;
-  if (lock->is_wrlocked())
-    gather++;
-
-  if (lock->get_parent()->is_replicated() && 
-      lock->get_state() != LOCK_LOCK_EXCL &&
-      lock->get_state() != LOCK_XSYN_EXCL) {
-    send_lock_message(lock, LOCK_AC_LOCK);
-    lock->init_gather();
-    gather++;
-  }
-  
-  if (in && in->is_head()) {
-    if (in->issued_caps_need_gather(lock)) {
-      if (need_issue)
-       *need_issue = true;
-      else
-       issue_caps(in);
-      gather++;
-    }
-  }
-  
-  if (gather) {
-    lock->get_parent()->auth_pin(lock);
-  } else {
-    lock->set_state(LOCK_EXCL);
-    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
-    if (in) {
-      if (need_issue)
-       *need_issue = true;
-      else
-       issue_caps(in);
-    }
-  }
-}
-
-void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
-{
-  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-  assert(lock->get_state() != LOCK_LOCK);
-  
-  CInode *in = 0;
-  if (lock->get_cap_shift())
-    in = static_cast<CInode *>(lock->get_parent());
-
-  int old_state = lock->get_state();
-
-  switch (lock->get_state()) {
-  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
-  case LOCK_XSYN:
-    file_excl(static_cast<ScatterLock*>(lock), need_issue);
-    if (lock->get_state() != LOCK_EXCL)
-      return;
-    // fall-thru
-  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
-  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
-    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
-    break;
-  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
-  default: ceph_abort();
-  }
-
-  int gather = 0;
-  if (lock->is_leased()) {
-    gather++;
-    revoke_client_leases(lock);
-  }
-  if (lock->is_rdlocked())
-    gather++;
-  if (in && in->is_head()) {
-    if (in->issued_caps_need_gather(lock)) {
-      if (need_issue)
-       *need_issue = true;
-      else
-       issue_caps(in);
-      gather++;
-    }
-  }
-
-  bool need_recover = false;
-  if (lock->get_type() == CEPH_LOCK_IFILE) {
-    assert(in);
-    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
-      mds->mdcache->queue_file_recover(in);
-      need_recover = true;
-      gather++;
-    }
-  }
-
-  if (lock->get_parent()->is_replicated() &&
-      lock->get_state() == LOCK_MIX_LOCK &&
-      gather) {
-    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
-  } else {
-    // move to second stage of gather now, so we don't send the lock action later.
-    if (lock->get_state() == LOCK_MIX_LOCK)
-      lock->set_state(LOCK_MIX_LOCK2);
-
-    if (lock->get_parent()->is_replicated() &&
-       lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
-      gather++;
-      send_lock_message(lock, LOCK_AC_LOCK);
-      lock->init_gather();
-    }
-  }
-
-  if (!gather && lock->is_dirty()) {
-    lock->get_parent()->auth_pin(lock);
-    scatter_writebehind(static_cast<ScatterLock*>(lock));
-    mds->mdlog->flush();
-    return;
-  }
-
-  if (gather) {
-    lock->get_parent()->auth_pin(lock);
-    if (need_recover)
-      mds->mdcache->do_file_recover();
-  } else {
-    lock->set_state(LOCK_LOCK);
-    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
-  }
-}
-
-
-void Locker::simple_xlock(SimpleLock *lock)
-{
-  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  //assert(lock->is_stable());
-  assert(lock->get_state() != LOCK_XLOCK);
-  
-  CInode *in = 0;
-  if (lock->get_cap_shift())
-    in = static_cast<CInode *>(lock->get_parent());
-
-  if (lock->is_stable())
-    lock->get_parent()->auth_pin(lock);
-
-  switch (lock->get_state()) {
-  case LOCK_LOCK: 
-  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
-  default: ceph_abort();
-  }
-
-  int gather = 0;
-  if (lock->is_rdlocked())
-    gather++;
-  if (lock->is_wrlocked())
-    gather++;
-  
-  if (in && in->is_head()) {
-    if (in->issued_caps_need_gather(lock)) {
-      issue_caps(in);
-      gather++;
-    }
-  }
-
-  if (!gather) {
-    lock->set_state(LOCK_PREXLOCK);
-    //assert("shouldn't be called if we are already xlockable" == 0);
-  }
-}
-
-
-
-
-
-// ==========================================================================
-// scatter lock
-
-/*
-
-Some notes on scatterlocks.
-
- - The scatter/gather is driven by the inode lock.  The scatter always
-   brings in the latest metadata from the fragments.
-
- - When in a scattered/MIX state, fragments are only allowed to
-   update/be written to if the accounted stat matches the inode's
-   current version.
-
- - That means, on gather, we _only_ assimilate diffs for frag metadata
-   that match the current version, because those are the only ones
-   written during this scatter/gather cycle.  (Others didn't permit
-   it.)  We increment the version and journal this to disk.
-
- - When possible, we also simultaneously update our local frag
-   accounted stats to match.
-
- - On scatter, the new inode info is broadcast to frags, both local
-   and remote.  If possible (auth and !frozen), the dirfrag auth
-   should update the accounted state (if it isn't already up to date).
-   Note that this may occur on both the local inode auth node and
-   inode replicas, so there are two potential paths. If it is NOT
-   possible, they need to mark_stale to prevent any possible writes.
-
- - A scatter can be to MIX (potentially writeable) or to SYNC (read
-   only).  Both are opportunities to update the frag accounted stats,
-   even though only the MIX case is affected by a stale dirfrag.
-
- - Because many scatter/gather cycles can potentially go by without a
-   frag being able to update its accounted stats (due to being frozen
-   by exports/refragments in progress), the frag may have (even very)
-   old stat versions.  That's fine.  If when we do want to update it,
-   we can update accounted_* and the version first.
-
-*/
-
-class C_Locker_ScatterWB : public LockerLogContext {
-  ScatterLock *lock;
-  MutationRef mut;
-public:
-  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
-    LockerLogContext(l), lock(sl), mut(m) {}
-  void finish(int r) override { 
-    locker->scatter_writebehind_finish(lock, mut); 
-  }
-};
-
-void Locker::scatter_writebehind(ScatterLock *lock)
-{
-  CInode *in = static_cast<CInode*>(lock->get_parent());
-  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;
-
-  // journal
-  MutationRef mut(new MutationImpl());
-  mut->ls = mds->mdlog->get_current_segment();
-
-  // forcefully take a wrlock
-  lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
-
-  in->pre_cow_old_inode();  // avoid cow mayhem
-
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
-
-  in->finish_scatter_gather_update(lock->get_type());
-  lock->start_flush();
-
-  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
-  mds->mdlog->start_entry(le);
-
-  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
-  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
-  
-  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);
-
-  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
-}
-
-void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
-{
-  CInode *in = static_cast<CInode*>(lock->get_parent());
-  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
-  in->pop_and_dirty_projected_inode(mut->ls);
-
-  lock->finish_flush();
-
-  // if replicas may have flushed in a mix->lock state, send another
-  // message so they can finish_flush().
-  if (in->is_replicated()) {
-    switch (lock->get_state()) {
-    case LOCK_MIX_LOCK:
-    case LOCK_MIX_LOCK2:
-    case LOCK_MIX_EXCL:
-    case LOCK_MIX_TSYN:
-      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
-    }
-  }
-
-  mut->apply();
-  drop_locks(mut.get());
-  mut->cleanup();
-
-  if (lock->is_stable())
-    lock->finish_waiters(ScatterLock::WAIT_STABLE);
-
-  //scatter_eval_gather(lock);
-}
-
-void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
-{
-  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
-
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-
-  if (lock->get_parent()->is_freezing_or_frozen()) {
-    dout(20) << "  freezing|frozen" << dendl;
-    return;
-  }
-
-  if (mdcache->is_readonly()) {
-    if (lock->get_state() != LOCK_SYNC) {
-      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
-      simple_sync(lock, need_issue);
-    }
-    return;
-  }
-  
-  if (!lock->is_rdlocked() &&
-      lock->get_state() != LOCK_MIX &&
-      lock->get_scatter_wanted()) {
-    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
-            << " on " << *lock->get_parent() << dendl;
-    scatter_mix(lock, need_issue);
-    return;
-  }
-
-  if (lock->get_type() == CEPH_LOCK_INEST) {
-    // in general, we want to keep INEST writable at all times.
-    if (!lock->is_rdlocked()) {
-      if (lock->get_parent()->is_replicated()) {
-       if (lock->get_state() != LOCK_MIX)
-         scatter_mix(lock, need_issue);
-      } else {
-       if (lock->get_state() != LOCK_LOCK)
-         simple_lock(lock, need_issue);
-      }
-    }
-    return;
-  }
-
-  CInode *in = static_cast<CInode*>(lock->get_parent());
-  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
-    // i _should_ be sync.
-    if (!lock->is_wrlocked() &&
-       lock->get_state() != LOCK_SYNC) {
-      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
-      simple_sync(lock, need_issue);
-    }
-  }
-}
-
-
-/*
- * mark a scatterlock to indicate that the dir fnode has some dirty data
- */
-void Locker::mark_updated_scatterlock(ScatterLock *lock)
-{
-  lock->mark_dirty();
-  if (lock->get_updated_item()->is_on_list()) {
-    dout(10) << "mark_updated_scatterlock " << *lock
-            << " - already on list since " << lock->get_update_stamp() << dendl;
-  } else {
-    updated_scatterlocks.push_back(lock->get_updated_item());
-    utime_t now = ceph_clock_now();
-    lock->set_update_stamp(now);
-    dout(10) << "mark_updated_scatterlock " << *lock
-            << " - added at " << now << dendl;
-  }
-}
-
-/*
- * this is called by scatter_tick and LogSegment::try_to_trim() when
- * trying to flush dirty scattered data (i.e. updated fnode) back to
- * the inode.
- *
- * we need to lock|scatter in order to push fnode changes into the
- * inode.dirstat.
- */
-void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
-{
-  CInode *p = static_cast<CInode *>(lock->get_parent());
-
-  if (p->is_frozen() || p->is_freezing()) {
-    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
-    if (c) 
-      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
-    else
-      // just requeue.  not ideal.. starvation prone..
-      updated_scatterlocks.push_back(lock->get_updated_item());
-    return;
-  }
-
-  if (p->is_ambiguous_auth()) {
-    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
-    if (c) 
-      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
-    else
-      // just requeue.  not ideal.. starvation prone..
-      updated_scatterlocks.push_back(lock->get_updated_item());
-    return;
-  }
-
-  if (p->is_auth()) {
-    int count = 0;
-    while (true) {
-      if (lock->is_stable()) {
-       // can we do it now?
-       //  (only if we're not replicated.. if we are, we really do need
-       //   to nudge the lock state!)
-       /*
-         actually, even if we're not replicated, we can't stay in MIX, because another mds
-         could discover and replicate us at any time.  if that happens while we're flushing,
-         they end up in MIX but their inode has the old scatterstat version.
-
-       if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
-         dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
-         scatter_writebehind(lock);
-         if (c)
-           lock->add_waiter(SimpleLock::WAIT_STABLE, c);
-         return;
-       }
-       */
-
-       if (mdcache->is_readonly()) {
-         if (lock->get_state() != LOCK_SYNC) {
-           dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
-           simple_sync(static_cast<ScatterLock*>(lock));
-         }
-         break;
-       }
-
-       // adjust lock state
-       dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
-       switch (lock->get_type()) {
-       case CEPH_LOCK_IFILE:
-         if (p->is_replicated() && lock->get_state() != LOCK_MIX)
-           scatter_mix(static_cast<ScatterLock*>(lock));
-         else if (lock->get_state() != LOCK_LOCK)
-           simple_lock(static_cast<ScatterLock*>(lock));
-         else
-           simple_sync(static_cast<ScatterLock*>(lock));
-         break;
-         
-       case CEPH_LOCK_IDFT:
-       case CEPH_LOCK_INEST:
-         if (p->is_replicated() && lock->get_state() != LOCK_MIX)
-           scatter_mix(lock);
-         else if (lock->get_state() != LOCK_LOCK)
-           simple_lock(lock);
-         else
-           simple_sync(lock);
-         break;
-       default:
-         ceph_abort();
-       }
-       ++count;
-       if (lock->is_stable() && count == 2) {
-         dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
-         // this should only realy happen when called via
-         // handle_file_lock due to AC_NUDGE, because the rest of the
-         // time we are replicated or have dirty data and won't get
-         // called.  bailing here avoids an infinite loop.
-         assert(!c); 
-         break;
-       }
-      } else {
-       dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
-       if (c)
-         lock->add_waiter(SimpleLock::WAIT_STABLE, c);
-       return;
-      }
-    }
-  } else {
-    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " 
-            << *lock << " on " << *p << dendl;
-    // request unscatter?
-    mds_rank_t auth = lock->get_parent()->authority().first;
-    if (!mds->is_cluster_degraded() ||
-       mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
-      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
-
-    // wait...
-    if (c)
-      lock->add_waiter(SimpleLock::WAIT_STABLE, c);
-
-    // also, requeue, in case we had wrong auth or something
-    updated_scatterlocks.push_back(lock->get_updated_item());
-  }
-}
-
-void Locker::scatter_tick()
-{
-  dout(10) << "scatter_tick" << dendl;
-  
-  // updated
-  utime_t now = ceph_clock_now();
-  int n = updated_scatterlocks.size();
-  while (!updated_scatterlocks.empty()) {
-    ScatterLock *lock = updated_scatterlocks.front();
-
-    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping
-    
-    if (!lock->is_dirty()) {
-      updated_scatterlocks.pop_front();
-      dout(10) << " removing from updated_scatterlocks " 
-              << *lock << " " << *lock->get_parent() << dendl;
-      continue;
-    }
-    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
-      break;
-    updated_scatterlocks.pop_front();
-    scatter_nudge(lock, 0);
-  }
-  mds->mdlog->flush();
-}
-
-
-void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
-{
-  dout(10) << "scatter_tempsync " << *lock
-          << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-
-  assert(0 == "not fully implemented, at least not for filelock");
-
-  CInode *in = static_cast<CInode *>(lock->get_parent());
-
-  switch (lock->get_state()) {
-  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
-  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
-  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
-  default: ceph_abort();
-  }
-
-  int gather = 0;
-  if (lock->is_wrlocked())
-    gather++;
-
-  if (lock->get_cap_shift() &&
-      in->is_head() &&
-      in->issued_caps_need_gather(lock)) {
-    if (need_issue)
-      *need_issue = true;
-    else
-      issue_caps(in);
-    gather++;
-  }
-
-  if (lock->get_state() == LOCK_MIX_TSYN &&
-      in->is_replicated()) {
-    lock->init_gather();
-    send_lock_message(lock, LOCK_AC_LOCK);
-    gather++;
-  }
-
-  if (gather) {
-    in->auth_pin(lock);
-  } else {
-    // do tempsync
-    lock->set_state(LOCK_TSYN);
-    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
-    if (lock->get_cap_shift()) {
-      if (need_issue)
-       *need_issue = true;
-      else
-       issue_caps(in);
-    }
-  }
-}
-
-
-
-// ==========================================================================
-// local lock
-
-void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
-{
-  dout(7) << "local_wrlock_grab  on " << *lock
-         << " on " << *lock->get_parent() << dendl;  
-  
-  assert(lock->get_parent()->is_auth());
-  assert(lock->can_wrlock());
-  assert(!mut->wrlocks.count(lock));
-  lock->get_wrlock(mut->get_client());
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
-}
-
-bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
-{
-  dout(7) << "local_wrlock_start  on " << *lock
-         << " on " << *lock->get_parent() << dendl;  
-  
-  assert(lock->get_parent()->is_auth());
-  if (lock->can_wrlock()) {
-    assert(!mut->wrlocks.count(lock));
-    lock->get_wrlock(mut->get_client());
-    mut->wrlocks.insert(lock);
-    mut->locks.insert(lock);
-    return true;
-  } else {
-    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
-    return false;
-  }
-}
-
-void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
-{
-  dout(7) << "local_wrlock_finish  on " << *lock
-         << " on " << *lock->get_parent() << dendl;  
-  lock->put_wrlock();
-  mut->wrlocks.erase(lock);
-  mut->locks.erase(lock);
-  if (lock->get_num_wrlocks() == 0) {
-    lock->finish_waiters(SimpleLock::WAIT_STABLE |
-                         SimpleLock::WAIT_WR |
-                         SimpleLock::WAIT_RD);
-  }
-}
-
-bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
-{
-  dout(7) << "local_xlock_start  on " << *lock
-         << " on " << *lock->get_parent() << dendl;  
-  
-  assert(lock->get_parent()->is_auth());
-  if (!lock->can_xlock_local()) {
-    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
-    return false;
-  }
-
-  lock->get_xlock(mut, mut->get_client());
-  mut->xlocks.insert(lock);
-  mut->locks.insert(lock);
-  return true;
-}
-
-void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
-{
-  dout(7) << "local_xlock_finish  on " << *lock
-         << " on " << *lock->get_parent() << dendl;  
-  lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
-
-  lock->finish_waiters(SimpleLock::WAIT_STABLE | 
-                      SimpleLock::WAIT_WR | 
-                      SimpleLock::WAIT_RD);
-}
-
-
-
-// ==========================================================================
-// file lock
-
-
-void Locker::file_eval(ScatterLock *lock, bool *need_issue)
-{
-  CInode *in = static_cast<CInode*>(lock->get_parent());
-  int loner_wanted, other_wanted;
-  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
-  dout(7) << "file_eval wanted=" << gcap_string(wanted)
-         << " loner_wanted=" << gcap_string(loner_wanted)
-         << " other_wanted=" << gcap_string(other_wanted)
-         << "  filelock=" << *lock << " on " << *lock->get_parent()
-         << dendl;
-
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-
-  if (lock->get_parent()->is_freezing_or_frozen())
-    return;
-
-  if (mdcache->is_readonly()) {
-    if (lock->get_state() != LOCK_SYNC) {
-      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
-      simple_sync(lock, need_issue);
-    }
-    return;
-  }
-
-  // excl -> *?
-  if (lock->get_state() == LOCK_EXCL) {
-    dout(20) << " is excl" << dendl;
-    int loner_issued, other_issued, xlocker_issued;
-    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
-    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
-            << " other_issued=" << gcap_string(other_issued)
-           << " xlocker_issued=" << gcap_string(xlocker_issued)
-           << dendl;
-    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
-        (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
-       (in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
-      dout(20) << " should lose it" << dendl;
-      // we should lose it.
-      //  loner  other   want
-      //  R      R       SYNC
-      //  R      R|W     MIX
-      //  R      W       MIX
-      //  R|W    R       MIX
-      //  R|W    R|W     MIX
-      //  R|W    W       MIX
-      //  W      R       MIX
-      //  W      R|W     MIX
-      //  W      W       MIX
-      // -> any writer means MIX; RD doesn't matter.
-      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
-         lock->is_waiter_for(SimpleLock::WAIT_WR))
-       scatter_mix(lock, need_issue);
-      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
-       simple_sync(lock, need_issue);
-      else
-       dout(10) << " waiting for wrlock to drain" << dendl;
-    }    
-  }
-
-  // * -> excl?
-  else if (lock->get_state() != LOCK_EXCL &&
-          !lock->is_rdlocked() &&
-          //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
-          ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
-           (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
-          in->get_target_loner() >= 0) {
-    dout(7) << "file_eval stable, bump to loner " << *lock
-           << " on " << *lock->get_parent() << dendl;
-    file_excl(lock, need_issue);
-  }
-
-  // * -> mixed?
-  else if (lock->get_state() != LOCK_MIX &&
-          !lock->is_rdlocked() &&
-          //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
-          (lock->get_scatter_wanted() ||
-           (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
-    dout(7) << "file_eval stable, bump to mixed " << *lock
-           << " on " << *lock->get_parent() << dendl;
-    scatter_mix(lock, need_issue);
-  }
-  
-  // * -> sync?
-  else if (lock->get_state() != LOCK_SYNC &&
-          !lock->is_wrlocked() &&   // drain wrlocks first!
-          !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
-          !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
-          !((lock->get_state() == LOCK_MIX) &&
-            in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
-          //((wanted & CEPH_CAP_RD) || 
-          //in->is_replicated() || 
-          //lock->get_num_client_lease() || 
-          //(!loner && lock->get_state() == LOCK_EXCL)) &&
-          ) {
-    dout(7) << "file_eval stable, bump to sync " << *lock 
-           << " on " << *lock->get_parent() << dendl;
-    simple_sync(lock, need_issue);
-  }
-}
-
-
-
-void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
-{
-  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
-
-  CInode *in = static_cast<CInode*>(lock->get_parent());
-  assert(in->is_auth());
-  assert(lock->is_stable());
-
-  if (lock->get_state() == LOCK_LOCK) {
-    in->start_scatter(lock);
-    if (in->is_replicated()) {
-      // data
-      bufferlist softdata;
-      lock->encode_locked_state(softdata);
-
-      // bcast to replicas
-      send_lock_message(lock, LOCK_AC_MIX, softdata);
-    }
-
-    // change lock
-    lock->set_state(LOCK_MIX);
-    lock->clear_scatter_wanted();
-    if (lock->get_cap_shift()) {
-      if (need_issue)
-       *need_issue = true;
-      else
-       issue_caps(in);
-    }
-  } else {
-    // gather?
-    switch (lock->get_state()) {
-    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
-    case LOCK_XSYN:
-      file_excl(lock, need_issue);
-      if (lock->get_state() != LOCK_EXCL)
-       return;
-      // fall-thru
-    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
-    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
-    default: ceph_abort();
-    }
-
-    int gather = 0;
-    if (lock->is_rdlocked())
-      gather++;
-    if (in->is_replicated()) {
-      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
-         lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
-       send_lock_message(lock, LOCK_AC_MIX);
-       lock->init_gather();
-       gather++;
-      }
-    }
-    if (lock->is_leased()) {
-      revoke_client_leases(lock);
-      gather++;
-    }
-    if (lock->get_cap_shift() &&
-       in->is_head() &&
-       in->issued_caps_need_gather(lock)) {
-      if (need_issue)
-       *need_issue = true;
-      else
-       issue_caps(in);
-      gather++;
-    }
-    bool need_recover = false;
-    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
-      mds->mdcache->queue_file_recover(in);
-      need_recover = true;
-      gather++;
-    }
-
-    if (gather) {
-      lock->get_parent()->auth_pin(lock);
-      if (need_recover)
-       mds->mdcache->do_file_recover();
-    } else {
-      in->start_scatter(lock);
-      lock->set_state(LOCK_MIX);
-      lock->clear_scatter_wanted();
-      if (in->is_replicated()) {
-       bufferlist softdata;
-       lock->encode_locked_state(softdata);
-       send_lock_message(lock, LOCK_AC_MIX, softdata);
-      }
-      if (lock->get_cap_shift()) {
-       if (need_issue)
-         *need_issue = true;
-       else
-         issue_caps(in);
-      }
-    }
-  }
-}
-
-
-void Locker::file_excl(ScatterLock *lock, bool *need_issue)
-{
-  CInode *in = static_cast<CInode*>(lock->get_parent());
-  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;  
-
-  assert(in->is_auth());
-  assert(lock->is_stable());
-
-  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
-        (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>
-  
-  switch (lock->get_state()) {
-  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
-  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
-  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
-  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
-  default: ceph_abort();
-  }
-  int gather = 0;
-  
-  if (lock->is_rdlocked())
-    gather++;
-  if (lock->is_wrlocked())
-    gather++;
-
-  if (in->is_replicated() &&
-      lock->get_state() != LOCK_LOCK_EXCL &&
-      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
-    send_lock_message(lock, LOCK_AC_LOCK);
-    lock->init_gather();
-    gather++;
-  }
-  if (lock->is_leased()) {
-    revoke_client_leases(lock);
-    gather++;
-  }
-  if (in->is_head() &&
-      in->issued_caps_need_gather(lock)) {
-    if (need_issue)
-      *need_issue = true;
-    else
-      issue_caps(in);
-    gather++;
-  }
-  bool need_recover = false;
-  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
-    mds->mdcache->queue_file_recover(in);
-    need_recover = true;
-    gather++;
-  }
-  
-  if (gather) {
-    lock->get_parent()->auth_pin(lock);
-    if (need_recover)
-      mds->mdcache->do_file_recover();
-  } else {
-    lock->set_state(LOCK_EXCL);
-    if (need_issue)
-      *need_issue = true;
-    else
-      issue_caps(in);
-  }
-}
-
-void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
-{
-  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
-  CInode *in = static_cast<CInode *>(lock->get_parent());
-  assert(in->is_auth());
-  assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
-
-  switch (lock->get_state()) {
-  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
-  default: ceph_abort();
-  }
-  
-  int gather = 0;
-  if (lock->is_wrlocked())
-    gather++;
-
-  if (in->is_head() &&
-      in->issued_caps_need_gather(lock)) {
-    if (need_issue)
-      *need_issue = true;
-    else
-      issue_caps(in);
-    gather++;
-  }
-  
-  if (gather) {
-    lock->get_parent()->auth_pin(lock);
-  } else {
-    lock->set_state(LOCK_XSYN);
-    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
-    if (need_issue)
-      *need_issue = true;
-    else
-      issue_caps(in);
-  }
-}
-
-void Locker::file_recover(ScatterLock *lock)
-{
-  CInode *in = static_cast<CInode *>(lock->get_parent());
-  dout(7) << "file_recover " << *lock << " on " << *in << dendl;
-
-  assert(in->is_auth());
-  //assert(lock->is_stable());
-  assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
-
-  int gather = 0;
-  
-  /*
-  if (in->is_replicated()
-      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
-    send_lock_message(lock, LOCK_AC_LOCK);
-    lock->init_gather();
-    gather++;
-  }
-  */
-  if (in->is_head() &&
-      in->issued_caps_need_gather(lock)) {
-    issue_caps(in);
-    gather++;
-  }
-
-  lock->set_state(LOCK_SCAN);
-  if (gather)
-    in->state_set(CInode::STATE_NEEDSRECOVER);
-  else
-    mds->mdcache->queue_file_recover(in);
-}
-
-
-// messenger
-/* This function DOES put the passed message before returning */
-void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
-{
-  CInode *in = static_cast<CInode*>(lock->get_parent());
-  int from = m->get_asker();
-
-  if (mds->is_rejoin()) {
-    if (in->is_rejoining()) {
-      dout(7) << "handle_file_lock still rejoining " << *in
-             << ", dropping " << *m << dendl;
-      m->put();
-      return;
-    }
-  }
-
-  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
-         << " on " << *lock
-         << " from mds." << from << " " 
-         << *in << dendl;
-
-  bool caps = lock->get_cap_shift();
-  
-  switch (m->get_action()) {
-    // -- replica --
-  case LOCK_AC_SYNC:
-    assert(lock->get_state() == LOCK_LOCK ||
-          lock->get_state() == LOCK_MIX ||
-          lock->get_state() == LOCK_MIX_SYNC2);
-    
-    if (lock->get_state() == LOCK_MIX) {
-      lock->set_state(LOCK_MIX_SYNC);
-      eval_gather(lock, true);
-      if (lock->is_unstable_and_locked())
-       mds->mdlog->flush();
-      break;
-    }
-
-    (static_cast<ScatterLock *>(lock))->finish_flush();
-    (static_cast<ScatterLock *>(lock))->clear_flushed();
-
-    // ok
-    lock->decode_locked_state(m->get_data());
-    lock->set_state(LOCK_SYNC);
-
-    lock->get_rdlock();
-    if (caps)
-      issue_caps(in);
-    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
-    lock->put_rdlock();
-    break;
-    
-  case LOCK_AC_LOCK:
-    switch (lock->get_state()) {
-    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
-    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
-    default: ceph_abort();
-    }
-
-    eval_gather(lock, true);
-    if (lock->is_unstable_and_locked())
-      mds->mdlog->flush();
-
-    break;
-
-  case LOCK_AC_LOCKFLUSHED:
-    (static_cast<ScatterLock *>(lock))->finish_flush();
-    (static_cast<ScatterLock *>(lock))->clear_flushed();
-    // wake up scatter_nudge waiters
-    if (lock->is_stable())
-      lock->finish_waiters(SimpleLock::WAIT_STABLE);
-    break;
-    
-  case LOCK_AC_MIX:
-    assert(lock->get_state() == LOCK_SYNC ||
-           lock->get_state() == LOCK_LOCK ||
-          lock->get_state() == LOCK_SYNC_MIX2);
-    
-    if (lock->get_state() == LOCK_SYNC) {
-      // MIXED
-      lock->set_state(LOCK_SYNC_MIX);
-      eval_gather(lock, true);
-      if (lock->is_unstable_and_locked())
-       mds->mdlog->flush();
-      break;
-    } 
-
-    // ok
-    lock->set_state(LOCK_MIX);
-    lock->decode_locked_state(m->get_data());
-
-    if (caps)
-      issue_caps(in);
-    
-    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
-    break;
-
-
-    // -- auth --
-  case LOCK_AC_LOCKACK:
-    assert(lock->get_state() == LOCK_SYNC_LOCK ||
-           lock->get_state() == LOCK_MIX_LOCK ||
-           lock->get_state() == LOCK_MIX_LOCK2 ||
-           lock->get_state() == LOCK_MIX_EXCL ||
-           lock->get_state() == LOCK_SYNC_EXCL ||
-           lock->get_state() == LOCK_SYNC_MIX ||
-          lock->get_state() == LOCK_MIX_TSYN);
-    assert(lock->is_gathering(from));
-    lock->remove_gather(from);
-    
-    if (lock->get_state() == LOCK_MIX_LOCK ||
-       lock->get_state() == LOCK_MIX_LOCK2 ||
-       lock->get_state() == LOCK_MIX_EXCL ||
-       lock->get_state() == LOCK_MIX_TSYN) {
-      lock->decode_locked_state(m->get_data());
-      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
-      // delay calling scatter_writebehind().
-      lock->clear_flushed();
-    }
-
-    if (lock->is_gathering()) {
-      dout(7) << "handle_file_lock " << *in << " from " << from
-             << ", still gathering " << lock->get_gather_set() << dendl;
-    } else {
-      dout(7) << "handle_file_lock " << *in << " from " << from
-             << ", last one" << dendl;
-      eval_gather(lock);
-    }
-    break;
-    
-  case LOCK_AC_SYNCACK:
-    assert(lock->get_state() == LOCK_MIX_SYNC);
-    assert(lock->is_gathering(from));
-    lock->remove_gather(from);
-    
-    lock->decode_locked_state(m->get_data());
-
-    if (lock->is_gathering()) {
-      dout(7) << "handle_file_lock " << *in << " from " << from
-             << ", still gathering " << lock->get_gather_set() << dendl;
-    } else {
-      dout(7) << "handle_file_lock " << *in << " from " << from
-             << ", last one" << dendl;
-      eval_gather(lock);
-    }
-    break;
-
-  case LOCK_AC_MIXACK:
-    assert(lock->get_state() == LOCK_SYNC_MIX);
-    assert(lock->is_gathering(from));
-    lock->remove_gather(from);
-    
-    if (lock->is_gathering()) {
-      dout(7) << "handle_file_lock " << *in << " from " << from
-             << ", still gathering " << lock->get_gather_set() << dendl;
-    } else {
-      dout(7) << "handle_file_lock " << *in << " from " << from
-             << ", last one" << dendl;
-      eval_gather(lock);
-    }
-    break;
-
-
-    // requests....
-  case LOCK_AC_REQSCATTER:
-    if (lock->is_stable()) {
-      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
-       *  because the replica should be holding an auth_pin if they're
-       *  doing this (and thus, we are freezing, not frozen, and indefinite
-       *  starvation isn't an issue).
-       */
-      dout(7) << "handle_file_lock got scatter request on " << *lock
-             << " on " << *lock->get_parent() << dendl;
-      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
-       scatter_mix(lock);
-    } else {
-      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
-             << " on " << *lock->get_parent() << dendl;
-      lock->set_scatter_wanted();
-    }
-    break;
-
-  case LOCK_AC_REQUNSCATTER:
-    if (lock->is_stable()) {
-      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
-       *  because the replica should be holding an auth_pin if they're
-       *  doing this (and thus, we are freezing, not frozen, and indefinite
-       *  starvation isn't an issue).
-       */
-      dout(7) << "handle_file_lock got unscatter request on " << *lock
-             << " on " << *lock->get_parent() << dendl;
-      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
-       simple_lock(lock);  // FIXME tempsync?
-    } else {
-      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
-             << " on " << *lock->get_parent() << dendl;
-      lock->set_unscatter_wanted();
-    }
-    break;
-
-  case LOCK_AC_REQRDLOCK:
-    handle_reqrdlock(lock, m);
-    break;
-
-  case LOCK_AC_NUDGE:
-    if (!lock->get_parent()->is_auth()) {
-      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
-             << " on " << *lock->get_parent() << dendl;
-    } else if (!lock->get_parent()->is_replicated()) {
-      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
-             << " on " << *lock->get_parent() << dendl;
-    } else {
-      dout(7) << "handle_file_lock trying nudge on " << *lock
-             << " on " << *lock->get_parent() << dendl;
-      scatter_nudge(lock, 0, true);
-      mds->mdlog->flush();
-    }
-    break;
-
-  default:
-    ceph_abort();
-  }  
-  
-  m->put();
-}
-
-
-
-
-
-