diff --git a/src/ceph/src/mds/Locker.cc b/src/ceph/src/mds/Locker.cc
new file mode 100644
index 0000000..a0ccf96
--- /dev/null
@@ -0,0 +1,5316 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#include "MDSRank.h"
+#include "MDCache.h"
+#include "Locker.h"
+#include "CInode.h"
+#include "CDir.h"
+#include "CDentry.h"
+#include "Mutation.h"
+#include "MDSContext.h"
+
+#include "MDLog.h"
+#include "MDSMap.h"
+
+#include "events/EUpdate.h"
+#include "events/EOpen.h"
+
+#include "msg/Messenger.h"
+#include "osdc/Objecter.h"
+
+#include "messages/MInodeFileCaps.h"
+#include "messages/MLock.h"
+#include "messages/MClientLease.h"
+#include "messages/MClientReply.h"
+#include "messages/MClientCaps.h"
+#include "messages/MClientCapRelease.h"
+
+#include "messages/MMDSSlaveRequest.h"
+
+#include <errno.h>
+
+#include "common/config.h"
+
+
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_context g_ceph_context
+#define dout_prefix _prefix(_dout, mds)
+static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
+  return *_dout << "mds." << mds->get_nodeid() << ".locker ";
+}
+
+
+class LockerContext : public MDSInternalContextBase {
+protected:
+  Locker *locker;
+  MDSRank *get_mds() override
+  {
+    return locker->mds;
+  }
+
+public:
+  explicit LockerContext(Locker *locker_) : locker(locker_) {
+    assert(locker != NULL);
+  }
+};
+
+class LockerLogContext : public MDSLogContextBase {
+protected:
+  Locker *locker;
+  MDSRank *get_mds() override
+  {
+    return locker->mds;
+  }
+
+public:
+  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
+    assert(locker != NULL);
+  }
+};
+
+/* This function DOES put the passed message before returning */
+void Locker::dispatch(Message *m)
+{
+
+  switch (m->get_type()) {
+
+    // inter-mds locking
+  case MSG_MDS_LOCK:
+    handle_lock(static_cast<MLock*>(m));
+    break;
+    // inter-mds caps
+  case MSG_MDS_INODEFILECAPS:
+    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
+    break;
+
+    // client sync
+  case CEPH_MSG_CLIENT_CAPS:
+    handle_client_caps(static_cast<MClientCaps*>(m));
+
+    break;
+  case CEPH_MSG_CLIENT_CAPRELEASE:
+    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
+    break;
+  case CEPH_MSG_CLIENT_LEASE:
+    handle_client_lease(static_cast<MClientLease*>(m));
+    break;
+    
+  default:
+    derr << "locker unknown message " << m->get_type() << dendl;
+    assert(0 == "locker unknown message");
+  }
+}
+
+void Locker::tick()
+{
+  scatter_tick();
+  caps_tick();
+}
+
+/*
+ * locks vs rejoin
+ *
+ * 
+ *
+ */
+
+void Locker::send_lock_message(SimpleLock *lock, int msg)
+{
+  for (const auto &it : lock->get_parent()->get_replicas()) {
+    if (mds->is_cluster_degraded() &&
+       mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
+      continue;
+    MLock *m = new MLock(lock, msg, mds->get_nodeid());
+    mds->send_message_mds(m, it.first);
+  }
+}
+
+void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
+{
+  for (const auto &it : lock->get_parent()->get_replicas()) {
+    if (mds->is_cluster_degraded() &&
+       mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
+      continue;
+    MLock *m = new MLock(lock, msg, mds->get_nodeid());
+    m->set_data(data);
+    mds->send_message_mds(m, it.first);
+  }
+}
+
+
+
+
+void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
+{
+  // rdlock ancestor snaps
+  CInode *t = in;
+  rdlocks.insert(&in->snaplock);
+  while (t->get_projected_parent_dn()) {
+    t = t->get_projected_parent_dn()->get_dir()->get_inode();
+    rdlocks.insert(&t->snaplock);
+  }
+}
+
+void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
+                                         file_layout_t **layout)
+{
+  //rdlock ancestor snaps
+  CInode *t = in;
+  rdlocks.insert(&in->snaplock);
+  rdlocks.insert(&in->policylock);
+  bool found_layout = false;
+  while (t) {
+    rdlocks.insert(&t->snaplock);
+    if (!found_layout) {
+      rdlocks.insert(&t->policylock);
+      if (t->get_projected_inode()->has_layout()) {
+        *layout = &t->get_projected_inode()->layout;
+        found_layout = true;
+      }
+    }
+    if (t->get_projected_parent_dn() &&
+        t->get_projected_parent_dn()->get_dir())
+      t = t->get_projected_parent_dn()->get_dir()->get_inode();
+    else t = NULL;
+  }
+}
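+
+// Hedged usage sketch (hypothetical caller, not part of this file): path-based
+// handlers typically gather the ancestor snap rdlocks, plus the nearest
+// ancestor layout, before calling acquire_locks().  'in' and 'rdlocks' are
+// assumed to exist in the caller:
+//
+//   file_layout_t *layout = NULL;
+//   mds->locker->include_snap_rdlocks_wlayout(rdlocks, in, &layout);
+//   // once acquire_locks() succeeds, *layout (if found) points at the
+//   // closest projected ancestor layout, covered by its policylock rdlock.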
+
+struct MarkEventOnDestruct {
+  MDRequestRef& mdr;
+  const char* message;
+  bool mark_event;
+  MarkEventOnDestruct(MDRequestRef& _mdr,
+                      const char *_message) : mdr(_mdr),
+                          message(_message),
+                          mark_event(true) {}
+  ~MarkEventOnDestruct() {
+    if (mark_event)
+      mdr->mark_event(message);
+  }
+};
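+
+// Hedged illustration of the RAII marker above (all names are hypothetical,
+// for illustration only): marker.message starts out pessimistic and is
+// overwritten as the code path progresses, so whatever string is current when
+// the scope unwinds is what gets recorded on the request.
+//
+//   bool example_step(MDRequestRef& mdr) {
+//     MarkEventOnDestruct marker(mdr, "failed: still waiting");
+//     if (!preconditions_met())           // hypothetical check
+//       return false;                     // dtor marks "failed: still waiting"
+//     marker.message = "step completed";  // dtor records success instead
+//     return true;
+//   }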
+
+/* If this function returns false, the mdr has been placed
+ * on the appropriate wait list */
+bool Locker::acquire_locks(MDRequestRef& mdr,
+                          set<SimpleLock*> &rdlocks,
+                          set<SimpleLock*> &wrlocks,
+                          set<SimpleLock*> &xlocks,
+                          map<SimpleLock*,mds_rank_t> *remote_wrlocks,
+                          CInode *auth_pin_freeze,
+                          bool auth_pin_nonblock)
+{
+  if (mdr->done_locking &&
+      !mdr->is_slave()) {  // not on slaves!  master requests locks piecemeal.
+    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;    
+    return true;  // at least we had better be!
+  }
+  dout(10) << "acquire_locks " << *mdr << dendl;
+
+  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
+
+  client_t client = mdr->get_client();
+
+  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
+  set<MDSCacheObject*> mustpin;            // items to authpin
+
+  // xlocks
+  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
+    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
+    sorted.insert(*p);
+    mustpin.insert((*p)->get_parent());
+
+    // augment xlock with a versionlock?
+    if ((*p)->get_type() == CEPH_LOCK_DN) {
+      CDentry *dn = (CDentry*)(*p)->get_parent();
+      if (!dn->is_auth())
+       continue;
+
+      if (xlocks.count(&dn->versionlock))
+       continue;  // we're xlocking the versionlock too; don't wrlock it!
+
+      if (mdr->is_master()) {
+       // master.  wrlock versionlock so we can pipeline dentry updates to journal.
+       wrlocks.insert(&dn->versionlock);
+      } else {
+       // slave.  exclusively lock the dentry version (i.e. block other journal updates).
+       // this makes rollback safe.
+       xlocks.insert(&dn->versionlock);
+       sorted.insert(&dn->versionlock);
+      }
+    }
+    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
+      // inode version lock?
+      CInode *in = (CInode*)(*p)->get_parent();
+      if (!in->is_auth())
+       continue;
+      if (mdr->is_master()) {
+       // master.  wrlock versionlock so we can pipeline inode updates to journal.
+       wrlocks.insert(&in->versionlock);
+      } else {
+       // slave.  exclusively lock the inode version (i.e. block other journal updates).
+       // this makes rollback safe.
+       xlocks.insert(&in->versionlock);
+       sorted.insert(&in->versionlock);
+      }
+    }
+  }
+
+  // wrlocks
+  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
+    MDSCacheObject *object = (*p)->get_parent();
+    dout(20) << " must wrlock " << **p << " " << *object << dendl;
+    sorted.insert(*p);
+    if (object->is_auth())
+      mustpin.insert(object);
+    else if (!object->is_auth() &&
+            !(*p)->can_wrlock(client) &&  // we might have to request a scatter
+            !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
+      dout(15) << " will also auth_pin " << *object
+              << " in case we need to request a scatter" << dendl;
+      mustpin.insert(object);
+    }
+  }
+
+  // remote_wrlocks
+  if (remote_wrlocks) {
+    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
+      MDSCacheObject *object = p->first->get_parent();
+      dout(20) << " must remote_wrlock on mds." << p->second << " "
+              << *p->first << " " << *object << dendl;
+      sorted.insert(p->first);
+      mustpin.insert(object);
+    }
+  }
+
+  // rdlocks
+  for (set<SimpleLock*>::iterator p = rdlocks.begin();
+        p != rdlocks.end();
+       ++p) {
+    MDSCacheObject *object = (*p)->get_parent();
+    dout(20) << " must rdlock " << **p << " " << *object << dendl;
+    sorted.insert(*p);
+    if (object->is_auth())
+      mustpin.insert(object);
+    else if (!object->is_auth() &&
+            !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
+      dout(15) << " will also auth_pin " << *object
+              << " in case we need to request a rdlock" << dendl;
+      mustpin.insert(object);
+    }
+  }
+
+  // AUTH PINS
+  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)
+  
+  // can i auth pin them all now?
+  marker.message = "failed to authpin local pins";
+  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
+       p != mustpin.end();
+       ++p) {
+    MDSCacheObject *object = *p;
+
+    dout(10) << " must authpin " << *object << dendl;
+
+    if (mdr->is_auth_pinned(object)) {
+      if (object != (MDSCacheObject*)auth_pin_freeze)
+       continue;
+      if (mdr->more()->is_remote_frozen_authpin) {
+       if (mdr->more()->rename_inode == auth_pin_freeze)
+         continue;
+       // unfreeze auth pin for the wrong inode
+       mustpin_remote[mdr->more()->rename_inode->authority().first].size();
+      }
+    }
+    
+    if (!object->is_auth()) {
+      if (!mdr->locks.empty())
+       drop_locks(mdr.get());
+      if (object->is_ambiguous_auth()) {
+       // wait
+       dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
+       object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
+       mdr->drop_local_auth_pins();
+       return false;
+      }
+      mustpin_remote[object->authority().first].insert(object);
+      continue;
+    }
+    if (!object->can_auth_pin()) {
+      // wait
+      drop_locks(mdr.get());
+      mdr->drop_local_auth_pins();
+      if (auth_pin_nonblock) {
+       dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
+       mdr->aborted = true;
+       return false;
+      }
+      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
+      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
+
+      if (!mdr->remote_auth_pins.empty())
+       notify_freeze_waiter(object);
+
+      return false;
+    }
+  }
+
+  // ok, grab local auth pins
+  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
+       p != mustpin.end();
+       ++p) {
+    MDSCacheObject *object = *p;
+    if (mdr->is_auth_pinned(object)) {
+      dout(10) << " already auth_pinned " << *object << dendl;
+    } else if (object->is_auth()) {
+      dout(10) << " auth_pinning " << *object << dendl;
+      mdr->auth_pin(object);
+    }
+  }
+
+  // request remote auth_pins
+  if (!mustpin_remote.empty()) {
+    marker.message = "requesting remote authpins";
+    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
+        p != mdr->remote_auth_pins.end();
+        ++p) {
+      if (mustpin.count(p->first)) {
+       assert(p->second == p->first->authority().first);
+       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
+       if (q != mustpin_remote.end())
+         q->second.insert(p->first);
+      }
+    }
+    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
+        p != mustpin_remote.end();
+        ++p) {
+      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
+
+      // wait for active auth
+      if (mds->is_cluster_degraded() &&
+         !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
+       dout(10) << " mds." << p->first << " is not active" << dendl;
+       if (mdr->more()->waiting_on_slave.empty())
+         mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
+       return false;
+      }
+      
+      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+                                                  MMDSSlaveRequest::OP_AUTHPIN);
+      for (set<MDSCacheObject*>::iterator q = p->second.begin();
+          q != p->second.end();
+          ++q) {
+       dout(10) << " req remote auth_pin of " << **q << dendl;
+       MDSCacheObjectInfo info;
+       (*q)->set_object_info(info);
+       req->get_authpins().push_back(info);
+       if (*q == auth_pin_freeze)
+         (*q)->set_object_info(req->get_authpin_freeze());
+       mdr->pin(*q);
+      }
+      if (auth_pin_nonblock)
+       req->mark_nonblock();
+      mds->send_message_mds(req, p->first);
+
+      // put in waiting list
+      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
+      mdr->more()->waiting_on_slave.insert(p->first);
+    }
+    return false;
+  }
+
+  // caps i'll need to issue
+  set<CInode*> issue_set;
+  bool result = false;
+
+  // acquire locks.
+  // make sure they match currently acquired locks.
+  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
+  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
+       p != sorted.end();
+       ++p) {
+    bool need_wrlock = !!wrlocks.count(*p);
+    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
+
+    // already locked?
+    if (existing != mdr->locks.end() && *existing == *p) {
+      // right kind?
+      SimpleLock *have = *existing;
+      ++existing;
+      if (xlocks.count(have) && mdr->xlocks.count(have)) {
+       dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
+       continue;
+      }
+      if (mdr->remote_wrlocks.count(have)) {
+       if (!need_remote_wrlock ||
+           mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
+         dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
+                  << " " << *have << " " << *have->get_parent() << dendl;
+         remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
+       }
+      }
+      if (need_wrlock || need_remote_wrlock) {
+       if (need_wrlock == !!mdr->wrlocks.count(have) &&
+           need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
+         if (need_wrlock)
+           dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
+         if (need_remote_wrlock)
+           dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
+         continue;
+       }
+      }
+      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
+       dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
+       continue;
+      }
+    }
+    
+    // hose any stray locks
+    if (existing != mdr->locks.end() && *existing == *p) {
+      assert(need_wrlock || need_remote_wrlock);
+      SimpleLock *lock = *existing;
+      if (mdr->wrlocks.count(lock)) {
+       if (!need_wrlock)
+         dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
+       else if (need_remote_wrlock) // acquire remote_wrlock first
+         dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
+       bool need_issue = false;
+       wrlock_finish(lock, mdr.get(), &need_issue);
+       if (need_issue)
+         issue_set.insert(static_cast<CInode*>(lock->get_parent()));
+      }
+      ++existing;
+    }
+    while (existing != mdr->locks.end()) {
+      SimpleLock *stray = *existing;
+      ++existing;
+      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
+      bool need_issue = false;
+      if (mdr->xlocks.count(stray)) {
+       xlock_finish(stray, mdr.get(), &need_issue);
+      } else if (mdr->rdlocks.count(stray)) {
+       rdlock_finish(stray, mdr.get(), &need_issue);
+      } else {
+       // may have acquired both wrlock and remote wrlock
+       if (mdr->wrlocks.count(stray))
+         wrlock_finish(stray, mdr.get(), &need_issue);
+       if (mdr->remote_wrlocks.count(stray))
+         remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+      }
+      if (need_issue)
+       issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+    }
+
+    // lock
+    if (mdr->locking && *p != mdr->locking) {
+      cancel_locking(mdr.get(), &issue_set);
+    }
+    if (xlocks.count(*p)) {
+      marker.message = "failed to xlock, waiting";
+      if (!xlock_start(*p, mdr)) 
+       goto out;
+      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
+    } else if (need_wrlock || need_remote_wrlock) {
+      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
+        marker.message = "waiting for remote wrlocks";
+       remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+       goto out;
+      }
+      if (need_wrlock && !mdr->wrlocks.count(*p)) {
+        marker.message = "failed to wrlock, waiting";
+       if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
+         marker.message = "failed to wrlock, dropping remote wrlock and waiting";
+         // can't take the wrlock because the scatter lock is gathering. need to
+         // release the remote wrlock, so that the gathering process can finish.
+         remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
+         remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+         goto out;
+       }
+       // nowait if we have already gotten remote wrlock
+       if (!wrlock_start(*p, mdr, need_remote_wrlock))
+         goto out;
+       dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
+      }
+    } else {
+      assert(mdr->is_master());
+      if ((*p)->is_scatterlock()) {
+       ScatterLock *slock = static_cast<ScatterLock *>(*p);
+       if (slock->is_rejoin_mix()) {
+         // If there is a recovering mds that replicated an object when it
+         // failed, and the scatterlock in that object was in the MIX state,
+         // it's possible that the recovering mds needs to take a wrlock on the
+         // scatterlock when it replays unsafe requests. So this mds should
+         // delay taking a rdlock on the scatterlock until the recovering mds
+         // finishes replaying its unsafe requests. Otherwise those unsafe
+         // requests may get replayed after the current request.
+         //
+         // For example:
+         // The recovering mds is the auth mds of a dirfrag, and this mds is
+         // the auth mds of the corresponding inode. When 'rm -rf' removes the
+         // directory, this mds should delay the rmdir request until the
+         // recovering mds has replayed the unlink requests.
+         if (mds->is_cluster_degraded()) {
+           if (!mdr->is_replay()) {
+             drop_locks(mdr.get());
+             mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+             dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
+                      << ", waiting for cluster recovered" << dendl;
+             marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
+             return false;
+           }
+         } else {
+           slock->clear_rejoin_mix();
+         }
+       }
+      }
+
+      marker.message = "failed to rdlock, waiting";
+      if (!rdlock_start(*p, mdr)) 
+       goto out;
+      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
+    }
+  }
+    
+  // any extra unneeded locks?
+  while (existing != mdr->locks.end()) {
+    SimpleLock *stray = *existing;
+    ++existing;
+    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
+    bool need_issue = false;
+    if (mdr->xlocks.count(stray)) {
+      xlock_finish(stray, mdr.get(), &need_issue);
+    } else if (mdr->rdlocks.count(stray)) {
+      rdlock_finish(stray, mdr.get(), &need_issue);
+    } else {
+      // may have acquired both wrlock and remote wrlock
+      if (mdr->wrlocks.count(stray))
+       wrlock_finish(stray, mdr.get(), &need_issue);
+      if (mdr->remote_wrlocks.count(stray))
+       remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+    }
+    if (need_issue)
+      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+  }
+
+  mdr->done_locking = true;
+  mdr->set_mds_stamp(ceph_clock_now());
+  result = true;
+  marker.message = "acquired locks";
+
+ out:
+  issue_caps_set(issue_set);
+  return result;
+}
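+
+// Hedged usage sketch (hypothetical caller, mirroring the common pattern in
+// the request handlers that drive acquire_locks()): build the lock sets, call
+// acquire_locks(), and simply return when it reports false, because the mdr
+// has already been parked on a wait list and will be retried.  'diri' and
+// 'dn' are assumed to come from the caller's path traversal:
+//
+//   set<SimpleLock*> rdlocks, wrlocks, xlocks;
+//   rdlocks.insert(&diri->authlock);
+//   xlocks.insert(&dn->lock);
+//   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+//     return;   // re-driven later via C_MDS_RetryRequest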
+
+void Locker::notify_freeze_waiter(MDSCacheObject *o)
+{
+  CDir *dir = NULL;
+  if (CInode *in = dynamic_cast<CInode*>(o)) {
+    if (!in->is_root())
+      dir = in->get_parent_dir();
+  } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
+    dir = dn->get_dir();
+  } else {
+    dir = dynamic_cast<CDir*>(o);
+    assert(dir);
+  }
+  if (dir) {
+    if (dir->is_freezing_dir())
+      mdcache->fragment_freeze_inc_num_waiters(dir);
+    if (dir->is_freezing_tree()) {
+      while (!dir->is_freezing_tree_root())
+       dir = dir->get_parent_dir();
+      mdcache->migrator->export_freeze_inc_num_waiters(dir);
+    }
+  }
+}
+
+void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
+{
+  for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
+       p != mut->xlocks.end();
+       ++p) {
+    MDSCacheObject *object = (*p)->get_parent();
+    assert(object->is_auth());
+    if (skip_dentry &&
+       ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
+      continue;
+    dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
+    (*p)->set_xlock_done();
+  }
+}
+
+void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+{
+  while (!mut->rdlocks.empty()) {
+    bool ni = false;
+    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
+    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
+    if (ni)
+      pneed_issue->insert(static_cast<CInode*>(p));
+  }
+}
+
+void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+{
+  set<mds_rank_t> slaves;
+
+  while (!mut->xlocks.empty()) {
+    SimpleLock *lock = *mut->xlocks.begin();
+    MDSCacheObject *p = lock->get_parent();
+    if (!p->is_auth()) {
+      assert(lock->get_sm()->can_remote_xlock);
+      slaves.insert(p->authority().first);
+      lock->put_xlock();
+      mut->locks.erase(lock);
+      mut->xlocks.erase(lock);
+      continue;
+    }
+    bool ni = false;
+    xlock_finish(lock, mut, &ni);
+    if (ni)
+      pneed_issue->insert(static_cast<CInode*>(p));
+  }
+
+  while (!mut->remote_wrlocks.empty()) {
+    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
+    slaves.insert(p->second);
+    if (mut->wrlocks.count(p->first) == 0)
+      mut->locks.erase(p->first);
+    mut->remote_wrlocks.erase(p);
+  }
+
+  while (!mut->wrlocks.empty()) {
+    bool ni = false;
+    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
+    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
+    if (ni)
+      pneed_issue->insert(static_cast<CInode*>(p));
+  }
+
+  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
+    if (!mds->is_cluster_degraded() ||
+       mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
+      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
+      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+                                                       MMDSSlaveRequest::OP_DROPLOCKS);
+      mds->send_message_mds(slavereq, *p);
+    }
+  }
+}
+
+void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
+{
+  SimpleLock *lock = mut->locking;
+  assert(lock);
+  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
+
+  if (lock->get_parent()->is_auth()) {
+    bool need_issue = false;
+    if (lock->get_state() == LOCK_PREXLOCK) {
+      _finish_xlock(lock, -1, &need_issue);
+    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
+              lock->get_num_xlocks() == 0) {
+      lock->set_state(LOCK_XLOCKDONE);
+      eval_gather(lock, true, &need_issue);
+    }
+    if (need_issue)
+      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
+  }
+  mut->finish_locking(lock);
+}
+
+void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
+{
+  // leftover locks
+  set<CInode*> my_need_issue;
+  if (!pneed_issue)
+    pneed_issue = &my_need_issue;
+
+  if (mut->locking)
+    cancel_locking(mut, pneed_issue);
+  _drop_non_rdlocks(mut, pneed_issue);
+  _drop_rdlocks(mut, pneed_issue);
+
+  if (pneed_issue == &my_need_issue)
+    issue_caps_set(*pneed_issue);
+  mut->done_locking = false;
+}
+
+void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+{
+  set<CInode*> my_need_issue;
+  if (!pneed_issue)
+    pneed_issue = &my_need_issue;
+
+  _drop_non_rdlocks(mut, pneed_issue);
+
+  if (pneed_issue == &my_need_issue)
+    issue_caps_set(*pneed_issue);
+}
+
+void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+{
+  set<CInode*> my_need_issue;
+  if (!pneed_issue)
+    pneed_issue = &my_need_issue;
+
+  _drop_rdlocks(mut, pneed_issue);
+
+  if (pneed_issue == &my_need_issue)
+    issue_caps_set(*pneed_issue);
+}
+
+
+// generics
+
+void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
+{
+  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
+  assert(!lock->is_stable());
+
+  int next = lock->get_next_state();
+
+  CInode *in = 0;
+  bool caps = lock->get_cap_shift();
+  if (lock->get_type() != CEPH_LOCK_DN)
+    in = static_cast<CInode *>(lock->get_parent());
+
+  bool need_issue = false;
+
+  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
+  assert(!caps || in != NULL);
+  if (caps && in->is_head()) {
+    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
+                       lock->get_cap_shift(), lock->get_cap_mask());
+    dout(10) << " next state is " << lock->get_state_name(next) 
+            << " issued/allows loner " << gcap_string(loner_issued)
+            << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
+            << " xlocker " << gcap_string(xlocker_issued)
+            << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
+            << " other " << gcap_string(other_issued)
+            << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
+            << dendl;
+
+    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
+                 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
+                 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
+      need_issue = true;
+  }
+
+#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
+  bool auth = lock->get_parent()->is_auth();
+  if (!lock->is_gathering() &&
+      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
+      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
+      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
+      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
+      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
+      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
+                (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
+                (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
+      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
+      lock->get_state() != LOCK_MIX_SYNC2
+      ) {
+    dout(7) << "eval_gather finished gather on " << *lock
+           << " on " << *lock->get_parent() << dendl;
+
+    if (lock->get_sm() == &sm_filelock) {
+      assert(in);
+      if (in->state_test(CInode::STATE_RECOVERING)) {
+       dout(7) << "eval_gather finished gather, but still recovering" << dendl;
+       return;
+      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
+       dout(7) << "eval_gather finished gather, but need to recover" << dendl;
+       mds->mdcache->queue_file_recover(in);
+       mds->mdcache->do_file_recover();
+       return;
+      }
+    }
+
+    if (!lock->get_parent()->is_auth()) {
+      // replica: tell auth
+      mds_rank_t auth = lock->get_parent()->authority().first;
+
+      if (lock->get_parent()->is_rejoining() &&
+         mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
+       dout(7) << "eval_gather finished gather, but still rejoining "
+               << *lock->get_parent() << dendl;
+       return;
+      }
+
+      if (!mds->is_cluster_degraded() ||
+         mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
+       switch (lock->get_state()) {
+       case LOCK_SYNC_LOCK:
+         mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
+                               auth);
+         break;
+
+       case LOCK_MIX_SYNC:
+         {
+           MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
+           lock->encode_locked_state(reply->get_data());
+           mds->send_message_mds(reply, auth);
+           next = LOCK_MIX_SYNC2;
+           (static_cast<ScatterLock *>(lock))->start_flush();
+         }
+         break;
+
+       case LOCK_MIX_SYNC2:
+         (static_cast<ScatterLock *>(lock))->finish_flush();
+         (static_cast<ScatterLock *>(lock))->clear_flushed();
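+         // fall-thru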
+
+       case LOCK_SYNC_MIX2:
+         // do nothing, we already acked
+         break;
+         
+       case LOCK_SYNC_MIX:
+         { 
+           MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
+           mds->send_message_mds(reply, auth);
+           next = LOCK_SYNC_MIX2;
+         }
+         break;
+
+       case LOCK_MIX_LOCK:
+         {
+           bufferlist data;
+           lock->encode_locked_state(data);
+           mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
+           (static_cast<ScatterLock *>(lock))->start_flush();
+           // we'll get an AC_LOCKFLUSHED to complete
+         }
+         break;
+
+       default:
+         ceph_abort();
+       }
+      }
+    } else {
+      // auth
+
+      // once the first (local) stage of mix->lock gather complete we can
+      // gather from replicas
+      if (lock->get_state() == LOCK_MIX_LOCK &&
+         lock->get_parent()->is_replicated()) {
+       dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
+       send_lock_message(lock, LOCK_AC_LOCK);
+       lock->init_gather();
+       lock->set_state(LOCK_MIX_LOCK2);
+       return;
+      }
+
+      if (lock->is_dirty() && !lock->is_flushed()) {
+       scatter_writebehind(static_cast<ScatterLock *>(lock));
+       mds->mdlog->flush();
+       return;
+      }
+      lock->clear_flushed();
+      
+      switch (lock->get_state()) {
+       // to mixed
+      case LOCK_TSYN_MIX:
+      case LOCK_SYNC_MIX:
+      case LOCK_EXCL_MIX:
+       in->start_scatter(static_cast<ScatterLock *>(lock));
+       if (lock->get_parent()->is_replicated()) {
+         bufferlist softdata;
+         lock->encode_locked_state(softdata);
+         send_lock_message(lock, LOCK_AC_MIX, softdata);
+       }
+       (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
+       break;
+
+      case LOCK_XLOCK:
+      case LOCK_XLOCKDONE:
+       if (next != LOCK_SYNC)
+         break;
+       // fall-thru
+
+       // to sync
+      case LOCK_EXCL_SYNC:
+      case LOCK_LOCK_SYNC:
+      case LOCK_MIX_SYNC:
+      case LOCK_XSYN_SYNC:
+       if (lock->get_parent()->is_replicated()) {
+         bufferlist softdata;
+         lock->encode_locked_state(softdata);
+         send_lock_message(lock, LOCK_AC_SYNC, softdata);
+       }
+       break;
+      }
+
+    }
+
+    lock->set_state(next);
+    
+    if (lock->get_parent()->is_auth() &&
+       lock->is_stable())
+      lock->get_parent()->auth_unpin(lock);
+
+    // drop loner before doing waiters
+    if (caps &&
+       in->is_head() &&
+       in->is_auth() &&
+       in->get_wanted_loner() != in->get_loner()) {
+      dout(10) << "  trying to drop loner" << dendl;
+      if (in->try_drop_loner()) {
+       dout(10) << "  dropped loner" << dendl;
+       need_issue = true;
+      }
+    }
+
+    if (pfinishers)
+      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
+                        *pfinishers);
+    else
+      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
+    
+    if (caps && in->is_head())
+      need_issue = true;
+
+    if (lock->get_parent()->is_auth() &&
+       lock->is_stable())
+      try_eval(lock, &need_issue);
+  }
+
+  if (need_issue) {
+    if (pneed_issue)
+      *pneed_issue = true;
+    else if (in->is_head())
+      issue_caps(in);
+  }
+
+}
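+
+// Hedged worked example of the cap-mask test used above (illustrative bit
+// values only): if allowed = gcaps_allowed(CAP_ANY, next) contains only
+// GSHARED while some client has issued = GSHARED|GEXCL, then
+//   ~allowed & issued == GEXCL   (non-zero)
+// so the gather cannot finish yet; on the first pass need_issue is set so the
+// surplus GEXCL bit gets revoked via issue_caps().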
+
+bool Locker::eval(CInode *in, int mask, bool caps_imported)
+{
+  bool need_issue = caps_imported;
+  list<MDSInternalContextBase*> finishers;
+  
+  dout(10) << "eval " << mask << " " << *in << dendl;
+
+  // choose loner?
+  if (in->is_auth() && in->is_head()) {
+    if (in->choose_ideal_loner() >= 0) {
+      if (in->try_set_loner()) {
+       dout(10) << "eval set loner to client." << in->get_loner() << dendl;
+       need_issue = true;
+       mask = -1;
+      } else
+       dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
+    } else
+      dout(10) << "eval doesn't want loner" << dendl;
+  }
+
+ retry:
+  if (mask & CEPH_LOCK_IFILE)
+    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
+  if (mask & CEPH_LOCK_IAUTH)
+    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
+  if (mask & CEPH_LOCK_ILINK)
+    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
+  if (mask & CEPH_LOCK_IXATTR)
+    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
+  if (mask & CEPH_LOCK_INEST)
+    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
+  if (mask & CEPH_LOCK_IFLOCK)
+    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
+  if (mask & CEPH_LOCK_IPOLICY)
+    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);
+
+  // drop loner?
+  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
+    dout(10) << "  trying to drop loner" << dendl;
+    if (in->try_drop_loner()) {
+      dout(10) << "  dropped loner" << dendl;
+      need_issue = true;
+
+      if (in->get_wanted_loner() >= 0) {
+       if (in->try_set_loner()) {
+         dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
+         mask = -1;
+         goto retry;
+       } else {
+         dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
+       }
+      }
+    }
+  }
+
+  finish_contexts(g_ceph_context, finishers);
+
+  if (need_issue && in->is_head())
+    issue_caps(in);
+
+  dout(10) << "eval done" << dendl;
+  return need_issue;
+}
+
+class C_Locker_Eval : public LockerContext {
+  MDSCacheObject *p;
+  int mask;
+public:
+  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
+    // We are used as an MDSCacheObject waiter, so should
+    // only be invoked by someone already holding the big lock.
+    assert(locker->mds->mds_lock.is_locked_by_me());
+    p->get(MDSCacheObject::PIN_PTRWAITER);    
+  }
+  void finish(int r) override {
+    locker->try_eval(p, mask);
+    p->put(MDSCacheObject::PIN_PTRWAITER);
+  }
+};
+
+void Locker::try_eval(MDSCacheObject *p, int mask)
+{
+  // unstable and ambiguous auth?
+  if (p->is_ambiguous_auth()) {
+    dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
+    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
+    return;
+  }
+
+  if (p->is_auth() && p->is_frozen()) {
+    dout(7) << "try_eval frozen, waiting on " << *p << dendl;
+    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
+    return;
+  }
+
+  if (mask & CEPH_LOCK_DN) {
+    assert(mask == CEPH_LOCK_DN);
+    bool need_issue = false;  // ignore this, no caps on dentries
+    CDentry *dn = static_cast<CDentry *>(p);
+    eval_any(&dn->lock, &need_issue);
+  } else {
+    CInode *in = static_cast<CInode *>(p);
+    eval(in, mask);
+  }
+}
+
+void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
+{
+  MDSCacheObject *p = lock->get_parent();
+
+  // unstable and ambiguous auth?
+  if (p->is_ambiguous_auth()) {
+    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
+    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
+    return;
+  }
+  
+  if (!p->is_auth()) {
+    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
+    return;
+  }
+
+  if (p->is_frozen()) {
+    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
+    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
+    return;
+  }
+
+  /*
+   * We could have a situation like:
+   *
+   * - mds A authpins item on mds B
+   * - mds B starts to freeze tree containing item
+   * - mds A tries wrlock_start on A, sends REQSCATTER to B
+   * - mds B lock is unstable, sets scatter_wanted
+   * - mds B lock stabilizes, calls try_eval.
+   *
+   * We can defer while freezing without causing a deadlock.  Honor
+   * scatter_wanted flag here.  This will never get deferred by the
+   * checks above due to the auth_pin held by the master.
+   */
+  if (lock->is_scatterlock()) {
+    ScatterLock *slock = static_cast<ScatterLock *>(lock);
+    if (slock->get_scatter_wanted() &&
+       slock->get_state() != LOCK_MIX) {
+      scatter_mix(slock, pneed_issue);
+      if (!lock->is_stable())
+       return;
+    } else if (slock->get_unscatter_wanted() &&
+        slock->get_state() != LOCK_LOCK) {
+      simple_lock(slock, pneed_issue);
+      if (!lock->is_stable()) {
+        return;
+      }
+    }
+  }
+
+  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
+    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
+    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
+    return;
+  }
+
+  eval(lock, pneed_issue);
+}
+
+void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
+{
+  bool need_issue = false;
+  list<MDSInternalContextBase*> finishers;
+
+  // kick locks now
+  if (!in->filelock.is_stable())
+    eval_gather(&in->filelock, false, &need_issue, &finishers);
+  if (!in->authlock.is_stable())
+    eval_gather(&in->authlock, false, &need_issue, &finishers);
+  if (!in->linklock.is_stable())
+    eval_gather(&in->linklock, false, &need_issue, &finishers);
+  if (!in->xattrlock.is_stable())
+    eval_gather(&in->xattrlock, false, &need_issue, &finishers);
+
+  if (need_issue && in->is_head()) {
+    if (issue_set)
+      issue_set->insert(in);
+    else
+      issue_caps(in);
+  }
+
+  finish_contexts(g_ceph_context, finishers);
+}
+
+void Locker::eval_scatter_gathers(CInode *in)
+{
+  bool need_issue = false;
+  list<MDSInternalContextBase*> finishers;
+
+  dout(10) << "eval_scatter_gathers " << *in << dendl;
+
+  // kick locks now
+  if (!in->filelock.is_stable())
+    eval_gather(&in->filelock, false, &need_issue, &finishers);
+  if (!in->nestlock.is_stable())
+    eval_gather(&in->nestlock, false, &need_issue, &finishers);
+  if (!in->dirfragtreelock.is_stable())
+    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
+  
+  if (need_issue && in->is_head())
+    issue_caps(in);
+  
+  finish_contexts(g_ceph_context, finishers);
+}
+
+void Locker::eval(SimpleLock *lock, bool *need_issue)
+{
+  switch (lock->get_type()) {
+  case CEPH_LOCK_IFILE:
+    return file_eval(static_cast<ScatterLock*>(lock), need_issue);
+  case CEPH_LOCK_IDFT:
+  case CEPH_LOCK_INEST:
+    return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
+  default:
+    return simple_eval(lock, need_issue);
+  }
+}
+
+
+// ------------------
+// rdlock
+
+bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
+{
+  // kick the lock
+  if (lock->is_stable()) {
+    if (lock->get_parent()->is_auth()) {
+      if (lock->get_sm() == &sm_scatterlock) {
+       // not until tempsync is fully implemented
+       //if (lock->get_parent()->is_replicated())
+       //scatter_tempsync((ScatterLock*)lock);
+       //else
+       simple_sync(lock);
+      } else if (lock->get_sm() == &sm_filelock) {
+       CInode *in = static_cast<CInode*>(lock->get_parent());
+       if (lock->get_state() == LOCK_EXCL &&
+           in->get_target_loner() >= 0 &&
+           !in->is_dir() && !as_anon)   // as_anon => caller wants SYNC, not XSYN
+         file_xsyn(lock);
+       else
+         simple_sync(lock);
+      } else
+       simple_sync(lock);
+      return true;
+    } else {
+      // request rdlock state change from auth
+      mds_rank_t auth = lock->get_parent()->authority().first;
+      if (!mds->is_cluster_degraded() ||
+         mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
+       dout(10) << "requesting rdlock from auth on "
+                << *lock << " on " << *lock->get_parent() << dendl;
+       mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
+      }
+      return false;
+    }
+  }
+  if (lock->get_type() == CEPH_LOCK_IFILE) {
+    CInode *in = static_cast<CInode *>(lock->get_parent());
+    if (in->state_test(CInode::STATE_RECOVERING)) {
+      mds->mdcache->recovery_queue.prioritize(in);
+    }
+  }
+
+  return false;
+}
+
+bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
+{
+  dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;  
+
+  // can read?  grab ref.
+  if (lock->can_rdlock(client)) 
+    return true;
+  
+  _rdlock_kick(lock, false);
+
+  if (lock->can_rdlock(client)) 
+    return true;
+
+  // wait!
+  if (con) {
+    dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
+    lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
+  }
+  return false;
+}
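+
+// Hedged usage sketch (hypothetical caller): rdlock_try() is the non-blocking
+// probe; passing a continuation queues the caller for a retry once the lock
+// becomes readable.
+//
+//   if (!mds->locker->rdlock_try(&in->linklock, client,
+//                                new C_MDS_RetryRequest(mdcache, mdr)))
+//     return;   // retried when the WAIT_STABLE/WAIT_RD waiter fires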
+
+bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
+{
+  dout(7) << "rdlock_start  on " << *lock << " on " << *lock->get_parent() << dendl;  
+
+  // client may be allowed to rdlock the same item it has xlocked.
+  //  UNLESS someone passes in as_anon, or we're reading snapped version here.
+  if (mut->snapid != CEPH_NOSNAP)
+    as_anon = true;
+  client_t client = as_anon ? -1 : mut->get_client();
+
+  CInode *in = 0;
+  if (lock->get_type() != CEPH_LOCK_DN)
+    in = static_cast<CInode *>(lock->get_parent());
+
+  /*
+  if (!lock->get_parent()->is_auth() &&
+      lock->fw_rdlock_to_auth()) {
+    mdcache->request_forward(mut, lock->get_parent()->authority().first);
+    return false;
+  }
+  */
+
+  while (1) {
+    // can read?  grab ref.
+    if (lock->can_rdlock(client)) {
+      lock->get_rdlock();
+      mut->rdlocks.insert(lock);
+      mut->locks.insert(lock);
+      return true;
+    }
+
+    // hmm, wait a second.
+    if (in && !in->is_head() && in->is_auth() &&
+       lock->get_state() == LOCK_SNAP_SYNC) {
+      // okay, we actually need to kick the head's lock to get ourselves synced up.
+      CInode *head = mdcache->get_inode(in->ino());
+      assert(head);
+      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
+      if (hlock->get_state() == LOCK_SYNC)
+       hlock = head->get_lock(lock->get_type());
+
+      if (hlock->get_state() != LOCK_SYNC) {
+       dout(10) << "rdlock_start trying head inode " << *head << dendl;
+       if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
+         return false;
+       // oh, check our lock again then
+      }
+    }
+
+    if (!_rdlock_kick(lock, as_anon))
+      break;
+  }
+
+  // wait!
+  int wait_on;
+  if (lock->get_parent()->is_auth() && lock->is_stable())
+    wait_on = SimpleLock::WAIT_RD;
+  else
+    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
+  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
+  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
+  nudge_log(lock);
+  return false;
+}
+
+void Locker::nudge_log(SimpleLock *lock)
+{
+  dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
+  if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked())    // as with xlockdone, or cap flush
+    mds->mdlog->flush();
+}
+
+void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+{
+  // drop ref
+  lock->put_rdlock();
+  if (mut) {
+    mut->rdlocks.erase(lock);
+    mut->locks.erase(lock);
+  }
+
+  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
+  
+  // last one?
+  if (!lock->is_rdlocked()) {
+    if (!lock->is_stable())
+      eval_gather(lock, false, pneed_issue);
+    else if (lock->get_parent()->is_auth())
+      try_eval(lock, pneed_issue);
+  }
+}
+
+
+bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
+{
+  dout(10) << "can_rdlock_set " << locks << dendl;
+  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
+    if (!(*p)->can_rdlock(-1)) {
+      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
+      return false;
+    }
+  return true;
+}
+
+bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
+{
+  dout(10) << "rdlock_try_set " << locks << dendl;
+  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
+    if (!rdlock_try(*p, -1, NULL)) {
+      dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
+      return false;
+    }
+  return true;
+}
+
+void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
+{
+  dout(10) << "rdlock_take_set " << locks << dendl;
+  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
+    (*p)->get_rdlock();
+    mut->rdlocks.insert(*p);
+    mut->locks.insert(*p);
+  }
+}
+
+// ------------------
+// wrlock
+
+void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
+{
+  if (lock->get_type() == CEPH_LOCK_IVERSION ||
+      lock->get_type() == CEPH_LOCK_DVERSION)
+    return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
+
+  dout(7) << "wrlock_force  on " << *lock
+         << " on " << *lock->get_parent() << dendl;  
+  lock->get_wrlock(true);
+  mut->wrlocks.insert(lock);
+  mut->locks.insert(lock);
+}
+
+bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
+{
+  if (lock->get_type() == CEPH_LOCK_IVERSION ||
+      lock->get_type() == CEPH_LOCK_DVERSION)
+    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);
+
+  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
+
+  CInode *in = static_cast<CInode *>(lock->get_parent());
+  client_t client = mut->get_client();
+  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
+                     (in->has_subtree_or_exporting_dirfrag() ||
+                      static_cast<ScatterLock*>(lock)->get_scatter_wanted());
+
+  while (1) {
+    // wrlock?
+    if (lock->can_wrlock(client) &&
+       (!want_scatter || lock->get_state() == LOCK_MIX)) {
+      lock->get_wrlock();
+      mut->wrlocks.insert(lock);
+      mut->locks.insert(lock);
+      return true;
+    }
+
+    if (lock->get_type() == CEPH_LOCK_IFILE &&
+       in->state_test(CInode::STATE_RECOVERING)) {
+      mds->mdcache->recovery_queue.prioritize(in);
+    }
+
+    if (!lock->is_stable())
+      break;
+
+    if (in->is_auth()) {
+      // don't do nested lock state change if we have dirty scatterdata and
+      // may scatter_writebehind or start_scatter, because nowait==true implies
+      // that the caller already has a log entry open!
+      if (nowait && lock->is_dirty())
+       return false;
+
+      if (want_scatter)
+       scatter_mix(static_cast<ScatterLock*>(lock));
+      else
+       simple_lock(lock);
+
+      if (nowait && !lock->can_wrlock(client))
+       return false;
+      
+    } else {
+      // replica.
+      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
+      mds_rank_t auth = lock->get_parent()->authority().first;
+      if (!mds->is_cluster_degraded() ||
+         mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
+       dout(10) << "requesting scatter from auth on "
+                << *lock << " on " << *lock->get_parent() << dendl;
+       mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
+      }
+      break;
+    }
+  }
+
+  if (!nowait) {
+    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
+    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
+    nudge_log(lock);
+  }
+    
+  return false;
+}
+
+void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+{
+  if (lock->get_type() == CEPH_LOCK_IVERSION ||
+      lock->get_type() == CEPH_LOCK_DVERSION)
+    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
+
+  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
+  lock->put_wrlock();
+  if (mut) {
+    mut->wrlocks.erase(lock);
+    if (mut->remote_wrlocks.count(lock) == 0)
+      mut->locks.erase(lock);
+  }
+
+  if (!lock->is_wrlocked()) {
+    if (!lock->is_stable())
+      eval_gather(lock, false, pneed_issue);
+    else if (lock->get_parent()->is_auth())
+      try_eval(lock, pneed_issue);
+  }
+}
+
+
+// remote wrlock
+
+void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
+{
+  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
+
+  // wait for active target
+  if (mds->is_cluster_degraded() &&
+      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
+    dout(7) << " mds." << target << " is not active" << dendl;
+    if (mut->more()->waiting_on_slave.empty())
+      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
+    return;
+  }
+
+  // send lock request
+  mut->start_locking(lock, target);
+  mut->more()->slaves.insert(target);
+  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+                                            MMDSSlaveRequest::OP_WRLOCK);
+  r->set_lock_type(lock->get_type());
+  lock->get_parent()->set_object_info(r->get_object_info());
+  mds->send_message_mds(r, target);
+
+  assert(mut->more()->waiting_on_slave.count(target) == 0);
+  mut->more()->waiting_on_slave.insert(target);
+}
+
+void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
+                                  MutationImpl *mut)
+{
+  // drop ref
+  mut->remote_wrlocks.erase(lock);
+  if (mut->wrlocks.count(lock) == 0)
+    mut->locks.erase(lock);
+  
+  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
+         << " " << *lock->get_parent()  << dendl;
+  if (!mds->is_cluster_degraded() ||
+      mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
+    MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+                                                     MMDSSlaveRequest::OP_UNWRLOCK);
+    slavereq->set_lock_type(lock->get_type());
+    lock->get_parent()->set_object_info(slavereq->get_object_info());
+    mds->send_message_mds(slavereq, target);
+  }
+}
+
+
+// ------------------
+// xlock
+
+bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
+{
+  if (lock->get_type() == CEPH_LOCK_IVERSION ||
+      lock->get_type() == CEPH_LOCK_DVERSION)
+    return local_xlock_start(static_cast<LocalLock*>(lock), mut);
+
+  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
+  client_t client = mut->get_client();
+
+  // auth?
+  if (lock->get_parent()->is_auth()) {
+    // auth
+    while (1) {
+      if (lock->can_xlock(client)) {
+       lock->set_state(LOCK_XLOCK);
+       lock->get_xlock(mut, client);
+       mut->xlocks.insert(lock);
+       mut->locks.insert(lock);
+       mut->finish_locking(lock);
+       return true;
+      }
+      
+      if (lock->get_type() == CEPH_LOCK_IFILE) {
+       CInode *in = static_cast<CInode*>(lock->get_parent());
+       if (in->state_test(CInode::STATE_RECOVERING)) {
+         mds->mdcache->recovery_queue.prioritize(in);
+       }
+      }
+
+      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
+                                lock->get_xlock_by_client() != client ||
+                                lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
+       break;
+
+      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
+       mut->start_locking(lock);
+       simple_xlock(lock);
+      } else {
+       simple_lock(lock);
+      }
+    }
+    
+    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
+    nudge_log(lock);
+    return false;
+  } else {
+    // replica
+    assert(lock->get_sm()->can_remote_xlock);
+    assert(!mut->slave_request);
+    
+    // wait for single auth
+    if (lock->get_parent()->is_ambiguous_auth()) {
+      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
+                                    new C_MDS_RetryRequest(mdcache, mut));
+      return false;
+    }
+    
+    // wait for active auth
+    mds_rank_t auth = lock->get_parent()->authority().first;
+    if (mds->is_cluster_degraded() &&
+       !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
+      dout(7) << " mds." << auth << " is not active" << dendl;
+      if (mut->more()->waiting_on_slave.empty())
+       mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
+      return false;
+    }
+
+    // send lock request
+    mut->more()->slaves.insert(auth);
+    mut->start_locking(lock, auth);
+    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+                                              MMDSSlaveRequest::OP_XLOCK);
+    r->set_lock_type(lock->get_type());
+    lock->get_parent()->set_object_info(r->get_object_info());
+    mds->send_message_mds(r, auth);
+
+    assert(mut->more()->waiting_on_slave.count(auth) == 0);
+    mut->more()->waiting_on_slave.insert(auth);
+
+    return false;
+  }
+}
+
+void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
+{
+  assert(!lock->is_stable());
+  if (lock->get_num_rdlocks() == 0 &&
+      lock->get_num_wrlocks() == 0 &&
+      lock->get_num_client_lease() == 0 &&
+      lock->get_state() != LOCK_XLOCKSNAP &&
+      lock->get_type() != CEPH_LOCK_DN) {
+    CInode *in = static_cast<CInode*>(lock->get_parent());
+    client_t loner = in->get_target_loner();
+    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
+      lock->set_state(LOCK_EXCL);
+      lock->get_parent()->auth_unpin(lock);
+      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
+      if (lock->get_cap_shift())
+       *pneed_issue = true;
+      if (lock->get_parent()->is_auth() &&
+         lock->is_stable())
+       try_eval(lock, pneed_issue);
+      return;
+    }
+  }
+  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
+  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
+}
+
+void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+{
+  if (lock->get_type() == CEPH_LOCK_IVERSION ||
+      lock->get_type() == CEPH_LOCK_DVERSION)
+    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
+
+  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
+
+  client_t xlocker = lock->get_xlock_by_client();
+
+  // drop ref
+  lock->put_xlock();
+  assert(mut);
+  mut->xlocks.erase(lock);
+  mut->locks.erase(lock);
+  
+  bool do_issue = false;
+
+  // remote xlock?
+  if (!lock->get_parent()->is_auth()) {
+    assert(lock->get_sm()->can_remote_xlock);
+
+    // tell auth
+    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
+    mds_rank_t auth = lock->get_parent()->authority().first;
+    if (!mds->is_cluster_degraded() ||
+       mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
+      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+                                                       MMDSSlaveRequest::OP_UNXLOCK);
+      slavereq->set_lock_type(lock->get_type());
+      lock->get_parent()->set_object_info(slavereq->get_object_info());
+      mds->send_message_mds(slavereq, auth);
+    }
+    // others waiting?
+    lock->finish_waiters(SimpleLock::WAIT_STABLE |
+                        SimpleLock::WAIT_WR | 
+                        SimpleLock::WAIT_RD, 0); 
+  } else {
+    if (lock->get_num_xlocks() == 0) {
+      if (lock->get_state() == LOCK_LOCK_XLOCK)
+       lock->set_state(LOCK_XLOCKDONE);
+      _finish_xlock(lock, xlocker, &do_issue);
+    }
+  }
+  
+  if (do_issue) {
+    CInode *in = static_cast<CInode*>(lock->get_parent());
+    if (in->is_head()) {
+      if (pneed_issue)
+       *pneed_issue = true;
+      else
+       issue_caps(in);
+    }
+  }
+}
+
+void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
+{
+  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
+
+  lock->put_xlock();
+  mut->xlocks.erase(lock);
+  mut->locks.erase(lock);
+
+  MDSCacheObject *p = lock->get_parent();
+  assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
+
+  if (!lock->is_stable())
+    lock->get_parent()->auth_unpin(lock);
+
+  lock->set_state(LOCK_LOCK);
+}
+
+void Locker::xlock_import(SimpleLock *lock)
+{
+  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
+  lock->get_parent()->auth_pin(lock);
+}
+
+
+
+// file i/o -----------------------------------------
+
+version_t Locker::issue_file_data_version(CInode *in)
+{
+  dout(7) << "issue_file_data_version on " << *in << dendl;
+  return in->inode.file_data_version;
+}
+
+class C_Locker_FileUpdate_finish : public LockerLogContext {
+  CInode *in;
+  MutationRef mut;
+  bool share_max;
+  bool need_issue;
+  client_t client;
+  MClientCaps *ack;
+public:
+  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
+                               bool sm=false, bool ni=false, client_t c=-1,
+                               MClientCaps *ac = 0)
+    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
+      client(c), ack(ac) {
+    in->get(CInode::PIN_PTRWAITER);
+  }
+  void finish(int r) override {
+    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
+    in->put(CInode::PIN_PTRWAITER);
+  }
+};
+
+void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
+                               client_t client, MClientCaps *ack)
+{
+  dout(10) << "file_update_finish on " << *in << dendl;
+  in->pop_and_dirty_projected_inode(mut->ls);
+
+  mut->apply();
+  
+  if (ack) {
+    Session *session = mds->get_session(client);
+    if (session) {
+      // "oldest flush tid" > 0 means client uses unique TID for each flush
+      if (ack->get_oldest_flush_tid() > 0)
+       session->add_completed_flush(ack->get_client_tid());
+      mds->send_message_client_counted(ack, session);
+    } else {
+      dout(10) << " no session for client." << client << " " << *ack << dendl;
+      ack->put();
+    }
+  }
+
+  set<CInode*> need_issue;
+  drop_locks(mut.get(), &need_issue);
+
+  if (!in->is_head() && !in->client_snap_caps.empty()) {
+    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
+    // check for snap writeback completion
+    bool gather = false;
+    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
+    while (p != in->client_snap_caps.end()) {
+      SimpleLock *lock = in->get_lock(p->first);
+      assert(lock);
+      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
+              << " lock " << *lock << " on " << *in << dendl;
+      lock->put_wrlock();
+
+      p->second.erase(client);
+      if (p->second.empty()) {
+       gather = true;
+       in->client_snap_caps.erase(p++);
+      } else
+       ++p;
+    }
+    if (gather) {
+      if (in->client_snap_caps.empty())
+       in->item_open_file.remove_myself();
+      eval_cap_gather(in, &need_issue);
+    }
+  } else {
+    if (issue_client_cap && need_issue.count(in) == 0) {
+      Capability *cap = in->get_client_cap(client);
+      if (cap && (cap->wanted() & ~cap->pending()))
+       issue_caps(in, cap);
+    }
+  
+    if (share_max && in->is_auth() &&
+       (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
+      share_inode_max_size(in);
+  }
+  issue_caps_set(need_issue);
+
+  // auth unpin after issuing caps
+  mut->cleanup();
+}
+
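+// Register (or augment) a client capability on open/create.  During replay we
+// only try to reconnect an existing cap.  On the auth we re-evaluate the lock
+// states right away; on a replica we tell the auth what is now wanted.  A
+// brand-new cap stays "suppressed" so the grant can be bundled into the open()
+// reply rather than sent as a separate cap message.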
+Capability* Locker::issue_new_caps(CInode *in,
+                                  int mode,
+                                  Session *session,
+                                  SnapRealm *realm,
+                                  bool is_replay)
+{
+  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
+  bool is_new;
+
+  // if replay, try to reconnect cap, and otherwise do nothing.
+  if (is_replay) {
+    mds->mdcache->try_reconnect_cap(in, session);
+    return 0;
+  }
+
+  // my needs
+  assert(session->info.inst.name.is_client());
+  client_t my_client = session->info.inst.name.num();
+  int my_want = ceph_caps_for_mode(mode);
+
+  // register a capability
+  Capability *cap = in->get_client_cap(my_client);
+  if (!cap) {
+    // new cap
+    cap = in->add_client_cap(my_client, session, realm);
+    cap->set_wanted(my_want);
+    cap->mark_new();
+    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
+    is_new = true;
+  } else {
+    is_new = false;
+    // make sure it wants sufficient caps
+    if (my_want & ~cap->wanted()) {
+      // augment wanted caps for this client
+      cap->set_wanted(cap->wanted() | my_want);
+    }
+  }
+
+  if (in->is_auth()) {
+    // [auth] twiddle mode?
+    eval(in, CEPH_CAP_LOCKS);
+
+    if (_need_flush_mdlog(in, my_want))
+      mds->mdlog->flush();
+
+  } else {
+    // [replica] tell auth about any new caps wanted
+    request_inode_file_caps(in);
+  }
+
+  // issue caps (pot. incl new one)
+  //issue_caps(in);  // note: _eval above may have done this already...
+
+  // re-issue whatever we can
+  //cap->issue(cap->pending());
+
+  if (is_new)
+    cap->dec_suppress();
+
+  return cap;
+}
+
+
+void Locker::issue_caps_set(set<CInode*>& inset)
+{
+  for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
+    issue_caps(*p);
+}
+
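+// Walk this inode's client caps (or just only_cap) and bring each client's
+// pending set in line with what the current lock states allow: grant newly
+// wanted+allowed bits, revoke anything that is no longer allowed.  Returns
+// true if nothing had to be (re)issued.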
+bool Locker::issue_caps(CInode *in, Capability *only_cap)
+{
+  // allowed caps are determined by the lock mode.
+  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
+  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
+  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
+
+  client_t loner = in->get_loner();
+  if (loner >= 0) {
+    dout(7) << "issue_caps loner client." << loner
+           << " allowed=" << ccap_string(loner_allowed) 
+           << ", xlocker allowed=" << ccap_string(xlocker_allowed)
+           << ", others allowed=" << ccap_string(all_allowed)
+           << " on " << *in << dendl;
+  } else {
+    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) 
+           << ", xlocker allowed=" << ccap_string(xlocker_allowed)
+           << " on " << *in << dendl;
+  }
+
+  assert(in->is_head());
+
+  // count how many caps we (re)issue or revoke below
+  int nissued = 0;        
+
+  // client caps
+  map<client_t, Capability*>::iterator it;
+  if (only_cap)
+    it = in->client_caps.find(only_cap->get_client());
+  else
+    it = in->client_caps.begin();
+  for (; it != in->client_caps.end(); ++it) {
+    Capability *cap = it->second;
+    if (cap->is_stale())
+      continue;
+
+    // do not issue _new_ bits when size|mtime is projected
+    int allowed;
+    if (loner == it->first)
+      allowed = loner_allowed;
+    else
+      allowed = all_allowed;
+
+    // add in any xlocker-only caps (for locks this client is the xlocker for)
+    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
+
+    Session *session = mds->get_session(it->first);
+    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
+       !(session && session->connection &&
+         session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
+      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
+
+    int pending = cap->pending();
+    int wanted = cap->wanted();
+
+    dout(20) << " client." << it->first
+            << " pending " << ccap_string(pending) 
+            << " allowed " << ccap_string(allowed) 
+            << " wanted " << ccap_string(wanted)
+            << dendl;
+
+    if (!(pending & ~allowed)) {
+      // skip if suppress or new, and not revocation
+      if (cap->is_new() || cap->is_suppress()) {
+       dout(20) << "  !revoke and new|suppressed, skipping client." << it->first << dendl;
+       continue;
+      }
+    }
+
+    // notify clients about deleted inode, to make sure they release caps ASAP.
+    if (in->inode.nlink == 0)
+      wanted |= CEPH_CAP_LINK_SHARED;
+
+    // are there caps that the client _wants_ and can have, but aren't pending?
+    // or do we need to revoke?
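+    // e.g. (illustrative masks): pending=Fc, allowed=Fc|Fr, wanted=Fr, so
+    // (wanted & allowed) & ~pending == Fr and we grant it; or pending=Fc|Fw,
+    // allowed=Fc, so pending & ~allowed == Fw and Fw must be revoked.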
+    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
+       (pending & ~allowed)) {             // need to revoke ~allowed caps.
+      // issue
+      nissued++;
+
+      // include caps that clients generally like, while we're at it.
+      int likes = in->get_caps_liked();      
+      int before = pending;
+      long seq;
+      if (pending & ~allowed)
+       seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
+      else
+       seq = cap->issue((wanted|likes) & allowed);
+      int after = cap->pending();
+
+      if (cap->is_new()) {
+       // haven't sent caps to the client yet
+       if (before & ~after)
+         cap->confirm_receipt(seq, after);
+      } else {
+        dout(7) << "   sending MClientCaps to client." << it->first
+               << " seq " << cap->get_last_seq()
+               << " new pending " << ccap_string(after) << " was " << ccap_string(before) 
+               << dendl;
+
+       int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
+       if (op == CEPH_CAP_OP_REVOKE) {
+               revoking_caps.push_back(&cap->item_revoking_caps);
+               revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
+               cap->set_last_revoke_stamp(ceph_clock_now());
+               cap->reset_num_revoke_warnings();
+       }
+
+       MClientCaps *m = new MClientCaps(op, in->ino(),
+                                        in->find_snaprealm()->inode->ino(),
+                                        cap->get_cap_id(), cap->get_last_seq(),
+                                        after, wanted, 0,
+                                        cap->get_mseq(),
+                                         mds->get_osd_epoch_barrier());
+       in->encode_cap_message(m, cap);
+
+       mds->send_message_client_counted(m, it->first);
+      }
+    }
+
+    if (only_cap)
+      break;
+  }
+
+  return (nissued == 0);  // true if nothing was re-issued, no callbacks
+}
+
+void Locker::issue_truncate(CInode *in)
+{
+  dout(7) << "issue_truncate on " << *in << dendl;
+  
+  for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
+       it != in->client_caps.end();
+       ++it) {
+    Capability *cap = it->second;
+    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
+                                    in->ino(),
+                                    in->find_snaprealm()->inode->ino(),
+                                    cap->get_cap_id(), cap->get_last_seq(),
+                                    cap->pending(), cap->wanted(), 0,
+                                    cap->get_mseq(),
+                                     mds->get_osd_epoch_barrier());
+    in->encode_cap_message(m, cap);                         
+    mds->send_message_client_counted(m, it->first);
+  }
+
+  // should we increase max_size?
+  if (in->is_auth() && in->is_file())
+    check_inode_max_size(in);
+}
+
+
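+// Revoke everything but PIN from a stale client's cap.  If the auth inode had
+// a writeable range for this client we may need file recovery, and any
+// unstable locks are re-gathered since the revoked caps may be exactly what
+// they were waiting on.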
+void Locker::revoke_stale_caps(Capability *cap)
+{
+  CInode *in = cap->get_inode();
+  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
+    // if export succeeds, the cap will be removed. if export fails, we need to
+    // revoke the cap if it's still stale.
+    in->state_set(CInode::STATE_EVALSTALECAPS);
+    return;
+  }
+
+  int issued = cap->issued();
+  if (issued & ~CEPH_CAP_PIN) {
+    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
+    cap->revoke();
+
+    if (in->is_auth() &&
+       in->inode.client_ranges.count(cap->get_client()))
+      in->state_set(CInode::STATE_NEEDSRECOVER);
+
+    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
+    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
+    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
+    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
+
+    if (in->is_auth()) {
+      try_eval(in, CEPH_CAP_LOCKS);
+    } else {
+      request_inode_file_caps(in);
+    }
+  }
+}
+
+void Locker::revoke_stale_caps(Session *session)
+{
+  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
+
+  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
+    Capability *cap = *p;
+    cap->mark_stale();
+    revoke_stale_caps(cap);
+  }
+}
+
+void Locker::resume_stale_caps(Session *session)
+{
+  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
+
+  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
+    Capability *cap = *p;
+    CInode *in = cap->get_inode();
+    assert(in->is_head());
+    if (cap->is_stale()) {
+      dout(10) << " clearing stale flag on " << *in << dendl;
+      cap->clear_stale();
+
+      if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
+       // if export succeeds, the cap will be removed. if export fails,
+       // we need to re-issue the cap if it's not stale.
+       in->state_set(CInode::STATE_EVALSTALECAPS);
+       continue;
+      }
+
+      if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
+       issue_caps(in, cap);
+    }
+  }
+}
+
+void Locker::remove_stale_leases(Session *session)
+{
+  dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
+  xlist<ClientLease*>::iterator p = session->leases.begin();
+  while (!p.end()) {
+    ClientLease *l = *p;
+    ++p;
+    CDentry *parent = static_cast<CDentry*>(l->parent);
+    dout(15) << " removing lease on " << *parent << dendl;
+    parent->remove_client_lease(l, this);
+  }
+}
+
+
+class C_MDL_RequestInodeFileCaps : public LockerContext {
+  CInode *in;
+public:
+  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
+    in->get(CInode::PIN_PTRWAITER);
+  }
+  void finish(int r) override {
+    if (!in->is_auth())
+      locker->request_inode_file_caps(in);
+    in->put(CInode::PIN_PTRWAITER);
+  }
+};
+
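+// Replica side: report the aggregate caps our local clients want on this
+// inode to the auth MDS, but only when that aggregate has changed; waits if
+// the authority is ambiguous or the auth MDS is still rejoining.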
+void Locker::request_inode_file_caps(CInode *in)
+{
+  assert(!in->is_auth());
+
+  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
+  if (wanted != in->replica_caps_wanted) {
+    // wait for single auth
+    if (in->is_ambiguous_auth()) {
+      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
+                     new C_MDL_RequestInodeFileCaps(this, in));
+      return;
+    }
+
+    mds_rank_t auth = in->authority().first;
+    if (mds->is_cluster_degraded() &&
+       mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
+      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
+      return;
+    }
+
+    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
+            << " was " << ccap_string(in->replica_caps_wanted) 
+            << " on " << *in << " to mds." << auth << dendl;
+
+    in->replica_caps_wanted = wanted;
+
+    if (!mds->is_cluster_degraded() ||
+       mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
+      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
+                           auth);
+  }
+}
+
+/* This function DOES put the passed message before returning */
+void Locker::handle_inode_file_caps(MInodeFileCaps *m)
+{
+  // nobody should be talking to us during recovery.
+  assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+
+  // ok
+  CInode *in = mdcache->get_inode(m->get_ino());
+  mds_rank_t from = mds_rank_t(m->get_source().num());
+
+  assert(in);
+  assert(in->is_auth());
+
+  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
+
+  if (m->get_caps())
+    in->mds_caps_wanted[from] = m->get_caps();
+  else
+    in->mds_caps_wanted.erase(from);
+
+  try_eval(in, CEPH_CAP_LOCKS);
+  m->put();
+}
+
+
+class C_MDL_CheckMaxSize : public LockerContext {
+  CInode *in;
+  uint64_t new_max_size;
+  uint64_t newsize;
+  utime_t mtime;
+
+public:
+  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
+                     uint64_t _newsize, utime_t _mtime) :
+    LockerContext(l), in(i),
+    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
+  {
+    in->get(CInode::PIN_PTRWAITER);
+  }
+  void finish(int r) override {
+    if (in->is_auth())
+      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
+    in->put(CInode::PIN_PTRWAITER);
+  }
+};
+
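+// Choose a new max_size for a writing client: roughly double the given size,
+// optionally capped at mds_client_writeable_range_max_inc_objs layout units
+// above it, then rounded up to the layout size increment.  Illustrative
+// numbers: with a 4MB increment and size=4MB, (size+1)*2 is just over 8MB and
+// rounds up to 12MB (assuming the objs cap is large enough not to bind).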
+uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
+{
+  uint64_t new_max = (size + 1) << 1;
+  uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
+  if (max_inc > 0) {
+    max_inc *= pi->get_layout_size_increment();
+    new_max = MIN(new_max, size + max_inc);
+  }
+  return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
+}
+
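+// Recompute per-client writeable ranges: every client that has or wants
+// WR|BUFFER caps gets [0, max(ms, its old upper bound)]; clients without such
+// caps are simply omitted, so their range shrinks away on the next update.
+// *max_increased is set whenever a client's upper bound grows or a new range
+// appears, so the caller can flush the log promptly.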
+void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
+                                   map<client_t,client_writeable_range_t> *new_ranges,
+                                   bool *max_increased)
+{
+  inode_t *latest = in->get_projected_inode();
+  uint64_t ms;
+  if(latest->has_layout()) {
+    ms = calc_new_max_size(latest, size);
+  } else {
+    // Layout-less directories, like ~mds0/, have zero size
+    ms = 0;
+  }
+
+  // increase ranges as appropriate.
+  // shrink to 0 if no WR|BUFFER caps issued.
+  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
+       p != in->client_caps.end();
+       ++p) {
+    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
+      client_writeable_range_t& nr = (*new_ranges)[p->first];
+      nr.range.first = 0;
+      if (latest->client_ranges.count(p->first)) {
+       client_writeable_range_t& oldr = latest->client_ranges[p->first];
+       if (ms > oldr.range.last)
+         *max_increased = true;
+       nr.range.last = MAX(ms, oldr.range.last);
+       nr.follows = oldr.follows;
+      } else {
+       *max_increased = true;
+       nr.range.last = ms;
+       nr.follows = in->first - 1;
+      }
+    }
+  }
+}
+
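+// Journal a new size/mtime and/or new client writeable ranges for an auth
+// file inode.  Returns false if there is nothing to do or we had to wait
+// (inode frozen, or the filelock cannot be wrlocked yet; a C_MDL_CheckMaxSize
+// waiter retries later), and true once an EOpen/EUpdate entry is submitted.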
+bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
+                                 uint64_t new_max_size, uint64_t new_size,
+                                 utime_t new_mtime)
+{
+  assert(in->is_auth());
+  assert(in->is_file());
+
+  inode_t *latest = in->get_projected_inode();
+  map<client_t, client_writeable_range_t> new_ranges;
+  uint64_t size = latest->size;
+  bool update_size = new_size > 0;
+  bool update_max = false;
+  bool max_increased = false;
+
+  if (update_size) {
+    new_size = size = MAX(size, new_size);
+    new_mtime = MAX(new_mtime, latest->mtime);
+    if (latest->size == new_size && latest->mtime == new_mtime)
+      update_size = false;
+  }
+
+  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);
+
+  if (max_increased || latest->client_ranges != new_ranges)
+    update_max = true;
+
+  if (!update_size && !update_max) {
+    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
+    return false;
+  }
+
+  dout(10) << "check_inode_max_size new_ranges " << new_ranges
+          << " update_size " << update_size
+          << " on " << *in << dendl;
+
+  if (in->is_frozen()) {
+    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
+    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
+                                                     new_max_size,
+                                                     new_size,
+                                                     new_mtime);
+    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
+    return false;
+  }
+  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
+    // lock?
+    if (in->filelock.is_stable()) {
+      if (in->get_target_loner() >= 0)
+       file_excl(&in->filelock);
+      else
+       simple_lock(&in->filelock);
+    }
+    if (!in->filelock.can_wrlock(in->get_loner())) {
+      // try again later
+      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
+                                                       new_max_size,
+                                                       new_size,
+                                                       new_mtime);
+
+      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
+      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
+      return false;    
+    }
+  }
+
+  MutationRef mut(new MutationImpl());
+  mut->ls = mds->mdlog->get_current_segment();
+    
+  inode_t *pi = in->project_inode();
+  pi->version = in->pre_dirty();
+
+  if (update_max) {
+    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
+    pi->client_ranges = new_ranges;
+  }
+
+  if (update_size) {
+    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
+    pi->size = new_size;
+    pi->rstat.rbytes = new_size;
+    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
+    pi->mtime = new_mtime;
+  }
+
+  // use EOpen if the file is still open; otherwise, use EUpdate.
+  // this is just an optimization to push open files forward into
+  // newer log segments.
+  LogEvent *le;
+  EMetaBlob *metablob;
+  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {   
+    EOpen *eo = new EOpen(mds->mdlog);
+    eo->add_ino(in->ino());
+    metablob = &eo->metablob;
+    le = eo;
+    mut->ls->open_files.push_back(&in->item_open_file);
+  } else {
+    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
+    metablob = &eu->metablob;
+    le = eu;
+  }
+  mds->mdlog->start_entry(le);
+  if (update_size) {  // FIXME if/when we do max_size nested accounting
+    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
+    // no cow, here!
+    CDentry *parent = in->get_projected_parent_dn();
+    metablob->add_primary_dentry(parent, in, true);
+  } else {
+    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
+    mdcache->journal_dirty_inode(mut.get(), metablob, in);
+  }
+  mds->mdlog->submit_entry(le,
+          new C_Locker_FileUpdate_finish(this, in, mut, true));
+  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
+  mut->auth_pin(in);
+
+  // make max_size _increase_ timely
+  if (max_increased)
+    mds->mdlog->flush();
+
+  return true;
+}
+
+
+void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
+{
+  /*
+   * only share if currently issued a WR cap.  if client doesn't have it,
+   * file_max doesn't matter, and the client will get it if/when they get
+   * the cap later.
+   */
+  dout(10) << "share_inode_max_size on " << *in << dendl;
+  map<client_t, Capability*>::iterator it;
+  if (only_cap)
+    it = in->client_caps.find(only_cap->get_client());
+  else
+    it = in->client_caps.begin();
+  for (; it != in->client_caps.end(); ++it) {
+    const client_t client = it->first;
+    Capability *cap = it->second;
+    if (cap->is_suppress())
+      continue;
+    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
+      dout(10) << "share_inode_max_size with client." << client << dendl;
+      cap->inc_last_seq();
+      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
+                                      in->ino(),
+                                      in->find_snaprealm()->inode->ino(),
+                                      cap->get_cap_id(), cap->get_last_seq(),
+                                      cap->pending(), cap->wanted(), 0,
+                                       cap->get_mseq(),
+                                       mds->get_osd_epoch_barrier());
+      in->encode_cap_message(m, cap);
+      mds->send_message_client_counted(m, client);
+    }
+    if (only_cap)
+      break;
+  }
+}
+
+bool Locker::_need_flush_mdlog(CInode *in, int wanted)
+{
+  /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
+   * pending mutations. */
+  if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
+       in->filelock.is_unstable_and_locked()) ||
+      ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
+       in->authlock.is_unstable_and_locked()) ||
+      ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
+       in->linklock.is_unstable_and_locked()) ||
+      ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
+       in->xattrlock.is_unstable_and_locked()))
+    return true;
+  return false;
+}
+
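+// Fold a client-supplied 'wanted' mask into the cap.  A full replacement is
+// only trusted when issue_seq matches the last issue; on a mismatch we only
+// ever add bits.  Afterwards keep the open-file list and the recovery queue
+// priority consistent with the new wanted set.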
+void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
+{
+  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
+    dout(10) << " wanted " << ccap_string(cap->wanted())
+            << " -> " << ccap_string(wanted) << dendl;
+    cap->set_wanted(wanted);
+  } else if (wanted & ~cap->wanted()) {
+    dout(10) << " wanted " << ccap_string(cap->wanted())
+            << " -> " << ccap_string(wanted)
+            << " (added caps even though we had seq mismatch!)" << dendl;
+    cap->set_wanted(wanted | cap->wanted());
+  } else {
+    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
+            << " -> " << ccap_string(wanted)
+            << " (issue_seq " << issue_seq << " != last_issue "
+            << cap->get_last_issue() << ")" << dendl;
+    return;
+  }
+
+  CInode *cur = cap->get_inode();
+  if (!cur->is_auth()) {
+    request_inode_file_caps(cur);
+    return;
+  }
+
+  if (cap->wanted() == 0) {
+    if (cur->item_open_file.is_on_list() &&
+       !cur->is_any_caps_wanted()) {
+      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
+      cur->item_open_file.remove_myself();
+    }
+  } else {
+    if (cur->state_test(CInode::STATE_RECOVERING) &&
+       (cap->wanted() & (CEPH_CAP_FILE_RD |
+                         CEPH_CAP_FILE_WR))) {
+      mds->mdcache->recovery_queue.prioritize(cur);
+    }
+
+    if (!cur->item_open_file.is_on_list()) {
+      dout(10) << " adding to open file list " << *cur << dendl;
+      assert(cur->last == CEPH_NOSNAP);
+      LogSegment *ls = mds->mdlog->get_current_segment();
+      EOpen *le = new EOpen(mds->mdlog);
+      mds->mdlog->start_entry(le);
+      le->add_clean_inode(cur);
+      ls->open_files.push_back(&cur->item_open_file);
+      mds->mdlog->submit_entry(le);
+    }
+  }
+}
+
+
+
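+// A client that has dropped all WR/EXCL caps will never send the snapflushes
+// we were still expecting from it.  Synthesize empty ("null") snapflushes for
+// every snapid (< last) this client still owes, looking forward through the
+// snapped inodes to find the one each flush applies to.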
+void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
+{
+  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
+  for (auto p = head_in->client_need_snapflush.begin();
+       p != head_in->client_need_snapflush.end() && p->first < last; ) {
+    snapid_t snapid = p->first;
+    set<client_t>& clients = p->second;
+    ++p;  // be careful, q loop below depends on this
+
+    if (clients.count(client)) {
+      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
+      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
+      if (!sin) {
+       // hrm, look forward until we find the inode. 
+       //  (we can only look it up by the last snapid it is valid for)
+       dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
+       for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
+            q != head_in->client_need_snapflush.end();
+            ++q) {
+         dout(10) << " trying snapid " << q->first << dendl;
+         sin = mdcache->get_inode(head_in->ino(), q->first);
+         if (sin) {
+           assert(sin->first <= snapid);
+           break;
+         }
+         dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
+       }
+       if (!sin && head_in->is_multiversion())
+         sin = head_in;
+       assert(sin);
+      }
+      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
+      head_in->remove_need_snapflush(sin, snapid, client);
+    }
+  }
+}
+
+
+bool Locker::should_defer_client_cap_frozen(CInode *in)
+{
+  /*
+   * This policy needs to be AT LEAST as permissive as allowing a client request
+   * to go forward, or else a client request can release something, the release
+   * gets deferred, but the request gets processed and deadlocks because the
+   * caps can't get revoked.
+   *
+   * Currently, a request waits if anything it locks is freezing (can't
+   * auth_pin), which avoids any deadlock with cap release.  Thus @in
+   * _MUST_ be in the lock/auth_pin set.
+   *
+   * auth_pins==0 implies no unstable lock and no auth pin held by a client
+   * request; otherwise continue even if the inode is freezing.
+   */
+  return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
+}
+
+/*
+ * This function DOES put the passed message before returning
+ */
+void Locker::handle_client_caps(MClientCaps *m)
+{
+  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  client_t client = m->get_source().num();
+
+  snapid_t follows = m->get_snap_follows();
+  dout(7) << "handle_client_caps "
+         << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
+         << " on " << m->get_ino()
+         << " tid " << m->get_client_tid() << " follows " << follows
+         << " op " << ceph_cap_op_name(m->get_op()) << dendl;
+
+  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
+    if (!session) {
+      dout(5) << " no session, dropping " << *m << dendl;
+      m->put();
+      return;
+    }
+    if (session->is_closed() ||
+       session->is_closing() ||
+       session->is_killing()) {
+      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
+      m->put();
+      return;
+    }
+    if (mds->is_reconnect() &&
+       m->get_dirty() && m->get_client_tid() > 0 &&
+       !session->have_completed_flush(m->get_client_tid())) {
+      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
+    }
+    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+    return;
+  }
+
+  if (m->get_client_tid() > 0 && session &&
+      session->have_completed_flush(m->get_client_tid())) {
+    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
+           << " for client." << client << dendl;
+    MClientCaps *ack;
+    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
+      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
+                           m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+    } else {
+      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
+                           m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
+                           mds->get_osd_epoch_barrier());
+    }
+    ack->set_snap_follows(follows);
+    ack->set_client_tid(m->get_client_tid());
+    mds->send_message_client_counted(ack, m->get_connection());
+    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
+      m->put();
+      return;
+    } else {
+      // fall-thru because the message may release some caps
+      m->clear_dirty();
+      m->set_op(CEPH_CAP_OP_UPDATE);
+    }
+  }
+
+  // "oldest flush tid" > 0 means client uses unique TID for each flush
+  if (m->get_oldest_flush_tid() > 0 && session) {
+    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
+      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
+
+      if (session->get_num_trim_flushes_warnings() > 0 &&
+         session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
+       session->reset_num_trim_flushes_warnings();
+    } else {
+      if (session->get_num_completed_flushes() >=
+         (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
+       session->inc_num_trim_flushes_warnings();
+       stringstream ss;
+       ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
+          << m->get_oldest_flush_tid() << "), "
+          << session->get_num_completed_flushes()
+          << " completed flushes recorded in session";
+       mds->clog->warn() << ss.str();
+       dout(20) << __func__ << " " << ss.str() << dendl;
+      }
+    }
+  }
+
+  CInode *head_in = mdcache->get_inode(m->get_ino());
+  if (!head_in) {
+    if (mds->is_clientreplay()) {
+      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
+       << ", will try again after replayed client requests" << dendl;
+      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
+      return;
+    }
+    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
+    m->put();
+    return;
+  }
+
+  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
+    // Pause RADOS operations until we see the required epoch
+    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
+  }
+
+  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
+    // Record the barrier so that we will retransmit it to clients
+    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
+  }
+
+  CInode *in = head_in;
+  if (follows > 0) {
+    in = mdcache->pick_inode_snap(head_in, follows);
+    if (in != head_in)
+      dout(10) << " head inode " << *head_in << dendl;
+  }
+  dout(10) << "  cap inode " << *in << dendl;
+
+  Capability *cap = 0;
+  cap = in->get_client_cap(client);
+  if (!cap && in != head_in)
+    cap = head_in->get_client_cap(client);
+  if (!cap) {
+    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
+    m->put();
+    return;
+  }  
+  assert(cap);
+
+  // freezing|frozen?
+  if (should_defer_client_cap_frozen(in)) {
+    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
+    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
+    return;
+  }
+  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
+    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
+           << ", dropping" << dendl;
+    m->put();
+    return;
+  }
+
+  int op = m->get_op();
+
+  // flushsnap?
+  if (op == CEPH_CAP_OP_FLUSHSNAP) {
+    if (!in->is_auth()) {
+      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
+      goto out;
+    }
+
+    SnapRealm *realm = in->find_snaprealm();
+    snapid_t snap = realm->get_snap_following(follows);
+    dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;
+
+    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
+    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
+    // case we get a dup response, so whatever.)
+    MClientCaps *ack = 0;
+    if (m->get_dirty()) {
+      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+      ack->set_snap_follows(follows);
+      ack->set_client_tid(m->get_client_tid());
+      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
+    }
+
+    if (in == head_in ||
+       (head_in->client_need_snapflush.count(snap) &&
+        head_in->client_need_snapflush[snap].count(client))) {
+      dout(7) << " flushsnap snap " << snap
+             << " client." << client << " on " << *in << dendl;
+
+      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
+      if (in == head_in)
+       cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
+      else if (head_in->client_need_snapflush.begin()->first < snap)
+       _do_null_snapflush(head_in, client, snap);
+   
+      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
+
+      if (in != head_in)
+       head_in->remove_need_snapflush(in, snap, client);
+      
+    } else {
+      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
+      if (ack)
+       mds->send_message_client_counted(ack, m->get_connection());
+    }
+    goto out;
+  }
+
+  if (cap->get_cap_id() != m->get_cap_id()) {
+    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
+  } else {
+    // intermediate snap inodes
+    while (in != head_in) {
+      assert(in->last != CEPH_NOSNAP);
+      if (in->is_auth() && m->get_dirty()) {
+       dout(10) << " updating intermediate snapped inode " << *in << dendl;
+       _do_cap_update(in, NULL, m->get_dirty(), follows, m);
+      }
+      in = mdcache->pick_inode_snap(head_in, in->last);
+    }
+    // head inode, and cap
+    MClientCaps *ack = 0;
+
+    int caps = m->get_caps();
+    if (caps & ~cap->issued()) {
+      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
+      caps &= cap->issued();
+    }
+    
+    cap->confirm_receipt(m->get_seq(), caps);
+    dout(10) << " follows " << follows
+            << " retains " << ccap_string(m->get_caps())
+            << " dirty " << ccap_string(m->get_dirty())
+            << " on " << *in << dendl;
+
+
+    // missing/skipped snapflush?
+    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
+    //  presently only does so when it has actual dirty metadata.  But, we
+    //  set up the need_snapflush stuff based on the issued caps.
+    //  We can infer that the client WON'T send a FLUSHSNAP once it has
+    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
+    //  update/release).
+    if (!head_in->client_need_snapflush.empty()) {
+      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
+       _do_null_snapflush(head_in, client);
+      } else {
+       dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
+      }
+    }
+    
+    if (m->get_dirty() && in->is_auth()) {
+      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty()) 
+             << " seq " << m->get_seq() << " on " << *in << dendl;
+      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
+                           m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+      ack->set_client_tid(m->get_client_tid());
+      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
+    }
+
+    // filter wanted based on what we could ever give out (given auth/replica status)
+    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
+    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
+    if (new_wanted != cap->wanted()) {
+      if (!need_flush && (new_wanted & ~cap->pending())) {
+       // expanding caps.  make sure we aren't waiting for a log flush
+       need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
+      }
+
+      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
+    }
+      
+    if (in->is_auth() &&
+       _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
+      // updated
+      eval(in, CEPH_CAP_LOCKS);
+
+      if (!need_flush && (cap->wanted() & ~cap->pending()))
+       need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
+    } else {
+      // no update, ack now.
+      if (ack)
+       mds->send_message_client_counted(ack, m->get_connection());
+      
+      bool did_issue = eval(in, CEPH_CAP_LOCKS);
+      if (!did_issue && (cap->wanted() & ~cap->pending()))
+       issue_caps(in, cap);
+
+      if (cap->get_last_seq() == 0 &&
+         (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
+       cap->issue_norevoke(cap->issued());
+       share_inode_max_size(in, cap);
+      }
+    }
+
+    if (need_flush)
+      mds->mdlog->flush();
+  }
+
+ out:
+  m->put();
+}
+
+
+class C_Locker_RetryRequestCapRelease : public LockerContext {
+  client_t client;
+  ceph_mds_request_release item;
+public:
+  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
+    LockerContext(l), client(c), item(it) { }
+  void finish(int r) override {
+    string dname;
+    MDRequestRef null_ref;
+    locker->process_request_cap_release(null_ref, client, item, dname);
+  }
+};
+
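+// Handle a cap release embedded in a client request: drop any named dentry
+// lease, confirm the client's view of the cap (ignoring stale mseq/cap_id),
+// fold in the new wanted mask and re-eval the locks.  While an mdr is present
+// cap messages are suppressed, and the release is recorded so the cap can be
+// re-issued later if needed (see kick_cap_releases).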
+void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
+                                        const string &dname)
+{
+  inodeno_t ino = (uint64_t)item.ino;
+  uint64_t cap_id = item.cap_id;
+  int caps = item.caps;
+  int wanted = item.wanted;
+  int seq = item.seq;
+  int issue_seq = item.issue_seq;
+  int mseq = item.mseq;
+
+  CInode *in = mdcache->get_inode(ino);
+  if (!in)
+    return;
+
+  if (dname.length()) {
+    frag_t fg = in->pick_dirfrag(dname);
+    CDir *dir = in->get_dirfrag(fg);
+    if (dir) {
+      CDentry *dn = dir->lookup(dname);
+      if (dn) {
+       ClientLease *l = dn->get_client_lease(client);
+       if (l) {
+         dout(10) << "process_cap_release removing lease on " << *dn << dendl;
+         dn->remove_client_lease(l, this);
+       } else {
+         dout(7) << "process_cap_release client." << client
+                 << " doesn't have lease on " << *dn << dendl;
+       }
+      } else {
+       dout(7) << "process_cap_release client." << client << " released lease on dn "
+               << dir->dirfrag() << "/" << dname << " which dne" << dendl;
+      }
+    }
+  }
+
+  Capability *cap = in->get_client_cap(client);
+  if (!cap)
+    return;
+
+  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
+          << (mdr ? "" : " (DEFERRED, no mdr)")
+          << dendl;
+    
+  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
+    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
+    return;
+  }
+
+  if (cap->get_cap_id() != cap_id) {
+    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
+    return;
+  }
+
+  if (should_defer_client_cap_frozen(in)) {
+    dout(7) << " frozen, deferring" << dendl;
+    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
+    return;
+  }
+    
+  if (caps & ~cap->issued()) {
+    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
+    caps &= cap->issued();
+  }
+  cap->confirm_receipt(seq, caps);
+
+  if (!in->client_need_snapflush.empty() &&
+      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
+    _do_null_snapflush(in, client);
+  }
+
+  adjust_cap_wanted(cap, wanted, issue_seq);
+  
+  if (mdr)
+    cap->inc_suppress();
+  eval(in, CEPH_CAP_LOCKS);
+  if (mdr)
+    cap->dec_suppress();
+  
+  // take note; we may need to reissue on this cap later
+  if (mdr)
+    mdr->cap_releases[in->vino()] = cap->get_last_seq();
+}
+
+class C_Locker_RetryKickIssueCaps : public LockerContext {
+  CInode *in;
+  client_t client;
+  ceph_seq_t seq;
+public:
+  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
+    LockerContext(l), in(i), client(c), seq(s) {
+    in->get(CInode::PIN_PTRWAITER);
+  }
+  void finish(int r) override {
+    locker->kick_issue_caps(in, client, seq);
+    in->put(CInode::PIN_PTRWAITER);
+  }
+};
+
+void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
+{
+  Capability *cap = in->get_client_cap(client);
+  if (!cap || cap->get_last_sent() != seq)
+    return;
+  if (in->is_frozen()) {
+    dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
+    in->add_waiter(CInode::WAIT_UNFREEZE,
+       new C_Locker_RetryKickIssueCaps(this, in, client, seq));
+    return;
+  }
+  dout(10) << "kick_issue_caps released at current seq " << seq
+    << ", reissuing" << dendl;
+  issue_caps(in, cap);
+}
+
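+// Re-issue any caps whose release was recorded against this request in
+// process_request_cap_release(), but only if the cap has not been re-sent in
+// the meantime (last_sent must still match the recorded seq).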
+void Locker::kick_cap_releases(MDRequestRef& mdr)
+{
+  client_t client = mdr->get_client();
+  for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
+       p != mdr->cap_releases.end();
+       ++p) {
+    CInode *in = mdcache->get_inode(p->first);
+    if (!in)
+      continue;
+    kick_issue_caps(in, client, p->second);
+  }
+}
+
+/**
+ * m and ack might be NULL, so don't dereference them unless dirty != 0
+ */
+void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
+{
+  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
+          << " follows " << follows << " snap " << snap
+          << " on " << *in << dendl;
+
+  if (snap == CEPH_NOSNAP) {
+    // hmm, i guess snap was already deleted?  just ack!
+    dout(10) << " wow, the snap following " << follows
+            << " was already deleted.  nothing to record, just ack." << dendl;
+    if (ack)
+      mds->send_message_client_counted(ack, m->get_connection());
+    return;
+  }
+
+  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
+  mds->mdlog->start_entry(le);
+  MutationRef mut = new MutationImpl();
+  mut->ls = mds->mdlog->get_current_segment();
+
+  // normal metadata updates that we can apply to the head as well.
+
+  // update xattrs?
+  bool xattrs = false;
+  map<string,bufferptr> *px = 0;
+  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
+      m->xattrbl.length() &&
+      m->head.xattr_version > in->get_projected_inode()->xattr_version)
+    xattrs = true;
+
+  old_inode_t *oi = 0;
+  if (in->is_multiversion()) {
+    oi = in->pick_old_inode(snap);
+  }
+
+  inode_t *pi;
+  if (oi) {
+    dout(10) << " writing into old inode" << dendl;
+    pi = in->project_inode();
+    pi->version = in->pre_dirty();
+    if (snap > oi->first)
+      in->split_old_inode(snap);
+    pi = &oi->inode;
+    if (xattrs)
+      px = &oi->xattrs;
+  } else {
+    if (xattrs)
+      px = new map<string,bufferptr>;
+    pi = in->project_inode(px);
+    pi->version = in->pre_dirty();
+  }
+
+  _update_cap_fields(in, dirty, m, pi);
+
+  // xattr
+  if (px) {
+    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
+           << " len " << m->xattrbl.length() << dendl;
+    pi->xattr_version = m->head.xattr_version;
+    bufferlist::iterator p = m->xattrbl.begin();
+    ::decode(*px, p);
+  }
+
+  if (pi->client_ranges.count(client)) {
+    if (in->last == snap) {
+      dout(10) << "  removing client_range entirely" << dendl;
+      pi->client_ranges.erase(client);
+    } else {
+      dout(10) << "  client_range now follows " << snap << dendl;
+      pi->client_ranges[client].follows = snap;
+    }
+  }
+
+  mut->auth_pin(in);
+  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
+  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
+
+  // "oldest flush tid" > 0 means client uses unique TID for each flush
+  if (ack && ack->get_oldest_flush_tid() > 0)
+    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
+                                 ack->get_oldest_flush_tid());
+
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
+                                                             client, ack));
+}
+
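+// Copy client-reported fields into the projected inode, gated by which caps
+// are dirty: size only ever grows; mtime advances with Fw but may be set
+// arbitrarily with Fx; atime and time_warp_seq are honored only with Fx; Ax
+// covers uid/gid/mode/btime; ctime and change_attr only move forward.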
+void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
+{
+  if (dirty == 0)
+    return;
+
+  /* m must be valid if there are dirty caps */
+  assert(m);
+  uint64_t features = m->get_connection()->get_features();
+
+  if (m->get_ctime() > pi->ctime) {
+    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
+           << " for " << *in << dendl;
+    pi->ctime = m->get_ctime();
+  }
+
+  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
+      m->get_change_attr() > pi->change_attr) {
+    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
+           << " for " << *in << dendl;
+    pi->change_attr = m->get_change_attr();
+  }
+
+  // file
+  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
+    utime_t atime = m->get_atime();
+    utime_t mtime = m->get_mtime();
+    uint64_t size = m->get_size();
+    version_t inline_version = m->inline_version;
+    
+    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
+       ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
+      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
+             << " for " << *in << dendl;
+      pi->mtime = mtime;
+    }
+    if (in->inode.is_file() &&   // ONLY if regular file
+       size > pi->size) {
+      dout(7) << "  size " << pi->size << " -> " << size
+             << " for " << *in << dendl;
+      pi->size = size;
+      pi->rstat.rbytes = size;
+    }
+    if (in->inode.is_file() &&
+        (dirty & CEPH_CAP_FILE_WR) &&
+        inline_version > pi->inline_data.version) {
+      pi->inline_data.version = inline_version;
+      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
+       pi->inline_data.get_data() = m->inline_data;
+      else
+       pi->inline_data.free_data();
+    }
+    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
+      dout(7) << "  atime " << pi->atime << " -> " << atime
+             << " for " << *in << dendl;
+      pi->atime = atime;
+    }
+    if ((dirty & CEPH_CAP_FILE_EXCL) &&
+       ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
+      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
+             << " for " << *in << dendl;
+      pi->time_warp_seq = m->get_time_warp_seq();
+    }
+  }
+  // auth
+  if (dirty & CEPH_CAP_AUTH_EXCL) {
+    if (m->head.uid != pi->uid) {
+      dout(7) << "  uid " << pi->uid
+             << " -> " << m->head.uid
+             << " for " << *in << dendl;
+      pi->uid = m->head.uid;
+    }
+    if (m->head.gid != pi->gid) {
+      dout(7) << "  gid " << pi->gid
+             << " -> " << m->head.gid
+             << " for " << *in << dendl;
+      pi->gid = m->head.gid;
+    }
+    if (m->head.mode != pi->mode) {
+      dout(7) << "  mode " << oct << pi->mode
+             << " -> " << m->head.mode << dec
+             << " for " << *in << dendl;
+      pi->mode = m->head.mode;
+    }
+    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
+      dout(7) << "  btime " << oct << pi->btime
+             << " -> " << m->get_btime() << dec
+             << " for " << *in << dendl;
+      pi->btime = m->get_btime();
+    }
+  }
+}
+
+/*
+ * update inode based on cap flush|flushsnap|wanted.
+ *  adjust max_size, if needed.
+ * if we update, return true; otherwise, false (no update needed).
+ */
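+// max_size policy, in brief: a client that holds or wants any FILE_WR cap has
+// its writeable range bumped via calc_new_max_size() (based on the client's
+// requested max_size if that is larger); a client with no write caps has its
+// range dropped to zero.  Changing max_size needs a wrlock on the filelock, so
+// if that is not yet possible we re-arm a C_MDL_CheckMaxSize waiter instead.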
+bool Locker::_do_cap_update(CInode *in, Capability *cap,
+                           int dirty, snapid_t follows,
+                           MClientCaps *m, MClientCaps *ack,
+                           bool *need_flush)
+{
+  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
+          << " issued " << ccap_string(cap ? cap->issued() : 0)
+          << " wanted " << ccap_string(cap ? cap->wanted() : 0)
+          << " on " << *in << dendl;
+  assert(in->is_auth());
+  client_t client = m->get_source().num();
+  inode_t *latest = in->get_projected_inode();
+
+  // increase or zero max_size?
+  uint64_t size = m->get_size();
+  bool change_max = false;
+  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
+  uint64_t new_max = old_max;
+  
+  if (in->is_file()) {
+    bool forced_change_max = false;
+    dout(20) << "inode is file" << dendl;
+    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
+      dout(20) << "client has write caps; m->get_max_size="
+               << m->get_max_size() << "; old_max=" << old_max << dendl;
+      if (m->get_max_size() > new_max) {
+       dout(10) << "client requests file_max " << m->get_max_size()
+                << " > max " << old_max << dendl;
+       change_max = true;
+       forced_change_max = true;
+       new_max = calc_new_max_size(latest, m->get_max_size());
+      } else {
+       new_max = calc_new_max_size(latest, size);
+
+       if (new_max > old_max)
+         change_max = true;
+       else
+         new_max = old_max;
+      }
+    } else {
+      if (old_max) {
+       change_max = true;
+       new_max = 0;
+      }
+    }
+
+    if (in->last == CEPH_NOSNAP &&
+       change_max &&
+       !in->filelock.can_wrlock(client) &&
+       !in->filelock.can_force_wrlock(client)) {
+      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
+      if (in->filelock.is_stable()) {
+       bool need_issue = false;
+       if (cap)
+         cap->inc_suppress();
+       if (in->mds_caps_wanted.empty() &&
+           (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
+         if (in->filelock.get_state() != LOCK_EXCL)
+           file_excl(&in->filelock, &need_issue);
+       } else
+         simple_lock(&in->filelock, &need_issue);
+       if (need_issue)
+         issue_caps(in);
+       if (cap)
+         cap->dec_suppress();
+      }
+      if (!in->filelock.can_wrlock(client) &&
+         !in->filelock.can_force_wrlock(client)) {
+       C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
+                                                        forced_change_max ? new_max : 0,
+                                                        0, utime_t());
+
+       in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
+       change_max = false;
+      }
+    }
+  }
+
+  if (m->flockbl.length()) {
+    int32_t num_locks;
+    bufferlist::iterator bli = m->flockbl.begin();
+    ::decode(num_locks, bli);
+    for ( int i=0; i < num_locks; ++i) {
+      ceph_filelock decoded_lock;
+      ::decode(decoded_lock, bli);
+      in->get_fcntl_lock_state()->held_locks.
+       insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
+      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
+    }
+    ::decode(num_locks, bli);
+    for ( int i=0; i < num_locks; ++i) {
+      ceph_filelock decoded_lock;
+      ::decode(decoded_lock, bli);
+      in->get_flock_lock_state()->held_locks.
+       insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
+      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
+    }
+  }
+
+  if (!dirty && !change_max)
+    return false;
+
+  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  if (session->check_access(in, MAY_WRITE,
+                           m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
+    session->put();
+    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
+    return false;
+  }
+  session->put();
+
+  // do the update.
+  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
+  mds->mdlog->start_entry(le);
+
+  // xattrs update?
+  map<string,bufferptr> *px = 0;
+  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
+      m->xattrbl.length() &&
+      m->head.xattr_version > in->get_projected_inode()->xattr_version)
+    px = new map<string,bufferptr>;
+
+  inode_t *pi = in->project_inode(px);
+  pi->version = in->pre_dirty();
+
+  MutationRef mut(new MutationImpl());
+  mut->ls = mds->mdlog->get_current_segment();
+
+  _update_cap_fields(in, dirty, m, pi);
+
+  if (change_max) {
+    dout(7) << "  max_size " << old_max << " -> " << new_max
+           << " for " << *in << dendl;
+    if (new_max) {
+      pi->client_ranges[client].range.first = 0;
+      pi->client_ranges[client].range.last = new_max;
+      pi->client_ranges[client].follows = in->first - 1;
+    } else 
+      pi->client_ranges.erase(client);
+  }
+    
+  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) 
+    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
+
+  // auth
+  if (dirty & CEPH_CAP_AUTH_EXCL)
+    wrlock_force(&in->authlock, mut);
+
+  // xattr
+  if (px) {
+    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
+    pi->xattr_version = m->head.xattr_version;
+    bufferlist::iterator p = m->xattrbl.begin();
+    ::decode(*px, p);
+
+    wrlock_force(&in->xattrlock, mut);
+  }
+  
+  mut->auth_pin(in);
+  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
+  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
+
+  // "oldest flush tid" > 0 means client uses unique TID for each flush
+  if (ack && ack->get_oldest_flush_tid() > 0)
+    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
+                                 ack->get_oldest_flush_tid());
+
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
+                                                             change_max, !!cap,
+                                                             client, ack));
+  if (need_flush && !*need_flush &&
+      ((change_max && new_max) || // max INCREASE
+       _need_flush_mdlog(in, dirty)))
+    *need_flush = true;
+
+  return true;
+}
+
+/* This function DOES put the passed message before returning */
+void Locker::handle_client_cap_release(MClientCapRelease *m)
+{
+  client_t client = m->get_source().num();
+  dout(10) << "handle_client_cap_release " << *m << dendl;
+
+  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
+    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+    return;
+  }
+
+  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
+    // Pause RADOS operations until we see the required epoch
+    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
+  }
+
+  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
+    // Record the barrier so that we will retransmit it to clients
+    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
+  }
+
+  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+
+  for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
+    _do_cap_release(client, inodeno_t((uint64_t)p->ino), p->cap_id, p->migrate_seq, p->seq);
+  }
+
+  if (session) {
+    session->notify_cap_release(m->caps.size());
+  }
+
+  m->put();
+}
+
+class C_Locker_RetryCapRelease : public LockerContext {
+  client_t client;
+  inodeno_t ino;
+  uint64_t cap_id;
+  ceph_seq_t migrate_seq;
+  ceph_seq_t issue_seq;
+public:
+  C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
+                          ceph_seq_t mseq, ceph_seq_t seq) :
+    LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
+  void finish(int r) override {
+    locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
+  }
+};
+
+void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
+                            ceph_seq_t mseq, ceph_seq_t seq)
+{
+  CInode *in = mdcache->get_inode(ino);
+  if (!in) {
+    dout(7) << "_do_cap_release missing ino " << ino << dendl;
+    return;
+  }
+  Capability *cap = in->get_client_cap(client);
+  if (!cap) {
+    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
+    return;
+  }
+
+  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
+  if (cap->get_cap_id() != cap_id) {
+    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
+    return;
+  }
+  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
+    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
+    return;
+  }
+  if (should_defer_client_cap_frozen(in)) {
+    dout(7) << " freezing|frozen, deferring" << dendl;
+    in->add_waiter(CInode::WAIT_UNFREEZE,
+                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
+    return;
+  }
+  if (seq != cap->get_last_issue()) {
+    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
+    // clean out any old revoke history
+    cap->clean_revoke_from(seq);
+    eval_cap_gather(in);
+    return;
+  }
+  remove_client_cap(in, client);
+}
+
+/* This function DOES put the passed message before returning */
+
+void Locker::remove_client_cap(CInode *in, client_t client)
+{
+  // clean out any pending snapflush state
+  if (!in->client_need_snapflush.empty())
+    _do_null_snapflush(in, client);
+
+  in->remove_client_cap(client);
+
+  if (in->is_auth()) {
+    // make sure we clear out the client byte range
+    if (in->get_projected_inode()->client_ranges.count(client) &&
+       !(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
+      check_inode_max_size(in);
+  } else {
+    request_inode_file_caps(in);
+  }
+  
+  try_eval(in, CEPH_CAP_LOCKS);
+}
+
+
+/**
+ * Return true if any currently revoking caps exceed the
+ * mds_revoke_cap_timeout threshold.
+ */
+bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
+{
+    xlist<Capability*>::const_iterator p = revoking.begin();
+    if (p.end()) {
+      // No revoking caps at the moment
+      return false;
+    } else {
+      utime_t now = ceph_clock_now();
+      utime_t age = now - (*p)->get_last_revoke_stamp();
+      if (age <= g_conf->mds_revoke_cap_timeout) {
+          return false;
+      } else {
+          return true;
+      }
+    }
+}
+
+
+void Locker::get_late_revoking_clients(std::list<client_t> *result) const
+{
+  if (!any_late_revoking_caps(revoking_caps)) {
+    // Fast path: no misbehaving clients, execute in O(1)
+    return;
+  }
+
+  // Slow path: execute in O(N_clients)
+  std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
+  for (client_rc_iter = revoking_caps_by_client.begin();
+       client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
+    xlist<Capability*> const &client_rc = client_rc_iter->second;
+    bool any_late = any_late_revoking_caps(client_rc);
+    if (any_late) {
+        result->push_back(client_rc_iter->first);
+    }
+  }
+}
+
+// Hard-code this instead of surfacing a config setting because it is
+// really a hack that should go away at some point when we have better
+// inspection tools for getting at detailed cap state (#7316)
+#define MAX_WARN_CAPS 100
+
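+// Scan the revoking-caps list and warn about clients that have not acked a
+// revoke within mds_revoke_cap_timeout.  The scan stops at the first cap
+// still within the timeout; each late cap is warned about with exponential
+// backoff (1x, 2x, 4x, ... the timeout), and at most MAX_WARN_CAPS late caps
+// are considered per tick to bound the work.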
+void Locker::caps_tick()
+{
+  utime_t now = ceph_clock_now();
+
+  dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
+
+  int i = 0;
+  for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
+    Capability *cap = *p;
+
+    utime_t age = now - cap->get_last_revoke_stamp();
+    dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+    if (age <= g_conf->mds_revoke_cap_timeout) {
+      dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
+      break;
+    } else {
+      ++i;
+      if (i > MAX_WARN_CAPS) {
+        dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
+          << "revoking, ignoring subsequent caps" << dendl;
+        break;
+      }
+    }
+    // exponential backoff of warning intervals
+    if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
+      cap->inc_num_revoke_warnings();
+      stringstream ss;
+      ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
+        << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
+        << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
+      mds->clog->warn() << ss.str();
+      dout(20) << __func__ << " " << ss.str() << dendl;
+    } else {
+      dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+    }
+  }
+}
+
+
+void Locker::handle_client_lease(MClientLease *m)
+{
+  dout(10) << "handle_client_lease " << *m << dendl;
+
+  assert(m->get_source().is_client());
+  client_t client = m->get_source().num();
+
+  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
+  if (!in) {
+    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
+    m->put();
+    return;
+  }
+  CDentry *dn = 0;
+
+  frag_t fg = in->pick_dirfrag(m->dname);
+  CDir *dir = in->get_dirfrag(fg);
+  if (dir) 
+    dn = dir->lookup(m->dname);
+  if (!dn) {
+    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
+    m->put();
+    return;
+  }
+  dout(10) << " on " << *dn << dendl;
+
+  // replica and lock
+  ClientLease *l = dn->get_client_lease(client);
+  if (!l) {
+    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
+    m->put();
+    return;
+  } 
+
+  switch (m->get_action()) {
+  case CEPH_MDS_LEASE_REVOKE_ACK:
+  case CEPH_MDS_LEASE_RELEASE:
+    if (l->seq != m->get_seq()) {
+      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
+    } else {
+      dout(7) << "handle_client_lease client." << client
+             << " on " << *dn << dendl;
+      dn->remove_client_lease(l, this);
+    }
+    m->put();
+    break;
+
+  case CEPH_MDS_LEASE_RENEW:
+    {
+      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
+             << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
+      if (dn->lock.can_lease(client)) {
+       int pool = 1;   // fixme.. do something smart!
+       m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
+       m->h.seq = ++l->seq;
+       m->clear_payload();
+
+       utime_t now = ceph_clock_now();
+       now += mdcache->client_lease_durations[pool];
+       mdcache->touch_client_lease(l, pool, now);
+
+       mds->send_message_client_counted(m, m->get_connection());
+      }
+    }
+    break;
+
+  default:
+    ceph_abort(); // implement me
+    break;
+  }
+}
+
+
+void Locker::issue_client_lease(CDentry *dn, client_t client,
+                              bufferlist &bl, utime_t now, Session *session)
+{
+  CInode *diri = dn->get_dir()->get_inode();
+  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
+      ((!diri->filelock.can_lease(client) &&
+       (diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
+      dn->lock.can_lease(client)) {
+    int pool = 1;   // fixme.. do something smart!
+    // issue a dentry lease
+    ClientLease *l = dn->add_client_lease(client, session);
+    session->touch_lease(l);
+    
+    now += mdcache->client_lease_durations[pool];
+    mdcache->touch_client_lease(l, pool, now);
+
+    LeaseStat e;
+    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
+    e.seq = ++l->seq;
+    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
+    ::encode(e, bl);
+    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
+            << " on " << *dn << dendl;
+  } else {
+    // null lease
+    LeaseStat e;
+    e.mask = 0;
+    e.seq = 0;
+    e.duration_ms = 0;
+    ::encode(e, bl);
+    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
+  }
+}
+
+
+void Locker::revoke_client_leases(SimpleLock *lock)
+{
+  int n = 0;
+  CDentry *dn = static_cast<CDentry*>(lock->get_parent());
+  for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
+       p != dn->client_lease_map.end();
+       ++p) {
+    ClientLease *l = p->second;
+    
+    n++;
+    assert(lock->get_type() == CEPH_LOCK_DN);
+
+    int mask = 1 | CEPH_LOCK_DN; // old and new bits
+    
+    // i should also revoke the dir ICONTENT lease, if they have it!
+    CInode *diri = dn->get_dir()->get_inode();
+    mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
+                                             mask,
+                                             diri->ino(),
+                                             diri->first, CEPH_NOSNAP,
+                                             dn->get_name()),
+                            l->client);
+  }
+  assert(n == lock->get_num_client_lease());
+}
+
+
+
+// locks ----------------------------------------------------------------
+
+SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info) 
+{
+  switch (lock_type) {
+  case CEPH_LOCK_DN:
+    {
+      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
+      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
+      frag_t fg;
+      CDir *dir = 0;
+      CDentry *dn = 0;
+      if (diri) {
+       fg = diri->pick_dirfrag(info.dname);
+       dir = diri->get_dirfrag(fg);
+       if (dir) 
+         dn = dir->lookup(info.dname, info.snapid);
+      }
+      if (!dn) {
+       dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
+       return 0;
+      }
+      return &dn->lock;
+    }
+
+  case CEPH_LOCK_IAUTH:
+  case CEPH_LOCK_ILINK:
+  case CEPH_LOCK_IDFT:
+  case CEPH_LOCK_IFILE:
+  case CEPH_LOCK_INEST:
+  case CEPH_LOCK_IXATTR:
+  case CEPH_LOCK_ISNAP:
+  case CEPH_LOCK_IFLOCK:
+  case CEPH_LOCK_IPOLICY:
+    {
+      CInode *in = mdcache->get_inode(info.ino, info.snapid);
+      if (!in) {
+       dout(7) << "get_lock don't have ino " << info.ino << dendl;
+       return 0;
+      }
+      switch (lock_type) {
+      case CEPH_LOCK_IAUTH: return &in->authlock;
+      case CEPH_LOCK_ILINK: return &in->linklock;
+      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
+      case CEPH_LOCK_IFILE: return &in->filelock;
+      case CEPH_LOCK_INEST: return &in->nestlock;
+      case CEPH_LOCK_IXATTR: return &in->xattrlock;
+      case CEPH_LOCK_ISNAP: return &in->snaplock;
+      case CEPH_LOCK_IFLOCK: return &in->flocklock;
+      case CEPH_LOCK_IPOLICY: return &in->policylock;
+      }
+    }
+
+  default:
+    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
+    ceph_abort();
+    break;
+  }
+
+  return 0;  
+}
+
+/* This function DOES put the passed message before returning */
+void Locker::handle_lock(MLock *m)
+{
+  // nobody should be talking to us during recovery.
+  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+
+  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
+  if (!lock) {
+    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
+    m->put();
+    return;
+  }
+
+  switch (lock->get_type()) {
+  case CEPH_LOCK_DN:
+  case CEPH_LOCK_IAUTH:
+  case CEPH_LOCK_ILINK:
+  case CEPH_LOCK_ISNAP:
+  case CEPH_LOCK_IXATTR:
+  case CEPH_LOCK_IFLOCK:
+  case CEPH_LOCK_IPOLICY:
+    handle_simple_lock(lock, m);
+    break;
+    
+  case CEPH_LOCK_IDFT:
+  case CEPH_LOCK_INEST:
+    //handle_scatter_lock((ScatterLock*)lock, m);
+    //break;
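+    // IDFT and INEST are scatter locks as well; fall through to the
+    // file-lock handler below.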
+
+  case CEPH_LOCK_IFILE:
+    handle_file_lock(static_cast<ScatterLock*>(lock), m);
+    break;
+    
+  default:
+    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
+    ceph_abort();
+    break;
+  }
+}
+
+
+
+
+// ==========================================================================
+// simple lock
+
+/** This function may take a reference to m if it needs one, but does
+ * not put references. */
+void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
+{
+  MDSCacheObject *parent = lock->get_parent();
+  if (parent->is_auth() &&
+      lock->get_state() != LOCK_SYNC &&
+      !parent->is_frozen()) {
+    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
+           << " on " << *parent << dendl;
+    assert(parent->is_auth()); // replica auth pinned if they're doing this!
+    if (lock->is_stable()) {
+      simple_sync(lock);
+    } else {
+      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
+      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
+                       new C_MDS_RetryMessage(mds, m->get()));
+    }
+  } else {
+    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
+           << " on " << *parent << dendl;
+    // replica should retry
+  }
+}
+
+/* This function DOES put the passed message before returning */
+void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
+{
+  int from = m->get_asker();
+  
+  dout(10) << "handle_simple_lock " << *m
+          << " on " << *lock << " " << *lock->get_parent() << dendl;
+
+  if (mds->is_rejoin()) {
+    if (lock->get_parent()->is_rejoining()) {
+      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
+             << ", dropping " << *m << dendl;
+      m->put();
+      return;
+    }
+  }
+
+  switch (m->get_action()) {
+    // -- replica --
+  case LOCK_AC_SYNC:
+    assert(lock->get_state() == LOCK_LOCK);
+    lock->decode_locked_state(m->get_data());
+    lock->set_state(LOCK_SYNC);
+    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
+    break;
+    
+  case LOCK_AC_LOCK:
+    assert(lock->get_state() == LOCK_SYNC);
+    lock->set_state(LOCK_SYNC_LOCK);
+    if (lock->is_leased())
+      revoke_client_leases(lock);
+    eval_gather(lock, true);
+    if (lock->is_unstable_and_locked())
+      mds->mdlog->flush();
+    break;
+
+
+    // -- auth --
+  case LOCK_AC_LOCKACK:
+    assert(lock->get_state() == LOCK_SYNC_LOCK ||
+          lock->get_state() == LOCK_SYNC_EXCL);
+    assert(lock->is_gathering(from));
+    lock->remove_gather(from);
+    
+    if (lock->is_gathering()) {
+      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
+             << ", still gathering " << lock->get_gather_set() << dendl;
+    } else {
+      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
+             << ", last one" << dendl;
+      eval_gather(lock);
+    }
+    break;
+
+  case LOCK_AC_REQRDLOCK:
+    handle_reqrdlock(lock, m);
+    break;
+
+  }
+
+  m->put();
+}
+
+/* unused, currently.
+
+class C_Locker_SimpleEval : public Context {
+  Locker *locker;
+  SimpleLock *lock;
+public:
+  C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
+  void finish(int r) {
+    locker->try_simple_eval(lock);
+  }
+};
+
+void Locker::try_simple_eval(SimpleLock *lock)
+{
+  // unstable and ambiguous auth?
+  if (!lock->is_stable() &&
+      lock->get_parent()->is_ambiguous_auth()) {
+    dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
+    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
+    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
+    return;
+  }
+
+  if (!lock->get_parent()->is_auth()) {
+    dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
+    return;
+  }
+
+  if (!lock->get_parent()->can_auth_pin()) {
+    dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
+    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
+    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
+    return;
+  }
+
+  if (lock->is_stable())
+    simple_eval(lock);
+}
+*/
+
+
+void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
+{
+  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
+
+  assert(lock->get_parent()->is_auth());
+  assert(lock->is_stable());
+
+  if (lock->get_parent()->is_freezing_or_frozen()) {
+    // dentry lock in unreadable state can block path traverse
+    if ((lock->get_type() != CEPH_LOCK_DN ||
+        lock->get_state() == LOCK_SYNC ||
+        lock->get_parent()->is_frozen()))
+      return;
+  }
+
+  if (mdcache->is_readonly()) {
+    if (lock->get_state() != LOCK_SYNC) {
+      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
+      simple_sync(lock, need_issue);
+    }
+    return;
+  }
+
+  CInode *in = 0;
+  int wanted = 0;
+  if (lock->get_type() != CEPH_LOCK_DN) {
+    in = static_cast<CInode*>(lock->get_parent());
+    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
+  }
+  
+  // -> excl?
+  if (lock->get_state() != LOCK_EXCL &&
+      in && in->get_target_loner() >= 0 &&
+      (wanted & CEPH_CAP_GEXCL)) {
+    dout(7) << "simple_eval stable, going to excl " << *lock 
+           << " on " << *lock->get_parent() << dendl;
+    simple_excl(lock, need_issue);
+  }
+
+  // stable -> sync?
+  else if (lock->get_state() != LOCK_SYNC &&
+          !lock->is_wrlocked() &&
+          ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
+           (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
+    dout(7) << "simple_eval stable, syncing " << *lock 
+           << " on " << *lock->get_parent() << dendl;
+    simple_sync(lock, need_issue);
+  }
+}
+
+
+// mid
+
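+// The simple_* transitions below share a pattern: move the lock into the
+// intermediate state, then count outstanding obstacles ("gather"): held
+// rd/wrlocks, replica acks, client caps that must be revoked/reissued, and
+// any pending file recovery.  If gather is non-zero the parent is auth-pinned
+// and eval_gather() finishes the transition later; otherwise the target
+// state is reached immediately and waiters are woken.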
+bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
+{
+  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
+  assert(lock->get_parent()->is_auth());
+  assert(lock->is_stable());
+
+  CInode *in = 0;
+  if (lock->get_cap_shift())
+    in = static_cast<CInode *>(lock->get_parent());
+
+  int old_state = lock->get_state();
+
+  if (old_state != LOCK_TSYN) {
+
+    switch (lock->get_state()) {
+    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
+    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
+    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
+    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
+    default: ceph_abort();
+    }
+
+    int gather = 0;
+    if (lock->is_wrlocked())
+      gather++;
+    
+    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
+      send_lock_message(lock, LOCK_AC_SYNC);
+      lock->init_gather();
+      gather++;
+    }
+    
+    if (in && in->is_head()) {
+      if (in->issued_caps_need_gather(lock)) {
+       if (need_issue)
+         *need_issue = true;
+       else
+         issue_caps(in);
+       gather++;
+      }
+    }
+    
+    bool need_recover = false;
+    if (lock->get_type() == CEPH_LOCK_IFILE) {
+      assert(in);
+      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
+        mds->mdcache->queue_file_recover(in);
+       need_recover = true;
+        gather++;
+      }
+    }
+    
+    if (!gather && lock->is_dirty()) {
+      lock->get_parent()->auth_pin(lock);
+      scatter_writebehind(static_cast<ScatterLock*>(lock));
+      mds->mdlog->flush();
+      return false;
+    }
+
+    if (gather) {
+      lock->get_parent()->auth_pin(lock);
+      if (need_recover)
+       mds->mdcache->do_file_recover();
+      return false;
+    }
+  }
+
+  if (lock->get_parent()->is_replicated()) {    // FIXME
+    bufferlist data;
+    lock->encode_locked_state(data);
+    send_lock_message(lock, LOCK_AC_SYNC, data);
+  }
+  lock->set_state(LOCK_SYNC);
+  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
+  if (in && in->is_head()) {
+    if (need_issue)
+      *need_issue = true;
+    else
+      issue_caps(in);
+  }
+  return true;
+}
+
+void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
+{
+  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
+  assert(lock->get_parent()->is_auth());
+  assert(lock->is_stable());
+
+  CInode *in = 0;
+  if (lock->get_cap_shift())
+    in = static_cast<CInode *>(lock->get_parent());
+
+  switch (lock->get_state()) {
+  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
+  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
+  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
+  default: ceph_abort();
+  }
+  
+  int gather = 0;
+  if (lock->is_rdlocked())
+    gather++;
+  if (lock->is_wrlocked())
+    gather++;
+
+  if (lock->get_parent()->is_replicated() && 
+      lock->get_state() != LOCK_LOCK_EXCL &&
+      lock->get_state() != LOCK_XSYN_EXCL) {
+    send_lock_message(lock, LOCK_AC_LOCK);
+    lock->init_gather();
+    gather++;
+  }
+  
+  if (in && in->is_head()) {
+    if (in->issued_caps_need_gather(lock)) {
+      if (need_issue)
+       *need_issue = true;
+      else
+       issue_caps(in);
+      gather++;
+    }
+  }
+  
+  if (gather) {
+    lock->get_parent()->auth_pin(lock);
+  } else {
+    lock->set_state(LOCK_EXCL);
+    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
+    if (in) {
+      if (need_issue)
+       *need_issue = true;
+      else
+       issue_caps(in);
+    }
+  }
+}
+
+void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
+{
+  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
+  assert(lock->get_parent()->is_auth());
+  assert(lock->is_stable());
+  assert(lock->get_state() != LOCK_LOCK);
+  
+  CInode *in = 0;
+  if (lock->get_cap_shift())
+    in = static_cast<CInode *>(lock->get_parent());
+
+  int old_state = lock->get_state();
+
+  switch (lock->get_state()) {
+  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
+  case LOCK_XSYN:
+    file_excl(static_cast<ScatterLock*>(lock), need_issue);
+    if (lock->get_state() != LOCK_EXCL)
+      return;
+    // fall-thru
+  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
+  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
+    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
+    break;
+  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
+  default: ceph_abort();
+  }
+
+  int gather = 0;
+  if (lock->is_leased()) {
+    gather++;
+    revoke_client_leases(lock);
+  }
+  if (lock->is_rdlocked())
+    gather++;
+  if (in && in->is_head()) {
+    if (in->issued_caps_need_gather(lock)) {
+      if (need_issue)
+       *need_issue = true;
+      else
+       issue_caps(in);
+      gather++;
+    }
+  }
+
+  bool need_recover = false;
+  if (lock->get_type() == CEPH_LOCK_IFILE) {
+    assert(in);
+    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
+      mds->mdcache->queue_file_recover(in);
+      need_recover = true;
+      gather++;
+    }
+  }
+
+  if (lock->get_parent()->is_replicated() &&
+      lock->get_state() == LOCK_MIX_LOCK &&
+      gather) {
+    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
+  } else {
+    // move to second stage of gather now, so we don't send the lock action later.
+    if (lock->get_state() == LOCK_MIX_LOCK)
+      lock->set_state(LOCK_MIX_LOCK2);
+
+    if (lock->get_parent()->is_replicated() &&
+       lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
+      gather++;
+      send_lock_message(lock, LOCK_AC_LOCK);
+      lock->init_gather();
+    }
+  }
+
+  if (!gather && lock->is_dirty()) {
+    lock->get_parent()->auth_pin(lock);
+    scatter_writebehind(static_cast<ScatterLock*>(lock));
+    mds->mdlog->flush();
+    return;
+  }
+
+  if (gather) {
+    lock->get_parent()->auth_pin(lock);
+    if (need_recover)
+      mds->mdcache->do_file_recover();
+  } else {
+    lock->set_state(LOCK_LOCK);
+    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
+  }
+}
+
+
+void Locker::simple_xlock(SimpleLock *lock)
+{
+  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
+  assert(lock->get_parent()->is_auth());
+  //assert(lock->is_stable());
+  assert(lock->get_state() != LOCK_XLOCK);
+  
+  CInode *in = 0;
+  if (lock->get_cap_shift())
+    in = static_cast<CInode *>(lock->get_parent());
+
+  if (lock->is_stable())
+    lock->get_parent()->auth_pin(lock);
+
+  switch (lock->get_state()) {
+  case LOCK_LOCK: 
+  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
+  default: ceph_abort();
+  }
+
+  int gather = 0;
+  if (lock->is_rdlocked())
+    gather++;
+  if (lock->is_wrlocked())
+    gather++;
+  
+  if (in && in->is_head()) {
+    if (in->issued_caps_need_gather(lock)) {
+      issue_caps(in);
+      gather++;
+    }
+  }
+
+  if (!gather) {
+    lock->set_state(LOCK_PREXLOCK);
+    //assert("shouldn't be called if we are already xlockable" == 0);
+  }
+}
+
+
+
+
+
+// ==========================================================================
+// scatter lock
+
+/*
+
+Some notes on scatterlocks.
+
+ - The scatter/gather is driven by the inode lock.  The scatter always
+   brings in the latest metadata from the fragments.
+
+ - When in a scattered/MIX state, fragments are only allowed to
+   update/be written to if the accounted stat matches the inode's
+   current version.
+
+ - That means, on gather, we _only_ assimilate diffs for frag metadata
+   that match the current version, because those are the only ones
+   written during this scatter/gather cycle.  (Others didn't permit
+   it.)  We increment the version and journal this to disk.
+
+ - When possible, we also simultaneously update our local frag
+   accounted stats to match.
+
+ - On scatter, the new inode info is broadcast to frags, both local
+   and remote.  If possible (auth and !frozen), the dirfrag auth
+   should update the accounted state (if it isn't already up to date).
+   Note that this may occur on both the local inode auth node and
+   inode replicas, so there are two potential paths. If it is NOT
+   possible, they need to mark_stale to prevent any possible writes.
+
+ - A scatter can be to MIX (potentially writeable) or to SYNC (read
+   only).  Both are opportunities to update the frag accounted stats,
+   even though only the MIX case is affected by a stale dirfrag.
+
+ - Because many scatter/gather cycles can potentially go by without a
+   frag being able to update its accounted stats (due to being frozen
+   by exports/refragments in progress), the frag may have (even very)
+   old stat versions.  That's fine.  If when we do want to update it,
+   we can update accounted_* and the version first.
+
+*/
+
+class C_Locker_ScatterWB : public LockerLogContext {
+  ScatterLock *lock;
+  MutationRef mut;
+public:
+  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
+    LockerLogContext(l), lock(sl), mut(m) {}
+  void finish(int r) override { 
+    locker->scatter_writebehind_finish(lock, mut); 
+  }
+};
+
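+// Flush dirty scattered state back through the journal: take a forced wrlock
+// for the duration of the journal write, project the inode, fold the gathered
+// frag stats in via finish_scatter_gather_update(), and journal an EUpdate.
+// C_Locker_ScatterWB then calls scatter_writebehind_finish() on commit to
+// apply the projection and finish the lock flush.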
+void Locker::scatter_writebehind(ScatterLock *lock)
+{
+  CInode *in = static_cast<CInode*>(lock->get_parent());
+  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;
+
+  // journal
+  MutationRef mut(new MutationImpl());
+  mut->ls = mds->mdlog->get_current_segment();
+
+  // forcefully take a wrlock
+  lock->get_wrlock(true);
+  mut->wrlocks.insert(lock);
+  mut->locks.insert(lock);
+
+  in->pre_cow_old_inode();  // avoid cow mayhem
+
+  inode_t *pi = in->project_inode();
+  pi->version = in->pre_dirty();
+
+  in->finish_scatter_gather_update(lock->get_type());
+  lock->start_flush();
+
+  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
+  mds->mdlog->start_entry(le);
+
+  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
+  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
+  
+  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);
+
+  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
+}
+
+void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
+{
+  CInode *in = static_cast<CInode*>(lock->get_parent());
+  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
+  in->pop_and_dirty_projected_inode(mut->ls);
+
+  lock->finish_flush();
+
+  // if replicas may have flushed in a mix->lock state, send another
+  // message so they can finish_flush().
+  if (in->is_replicated()) {
+    switch (lock->get_state()) {
+    case LOCK_MIX_LOCK:
+    case LOCK_MIX_LOCK2:
+    case LOCK_MIX_EXCL:
+    case LOCK_MIX_TSYN:
+      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
+    }
+  }
+
+  mut->apply();
+  drop_locks(mut.get());
+  mut->cleanup();
+
+  if (lock->is_stable())
+    lock->finish_waiters(ScatterLock::WAIT_STABLE);
+
+  //scatter_eval_gather(lock);
+}
+
+void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
+{
+  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
+
+  assert(lock->get_parent()->is_auth());
+  assert(lock->is_stable());
+
+  if (lock->get_parent()->is_freezing_or_frozen()) {
+    dout(20) << "  freezing|frozen" << dendl;
+    return;
+  }
+
+  if (mdcache->is_readonly()) {
+    if (lock->get_state() != LOCK_SYNC) {
+      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
+      simple_sync(lock, need_issue);
+    }
+    return;
+  }
+  
+  if (!lock->is_rdlocked() &&
+      lock->get_state() != LOCK_MIX &&
+      lock->get_scatter_wanted()) {
+    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
+            << " on " << *lock->get_parent() << dendl;
+    scatter_mix(lock, need_issue);
+    return;
+  }
+
+  if (lock->get_type() == CEPH_LOCK_INEST) {
+    // in general, we want to keep INEST writable at all times.
+    if (!lock->is_rdlocked()) {
+      if (lock->get_parent()->is_replicated()) {
+       if (lock->get_state() != LOCK_MIX)
+         scatter_mix(lock, need_issue);
+      } else {
+       if (lock->get_state() != LOCK_LOCK)
+         simple_lock(lock, need_issue);
+      }
+    }
+    return;
+  }
+
+  CInode *in = static_cast<CInode*>(lock->get_parent());
+  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
+    // i _should_ be sync.
+    if (!lock->is_wrlocked() &&
+       lock->get_state() != LOCK_SYNC) {
+      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
+      simple_sync(lock, need_issue);
+    }
+  }
+}
+
+
+/*
+ * mark a scatterlock to indicate that the dir fnode has some dirty data
+ */
+void Locker::mark_updated_scatterlock(ScatterLock *lock)
+{
+  lock->mark_dirty();
+  if (lock->get_updated_item()->is_on_list()) {
+    dout(10) << "mark_updated_scatterlock " << *lock
+            << " - already on list since " << lock->get_update_stamp() << dendl;
+  } else {
+    updated_scatterlocks.push_back(lock->get_updated_item());
+    utime_t now = ceph_clock_now();
+    lock->set_update_stamp(now);
+    dout(10) << "mark_updated_scatterlock " << *lock
+            << " - added at " << now << dendl;
+  }
+}
+
+/*
+ * this is called by scatter_tick and LogSegment::try_to_trim() when
+ * trying to flush dirty scattered data (i.e. updated fnode) back to
+ * the inode.
+ *
+ * we need to lock|scatter in order to push fnode changes into the
+ * inode.dirstat.
+ */
+void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
+{
+  CInode *p = static_cast<CInode *>(lock->get_parent());
+
+  if (p->is_frozen() || p->is_freezing()) {
+    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
+    if (c) 
+      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
+    else
+      // just requeue.  not ideal.. starvation prone..
+      updated_scatterlocks.push_back(lock->get_updated_item());
+    return;
+  }
+
+  if (p->is_ambiguous_auth()) {
+    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
+    if (c) 
+      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
+    else
+      // just requeue.  not ideal.. starvation prone..
+      updated_scatterlocks.push_back(lock->get_updated_item());
+    return;
+  }
+
+  if (p->is_auth()) {
+    int count = 0;
+    while (true) {
+      if (lock->is_stable()) {
+       // can we do it now?
+       //  (only if we're not replicated.. if we are, we really do need
+       //   to nudge the lock state!)
+       /*
+         actually, even if we're not replicated, we can't stay in MIX, because another mds
+         could discover and replicate us at any time.  if that happens while we're flushing,
+         they end up in MIX but their inode has the old scatterstat version.
+
+       if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
+         dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
+         scatter_writebehind(lock);
+         if (c)
+           lock->add_waiter(SimpleLock::WAIT_STABLE, c);
+         return;
+       }
+       */
+
+       if (mdcache->is_readonly()) {
+         if (lock->get_state() != LOCK_SYNC) {
+           dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
+           simple_sync(static_cast<ScatterLock*>(lock));
+         }
+         break;
+       }
+
+       // adjust lock state
+       dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
+       switch (lock->get_type()) {
+       case CEPH_LOCK_IFILE:
+         if (p->is_replicated() && lock->get_state() != LOCK_MIX)
+           scatter_mix(static_cast<ScatterLock*>(lock));
+         else if (lock->get_state() != LOCK_LOCK)
+           simple_lock(static_cast<ScatterLock*>(lock));
+         else
+           simple_sync(static_cast<ScatterLock*>(lock));
+         break;
+         
+       case CEPH_LOCK_IDFT:
+       case CEPH_LOCK_INEST:
+         if (p->is_replicated() && lock->get_state() != LOCK_MIX)
+           scatter_mix(lock);
+         else if (lock->get_state() != LOCK_LOCK)
+           simple_lock(lock);
+         else
+           simple_sync(lock);
+         break;
+       default:
+         ceph_abort();
+       }
+       ++count;
+       if (lock->is_stable() && count == 2) {
+         dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
+         // this should only really happen when called via
+         // handle_file_lock due to AC_NUDGE, because the rest of the
+         // time we are replicated or have dirty data and won't get
+         // called.  bailing here avoids an infinite loop.
+         assert(!c); 
+         break;
+       }
+      } else {
+       dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
+       if (c)
+         lock->add_waiter(SimpleLock::WAIT_STABLE, c);
+       return;
+      }
+    }
+  } else {
+    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " 
+            << *lock << " on " << *p << dendl;
+    // request unscatter?
+    mds_rank_t auth = lock->get_parent()->authority().first;
+    if (!mds->is_cluster_degraded() ||
+       mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
+      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
+
+    // wait...
+    if (c)
+      lock->add_waiter(SimpleLock::WAIT_STABLE, c);
+
+    // also, requeue, in case we had wrong auth or something
+    updated_scatterlocks.push_back(lock->get_updated_item());
+  }
+}
+
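+// Drain updated_scatterlocks, nudging each still-dirty lock whose update
+// stamp is older than mds_scatter_nudge_interval.  The list length is
+// snapshotted up front because scatter_nudge() may requeue entries, which
+// would otherwise make this loop spin forever.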
+void Locker::scatter_tick()
+{
+  dout(10) << "scatter_tick" << dendl;
+  
+  // updated
+  utime_t now = ceph_clock_now();
+  int n = updated_scatterlocks.size();
+  while (!updated_scatterlocks.empty()) {
+    ScatterLock *lock = updated_scatterlocks.front();
+
+    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping
+    
+    if (!lock->is_dirty()) {
+      updated_scatterlocks.pop_front();
+      dout(10) << " removing from updated_scatterlocks " 
+              << *lock << " " << *lock->get_parent() << dendl;
+      continue;
+    }
+    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
+      break;
+    updated_scatterlocks.pop_front();
+    scatter_nudge(lock, 0);
+  }
+  mds->mdlog->flush();
+}
+
+
+void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
+{
+  dout(10) << "scatter_tempsync " << *lock
+          << " on " << *lock->get_parent() << dendl;
+  assert(lock->get_parent()->is_auth());
+  assert(lock->is_stable());
+
+  assert(0 == "not fully implemented, at least not for filelock");
+
+  CInode *in = static_cast<CInode *>(lock->get_parent());
+
+  switch (lock->get_state()) {
+  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
+  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
+  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
+  default: ceph_abort();
+  }
+
+  int gather = 0;
+  if (lock->is_wrlocked())
+    gather++;
+
+  if (lock->get_cap_shift() &&
+      in->is_head() &&
+      in->issued_caps_need_gather(lock)) {
+    if (need_issue)
+      *need_issue = true;
+    else
+      issue_caps(in);
+    gather++;
+  }
+
+  if (lock->get_state() == LOCK_MIX_TSYN &&
+      in->is_replicated()) {
+    lock->init_gather();
+    send_lock_message(lock, LOCK_AC_LOCK);
+    gather++;
+  }
+
+  if (gather) {
+    in->auth_pin(lock);
+  } else {
+    // do tempsync
+    lock->set_state(LOCK_TSYN);
+    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
+    if (lock->get_cap_shift()) {
+      if (need_issue)
+       *need_issue = true;
+      else
+       issue_caps(in);
+    }
+  }
+}
+
+
+
+// ==========================================================================
+// local lock
+
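+// Local locks are taken and released entirely on this MDS: none of the
+// helpers below send MLock messages or gather from replicas; they only
+// track wr/xlock counts and wake local waiters.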
+void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
+{
+  dout(7) << "local_wrlock_grab  on " << *lock
+         << " on " << *lock->get_parent() << dendl;  
+  
+  assert(lock->get_parent()->is_auth());
+  assert(lock->can_wrlock());
+  assert(!mut->wrlocks.count(lock));
+  lock->get_wrlock(mut->get_client());
+  mut->wrlocks.insert(lock);
+  mut->locks.insert(lock);
+}
+
+bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
+{
+  dout(7) << "local_wrlock_start  on " << *lock
+         << " on " << *lock->get_parent() << dendl;  
+  
+  assert(lock->get_parent()->is_auth());
+  if (lock->can_wrlock()) {
+    assert(!mut->wrlocks.count(lock));
+    lock->get_wrlock(mut->get_client());
+    mut->wrlocks.insert(lock);
+    mut->locks.insert(lock);
+    return true;
+  } else {
+    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
+    return false;
+  }
+}
+
+void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
+{
+  dout(7) << "local_wrlock_finish  on " << *lock
+         << " on " << *lock->get_parent() << dendl;  
+  lock->put_wrlock();
+  mut->wrlocks.erase(lock);
+  mut->locks.erase(lock);
+  if (lock->get_num_wrlocks() == 0) {
+    lock->finish_waiters(SimpleLock::WAIT_STABLE |
+                         SimpleLock::WAIT_WR |
+                         SimpleLock::WAIT_RD);
+  }
+}
+
+bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
+{
+  dout(7) << "local_xlock_start  on " << *lock
+         << " on " << *lock->get_parent() << dendl;  
+  
+  assert(lock->get_parent()->is_auth());
+  if (!lock->can_xlock_local()) {
+    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
+    return false;
+  }
+
+  lock->get_xlock(mut, mut->get_client());
+  mut->xlocks.insert(lock);
+  mut->locks.insert(lock);
+  return true;
+}
+
+void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
+{
+  dout(7) << "local_xlock_finish  on " << *lock
+         << " on " << *lock->get_parent() << dendl;  
+  lock->put_xlock();
+  mut->xlocks.erase(lock);
+  mut->locks.erase(lock);
+
+  lock->finish_waiters(SimpleLock::WAIT_STABLE | 
+                      SimpleLock::WAIT_WR | 
+                      SimpleLock::WAIT_RD);
+}
+
+
+
+// ==========================================================================
+// file lock
+
+
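+// Re-evaluate a stable filelock against what clients currently want:
+// a single writer (the loner) pushes the lock toward EXCL, multiple
+// writers toward MIX, and no writers toward SYNC.  Transitions are
+// skipped while the inode is freezing/frozen, while wrlocks still need
+// to drain, or (for MIX->SYNC on a directory) while the inode is a
+// subtree delegation point.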
+void Locker::file_eval(ScatterLock *lock, bool *need_issue)
+{
+  CInode *in = static_cast<CInode*>(lock->get_parent());
+  int loner_wanted, other_wanted;
+  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
+  dout(7) << "file_eval wanted=" << gcap_string(wanted)
+         << " loner_wanted=" << gcap_string(loner_wanted)
+         << " other_wanted=" << gcap_string(other_wanted)
+         << "  filelock=" << *lock << " on " << *lock->get_parent()
+         << dendl;
+
+  assert(lock->get_parent()->is_auth());
+  assert(lock->is_stable());
+
+  if (lock->get_parent()->is_freezing_or_frozen())
+    return;
+
+  if (mdcache->is_readonly()) {
+    if (lock->get_state() != LOCK_SYNC) {
+      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
+      simple_sync(lock, need_issue);
+    }
+    return;
+  }
+
+  // excl -> *?
+  if (lock->get_state() == LOCK_EXCL) {
+    dout(20) << " is excl" << dendl;
+    int loner_issued, other_issued, xlocker_issued;
+    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
+    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
+            << " other_issued=" << gcap_string(other_issued)
+           << " xlocker_issued=" << gcap_string(xlocker_issued)
+           << dendl;
+    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
+        (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
+       (in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
+      dout(20) << " should lose it" << dendl;
+      // we should lose it.
+      //  loner  other   want
+      //  R      R       SYNC
+      //  R      R|W     MIX
+      //  R      W       MIX
+      //  R|W    R       MIX
+      //  R|W    R|W     MIX
+      //  R|W    W       MIX
+      //  W      R       MIX
+      //  W      R|W     MIX
+      //  W      W       MIX
+      // -> any writer means MIX; RD doesn't matter.
+      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
+         lock->is_waiter_for(SimpleLock::WAIT_WR))
+       scatter_mix(lock, need_issue);
+      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
+       simple_sync(lock, need_issue);
+      else
+       dout(10) << " waiting for wrlock to drain" << dendl;
+    }    
+  }
+
+  // * -> excl?
+  else if (lock->get_state() != LOCK_EXCL &&
+          !lock->is_rdlocked() &&
+          //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
+          ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
+           (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
+          in->get_target_loner() >= 0) {
+    dout(7) << "file_eval stable, bump to loner " << *lock
+           << " on " << *lock->get_parent() << dendl;
+    file_excl(lock, need_issue);
+  }
+
+  // * -> mixed?
+  else if (lock->get_state() != LOCK_MIX &&
+          !lock->is_rdlocked() &&
+          //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
+          (lock->get_scatter_wanted() ||
+           (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
+    dout(7) << "file_eval stable, bump to mixed " << *lock
+           << " on " << *lock->get_parent() << dendl;
+    scatter_mix(lock, need_issue);
+  }
+  
+  // * -> sync?
+  else if (lock->get_state() != LOCK_SYNC &&
+          !lock->is_wrlocked() &&   // drain wrlocks first!
+          !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
+          !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
+          !((lock->get_state() == LOCK_MIX) &&
+            in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
+          //((wanted & CEPH_CAP_RD) || 
+          //in->is_replicated() || 
+          //lock->get_num_client_lease() || 
+          //(!loner && lock->get_state() == LOCK_EXCL)) &&
+          ) {
+    dout(7) << "file_eval stable, bump to sync " << *lock 
+           << " on " << *lock->get_parent() << dendl;
+    simple_sync(lock, need_issue);
+  }
+}
+
+
+
+void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
+{
+  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
+
+  CInode *in = static_cast<CInode*>(lock->get_parent());
+  assert(in->is_auth());
+  assert(lock->is_stable());
+
+  if (lock->get_state() == LOCK_LOCK) {
+    in->start_scatter(lock);
+    if (in->is_replicated()) {
+      // data
+      bufferlist softdata;
+      lock->encode_locked_state(softdata);
+
+      // bcast to replicas
+      send_lock_message(lock, LOCK_AC_MIX, softdata);
+    }
+
+    // change lock
+    lock->set_state(LOCK_MIX);
+    lock->clear_scatter_wanted();
+    if (lock->get_cap_shift()) {
+      if (need_issue)
+       *need_issue = true;
+      else
+       issue_caps(in);
+    }
+  } else {
+    // gather?
+    switch (lock->get_state()) {
+    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
+    case LOCK_XSYN:
+      file_excl(lock, need_issue);
+      if (lock->get_state() != LOCK_EXCL)
+       return;
+      // fall-thru
+    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
+    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
+    default: ceph_abort();
+    }
+
+    int gather = 0;
+    if (lock->is_rdlocked())
+      gather++;
+    if (in->is_replicated()) {
+      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
+         lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
+       send_lock_message(lock, LOCK_AC_MIX);
+       lock->init_gather();
+       gather++;
+      }
+    }
+    if (lock->is_leased()) {
+      revoke_client_leases(lock);
+      gather++;
+    }
+    if (lock->get_cap_shift() &&
+       in->is_head() &&
+       in->issued_caps_need_gather(lock)) {
+      if (need_issue)
+       *need_issue = true;
+      else
+       issue_caps(in);
+      gather++;
+    }
+    bool need_recover = false;
+    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
+      mds->mdcache->queue_file_recover(in);
+      need_recover = true;
+      gather++;
+    }
+
+    if (gather) {
+      lock->get_parent()->auth_pin(lock);
+      if (need_recover)
+       mds->mdcache->do_file_recover();
+    } else {
+      in->start_scatter(lock);
+      lock->set_state(LOCK_MIX);
+      lock->clear_scatter_wanted();
+      if (in->is_replicated()) {
+       bufferlist softdata;
+       lock->encode_locked_state(softdata);
+       send_lock_message(lock, LOCK_AC_MIX, softdata);
+      }
+      if (lock->get_cap_shift()) {
+       if (need_issue)
+         *need_issue = true;
+       else
+         issue_caps(in);
+      }
+    }
+  }
+}
+
+
+void Locker::file_excl(ScatterLock *lock, bool *need_issue)
+{
+  CInode *in = static_cast<CInode*>(lock->get_parent());
+  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;  
+
+  assert(in->is_auth());
+  assert(lock->is_stable());
+
+  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
+        (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>
+  
+  switch (lock->get_state()) {
+  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
+  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
+  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
+  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
+  default: ceph_abort();
+  }
+  int gather = 0;
+  
+  if (lock->is_rdlocked())
+    gather++;
+  if (lock->is_wrlocked())
+    gather++;
+
+  if (in->is_replicated() &&
+      lock->get_state() != LOCK_LOCK_EXCL &&
+      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
+    send_lock_message(lock, LOCK_AC_LOCK);
+    lock->init_gather();
+    gather++;
+  }
+  if (lock->is_leased()) {
+    revoke_client_leases(lock);
+    gather++;
+  }
+  if (in->is_head() &&
+      in->issued_caps_need_gather(lock)) {
+    if (need_issue)
+      *need_issue = true;
+    else
+      issue_caps(in);
+    gather++;
+  }
+  bool need_recover = false;
+  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
+    mds->mdcache->queue_file_recover(in);
+    need_recover = true;
+    gather++;
+  }
+  
+  if (gather) {
+    lock->get_parent()->auth_pin(lock);
+    if (need_recover)
+      mds->mdcache->do_file_recover();
+  } else {
+    lock->set_state(LOCK_EXCL);
+    if (need_issue)
+      *need_issue = true;
+    else
+      issue_caps(in);
+  }
+}
+
+void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
+{
+  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
+  CInode *in = static_cast<CInode *>(lock->get_parent());
+  assert(in->is_auth());
+  assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
+
+  switch (lock->get_state()) {
+  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
+  default: ceph_abort();
+  }
+  
+  int gather = 0;
+  if (lock->is_wrlocked())
+    gather++;
+
+  if (in->is_head() &&
+      in->issued_caps_need_gather(lock)) {
+    if (need_issue)
+      *need_issue = true;
+    else
+      issue_caps(in);
+    gather++;
+  }
+  
+  if (gather) {
+    lock->get_parent()->auth_pin(lock);
+  } else {
+    lock->set_state(LOCK_XSYN);
+    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
+    if (need_issue)
+      *need_issue = true;
+    else
+      issue_caps(in);
+  }
+}
+
+void Locker::file_recover(ScatterLock *lock)
+{
+  CInode *in = static_cast<CInode *>(lock->get_parent());
+  dout(7) << "file_recover " << *lock << " on " << *in << dendl;
+
+  assert(in->is_auth());
+  //assert(lock->is_stable());
+  assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
+
+  int gather = 0;
+  
+  /*
+  if (in->is_replicated() &&
+      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
+    send_lock_message(lock, LOCK_AC_LOCK);
+    lock->init_gather();
+    gather++;
+  }
+  */
+  if (in->is_head() &&
+      in->issued_caps_need_gather(lock)) {
+    issue_caps(in);
+    gather++;
+  }
+
+  lock->set_state(LOCK_SCAN);
+  if (gather)
+    in->state_set(CInode::STATE_NEEDSRECOVER);
+  else
+    mds->mdcache->queue_file_recover(in);
+}
+
+
+// messenger
+/* This function DOES put the passed message before returning */
+void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
+{
+  CInode *in = static_cast<CInode*>(lock->get_parent());
+  int from = m->get_asker();
+
+  if (mds->is_rejoin()) {
+    if (in->is_rejoining()) {
+      dout(7) << "handle_file_lock still rejoining " << *in
+             << ", dropping " << *m << dendl;
+      m->put();
+      return;
+    }
+  }
+
+  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
+         << " on " << *lock
+         << " from mds." << from << " " 
+         << *in << dendl;
+
+  bool caps = lock->get_cap_shift();
+  
+  switch (m->get_action()) {
+    // -- replica --
+  case LOCK_AC_SYNC:
+    assert(lock->get_state() == LOCK_LOCK ||
+          lock->get_state() == LOCK_MIX ||
+          lock->get_state() == LOCK_MIX_SYNC2);
+    
+    if (lock->get_state() == LOCK_MIX) {
+      lock->set_state(LOCK_MIX_SYNC);
+      eval_gather(lock, true);
+      if (lock->is_unstable_and_locked())
+       mds->mdlog->flush();
+      break;
+    }
+
+    (static_cast<ScatterLock *>(lock))->finish_flush();
+    (static_cast<ScatterLock *>(lock))->clear_flushed();
+
+    // ok
+    lock->decode_locked_state(m->get_data());
+    lock->set_state(LOCK_SYNC);
+
+    lock->get_rdlock();
+    if (caps)
+      issue_caps(in);
+    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
+    lock->put_rdlock();
+    break;
+    
+  case LOCK_AC_LOCK:
+    switch (lock->get_state()) {
+    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
+    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
+    default: ceph_abort();
+    }
+
+    eval_gather(lock, true);
+    if (lock->is_unstable_and_locked())
+      mds->mdlog->flush();
+
+    break;
+
+  case LOCK_AC_LOCKFLUSHED:
+    lock->finish_flush();
+    lock->clear_flushed();
+    // wake up scatter_nudge waiters
+    if (lock->is_stable())
+      lock->finish_waiters(SimpleLock::WAIT_STABLE);
+    break;
+    
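+  // LOCK_AC_MIX: the auth is scattering the lock.  From SYNC we must first
+  // stop local readers (SYNC_MIX); otherwise go straight to MIX, adopt the
+  // auth's state, reissue caps and wake up writers.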
+  case LOCK_AC_MIX:
+    assert(lock->get_state() == LOCK_SYNC ||
+           lock->get_state() == LOCK_LOCK ||
+          lock->get_state() == LOCK_SYNC_MIX2);
+    
+    if (lock->get_state() == LOCK_SYNC) {
+      // MIXED
+      lock->set_state(LOCK_SYNC_MIX);
+      eval_gather(lock, true);
+      if (lock->is_unstable_and_locked())
+       mds->mdlog->flush();
+      break;
+    } 
+
+    // ok
+    lock->set_state(LOCK_MIX);
+    lock->decode_locked_state(m->get_data());
+
+    if (caps)
+      issue_caps(in);
+    
+    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
+    break;
+
+
+    // -- auth --
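+    // ack gathering: drop the sender from the gather set; once the set is
+    // empty, eval_gather() completes the pending state change.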
+  case LOCK_AC_LOCKACK:
+    assert(lock->get_state() == LOCK_SYNC_LOCK ||
+           lock->get_state() == LOCK_MIX_LOCK ||
+           lock->get_state() == LOCK_MIX_LOCK2 ||
+           lock->get_state() == LOCK_MIX_EXCL ||
+           lock->get_state() == LOCK_SYNC_EXCL ||
+           lock->get_state() == LOCK_SYNC_MIX ||
+          lock->get_state() == LOCK_MIX_TSYN);
+    assert(lock->is_gathering(from));
+    lock->remove_gather(from);
+    
+    if (lock->get_state() == LOCK_MIX_LOCK ||
+       lock->get_state() == LOCK_MIX_LOCK2 ||
+       lock->get_state() == LOCK_MIX_EXCL ||
+       lock->get_state() == LOCK_MIX_TSYN) {
+      lock->decode_locked_state(m->get_data());
+      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
+      // delay calling scatter_writebehind().
+      lock->clear_flushed();
+    }
+
+    if (lock->is_gathering()) {
+      dout(7) << "handle_file_lock " << *in << " from " << from
+             << ", still gathering " << lock->get_gather_set() << dendl;
+    } else {
+      dout(7) << "handle_file_lock " << *in << " from " << from
+             << ", last one" << dendl;
+      eval_gather(lock);
+    }
+    break;
+    
+  case LOCK_AC_SYNCACK:
+    assert(lock->get_state() == LOCK_MIX_SYNC);
+    assert(lock->is_gathering(from));
+    lock->remove_gather(from);
+    
+    lock->decode_locked_state(m->get_data());
+
+    if (lock->is_gathering()) {
+      dout(7) << "handle_file_lock " << *in << " from " << from
+             << ", still gathering " << lock->get_gather_set() << dendl;
+    } else {
+      dout(7) << "handle_file_lock " << *in << " from " << from
+             << ", last one" << dendl;
+      eval_gather(lock);
+    }
+    break;
+
+  case LOCK_AC_MIXACK:
+    assert(lock->get_state() == LOCK_SYNC_MIX);
+    assert(lock->is_gathering(from));
+    lock->remove_gather(from);
+    
+    if (lock->is_gathering()) {
+      dout(7) << "handle_file_lock " << *in << " from " << from
+             << ", still gathering " << lock->get_gather_set() << dendl;
+    } else {
+      dout(7) << "handle_file_lock " << *in << " from " << from
+             << ", last one" << dendl;
+      eval_gather(lock);
+    }
+    break;
+
+
+    // requests....
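+    // replica requests: ask us (the auth) to scatter, unscatter, or rdlock
+    // the lock; if we are not stable, just record the wish for later.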
+  case LOCK_AC_REQSCATTER:
+    if (lock->is_stable()) {
+      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
+       *  because the replica should be holding an auth_pin if they're
+       *  doing this (and thus, we are freezing, not frozen, and indefinite
+       *  starvation isn't an issue).
+       */
+      dout(7) << "handle_file_lock got scatter request on " << *lock
+             << " on " << *lock->get_parent() << dendl;
+      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
+       scatter_mix(lock);
+    } else {
+      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
+             << " on " << *lock->get_parent() << dendl;
+      lock->set_scatter_wanted();
+    }
+    break;
+
+  case LOCK_AC_REQUNSCATTER:
+    if (lock->is_stable()) {
+      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
+       *  because the replica should be holding an auth_pin if they're
+       *  doing this (and thus, we are freezing, not frozen, and indefinite
+       *  starvation isn't an issue).
+       */
+      dout(7) << "handle_file_lock got unscatter request on " << *lock
+             << " on " << *lock->get_parent() << dendl;
+      if (lock->get_state() == LOCK_MIX)  // i.e., the requnscatter didn't race with an actual unscatter
+       simple_lock(lock);  // FIXME tempsync?
+    } else {
+      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
+             << " on " << *lock->get_parent() << dendl;
+      lock->set_unscatter_wanted();
+    }
+    break;
+
+  case LOCK_AC_REQRDLOCK:
+    handle_reqrdlock(lock, m);
+    break;
+
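+  // LOCK_AC_NUDGE: a replica asks us (the auth) to nudge the scatterlock so
+  // its scattered state gets flushed.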
+  case LOCK_AC_NUDGE:
+    if (!lock->get_parent()->is_auth()) {
+      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
+             << " on " << *lock->get_parent() << dendl;
+    } else if (!lock->get_parent()->is_replicated()) {
+      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
+             << " on " << *lock->get_parent() << dendl;
+    } else {
+      dout(7) << "handle_file_lock trying nudge on " << *lock
+             << " on " << *lock->get_parent() << dendl;
+      scatter_nudge(lock, 0, true);
+      mds->mdlog->flush();
+    }
+    break;
+
+  default:
+    ceph_abort();
+  }  
+  
+  m->put();
+}
+
+
+
+
+
+