Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #include "MDSRank.h"
17 #include "MDCache.h"
18 #include "Locker.h"
19 #include "CInode.h"
20 #include "CDir.h"
21 #include "CDentry.h"
22 #include "Mutation.h"
23 #include "MDSContext.h"
24
25 #include "MDLog.h"
26 #include "MDSMap.h"
27
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
30
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
33
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
40
41 #include "messages/MMDSSlaveRequest.h"
42
43 #include <errno.h>
44
45 #include "common/config.h"
46
47
48 #define dout_subsys ceph_subsys_mds
49 #undef dout_prefix
50 #define dout_context g_ceph_context
51 #define dout_prefix _prefix(_dout, mds)
52 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
53   return *_dout << "mds." << mds->get_nodeid() << ".locker ";
54 }
55
56
57 class LockerContext : public MDSInternalContextBase {
58 protected:
59   Locker *locker;
60   MDSRank *get_mds() override
61   {
62     return locker->mds;
63   }
64
65 public:
66   explicit LockerContext(Locker *locker_) : locker(locker_) {
67     assert(locker != NULL);
68   }
69 };
70
71 class LockerLogContext : public MDSLogContextBase {
72 protected:
73   Locker *locker;
74   MDSRank *get_mds() override
75   {
76     return locker->mds;
77   }
78
79 public:
80   explicit LockerLogContext(Locker *locker_) : locker(locker_) {
81     assert(locker != NULL);
82   }
83 };
84
85 /* This function DOES put the passed message before returning */
86 void Locker::dispatch(Message *m)
87 {
88
89   switch (m->get_type()) {
90
91     // inter-mds locking
92   case MSG_MDS_LOCK:
93     handle_lock(static_cast<MLock*>(m));
94     break;
95     // inter-mds caps
96   case MSG_MDS_INODEFILECAPS:
97     handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
98     break;
99
100     // client sync
101   case CEPH_MSG_CLIENT_CAPS:
102     handle_client_caps(static_cast<MClientCaps*>(m));
103
104     break;
105   case CEPH_MSG_CLIENT_CAPRELEASE:
106     handle_client_cap_release(static_cast<MClientCapRelease*>(m));
107     break;
108   case CEPH_MSG_CLIENT_LEASE:
109     handle_client_lease(static_cast<MClientLease*>(m));
110     break;
111     
112   default:
113     derr << "locker unknown message " << m->get_type() << dendl;
114     assert(0 == "locker unknown message");
115   }
116 }
117
118 void Locker::tick()
119 {
120   scatter_tick();
121   caps_tick();
122 }
123
124 /*
125  * locks vs rejoin
126  *
127  * 
128  *
129  */
130
131 void Locker::send_lock_message(SimpleLock *lock, int msg)
132 {
133   for (const auto &it : lock->get_parent()->get_replicas()) {
134     if (mds->is_cluster_degraded() &&
135         mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
136       continue;
137     MLock *m = new MLock(lock, msg, mds->get_nodeid());
138     mds->send_message_mds(m, it.first);
139   }
140 }
141
142 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
143 {
144   for (const auto &it : lock->get_parent()->get_replicas()) {
145     if (mds->is_cluster_degraded() &&
146         mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
147       continue;
148     MLock *m = new MLock(lock, msg, mds->get_nodeid());
149     m->set_data(data);
150     mds->send_message_mds(m, it.first);
151   }
152 }
153
154
155
156
157 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
158 {
159   // rdlock ancestor snaps
160   CInode *t = in;
161   rdlocks.insert(&in->snaplock);
162   while (t->get_projected_parent_dn()) {
163     t = t->get_projected_parent_dn()->get_dir()->get_inode();
164     rdlocks.insert(&t->snaplock);
165   }
166 }
167
168 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
169                                           file_layout_t **layout)
170 {
171   //rdlock ancestor snaps
172   CInode *t = in;
173   rdlocks.insert(&in->snaplock);
174   rdlocks.insert(&in->policylock);
175   bool found_layout = false;
176   while (t) {
177     rdlocks.insert(&t->snaplock);
178     if (!found_layout) {
179       rdlocks.insert(&t->policylock);
180       if (t->get_projected_inode()->has_layout()) {
181         *layout = &t->get_projected_inode()->layout;
182         found_layout = true;
183       }
184     }
185     if (t->get_projected_parent_dn() &&
186         t->get_projected_parent_dn()->get_dir())
187       t = t->get_projected_parent_dn()->get_dir()->get_inode();
188     else t = NULL;
189   }
190 }
191
192 struct MarkEventOnDestruct {
193   MDRequestRef& mdr;
194   const char* message;
195   bool mark_event;
196   MarkEventOnDestruct(MDRequestRef& _mdr,
197                       const char *_message) : mdr(_mdr),
198                           message(_message),
199                           mark_event(true) {}
200   ~MarkEventOnDestruct() {
201     if (mark_event)
202       mdr->mark_event(message);
203   }
204 };
205
206 /* If this function returns false, the mdr has been placed
207  * on the appropriate wait list */
208 bool Locker::acquire_locks(MDRequestRef& mdr,
209                            set<SimpleLock*> &rdlocks,
210                            set<SimpleLock*> &wrlocks,
211                            set<SimpleLock*> &xlocks,
212                            map<SimpleLock*,mds_rank_t> *remote_wrlocks,
213                            CInode *auth_pin_freeze,
214                            bool auth_pin_nonblock)
215 {
216   if (mdr->done_locking &&
217       !mdr->is_slave()) {  // not on slaves!  master requests locks piecemeal.
218     dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;    
219     return true;  // at least we had better be!
220   }
221   dout(10) << "acquire_locks " << *mdr << dendl;
222
223   MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
224
225   client_t client = mdr->get_client();
226
227   set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
228   set<MDSCacheObject*> mustpin;            // items to authpin
229
230   // xlocks
231   for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
232     dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
233     sorted.insert(*p);
234     mustpin.insert((*p)->get_parent());
235
236     // augment xlock with a versionlock?
237     if ((*p)->get_type() == CEPH_LOCK_DN) {
238       CDentry *dn = (CDentry*)(*p)->get_parent();
239       if (!dn->is_auth())
240         continue;
241
242       if (xlocks.count(&dn->versionlock))
243         continue;  // we're xlocking the versionlock too; don't wrlock it!
244
245       if (mdr->is_master()) {
246         // master.  wrlock versionlock so we can pipeline dentry updates to journal.
247         wrlocks.insert(&dn->versionlock);
248       } else {
249         // slave.  exclusively lock the dentry version (i.e. block other journal updates).
250         // this makes rollback safe.
251         xlocks.insert(&dn->versionlock);
252         sorted.insert(&dn->versionlock);
253       }
254     }
255     if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
256       // inode version lock?
257       CInode *in = (CInode*)(*p)->get_parent();
258       if (!in->is_auth())
259         continue;
260       if (mdr->is_master()) {
261         // master.  wrlock versionlock so we can pipeline inode updates to journal.
262         wrlocks.insert(&in->versionlock);
263       } else {
264         // slave.  exclusively lock the inode version (i.e. block other journal updates).
265         // this makes rollback safe.
266         xlocks.insert(&in->versionlock);
267         sorted.insert(&in->versionlock);
268       }
269     }
270   }
271
272   // wrlocks
273   for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
274     MDSCacheObject *object = (*p)->get_parent();
275     dout(20) << " must wrlock " << **p << " " << *object << dendl;
276     sorted.insert(*p);
277     if (object->is_auth())
278       mustpin.insert(object);
279     else if (!object->is_auth() &&
280              !(*p)->can_wrlock(client) &&  // we might have to request a scatter
281              !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
282       dout(15) << " will also auth_pin " << *object
283                << " in case we need to request a scatter" << dendl;
284       mustpin.insert(object);
285     }
286   }
287
288   // remote_wrlocks
289   if (remote_wrlocks) {
290     for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
291       MDSCacheObject *object = p->first->get_parent();
292       dout(20) << " must remote_wrlock on mds." << p->second << " "
293                << *p->first << " " << *object << dendl;
294       sorted.insert(p->first);
295       mustpin.insert(object);
296     }
297   }
298
299   // rdlocks
300   for (set<SimpleLock*>::iterator p = rdlocks.begin();
301          p != rdlocks.end();
302        ++p) {
303     MDSCacheObject *object = (*p)->get_parent();
304     dout(20) << " must rdlock " << **p << " " << *object << dendl;
305     sorted.insert(*p);
306     if (object->is_auth())
307       mustpin.insert(object);
308     else if (!object->is_auth() &&
309              !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
310       dout(15) << " will also auth_pin " << *object
311                << " in case we need to request a rdlock" << dendl;
312       mustpin.insert(object);
313     }
314   }
315
316  
317   // AUTH PINS
318   map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)
319   
320   // can i auth pin them all now?
321   marker.message = "failed to authpin local pins";
322   for (set<MDSCacheObject*>::iterator p = mustpin.begin();
323        p != mustpin.end();
324        ++p) {
325     MDSCacheObject *object = *p;
326
327     dout(10) << " must authpin " << *object << dendl;
328
329     if (mdr->is_auth_pinned(object)) {
330       if (object != (MDSCacheObject*)auth_pin_freeze)
331         continue;
332       if (mdr->more()->is_remote_frozen_authpin) {
333         if (mdr->more()->rename_inode == auth_pin_freeze)
334           continue;
335         // unfreeze auth pin for the wrong inode
336         mustpin_remote[mdr->more()->rename_inode->authority().first].size();
337       }
338     }
339     
340     if (!object->is_auth()) {
341       if (!mdr->locks.empty())
342         drop_locks(mdr.get());
343       if (object->is_ambiguous_auth()) {
344         // wait
345         dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
346         object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
347         mdr->drop_local_auth_pins();
348         return false;
349       }
350       mustpin_remote[object->authority().first].insert(object);
351       continue;
352     }
353     if (!object->can_auth_pin()) {
354       // wait
355       drop_locks(mdr.get());
356       mdr->drop_local_auth_pins();
357       if (auth_pin_nonblock) {
358         dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
359         mdr->aborted = true;
360         return false;
361       }
362       dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
363       object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
364
365       if (!mdr->remote_auth_pins.empty())
366         notify_freeze_waiter(object);
367
368       return false;
369     }
370   }
371
372   // ok, grab local auth pins
373   for (set<MDSCacheObject*>::iterator p = mustpin.begin();
374        p != mustpin.end();
375        ++p) {
376     MDSCacheObject *object = *p;
377     if (mdr->is_auth_pinned(object)) {
378       dout(10) << " already auth_pinned " << *object << dendl;
379     } else if (object->is_auth()) {
380       dout(10) << " auth_pinning " << *object << dendl;
381       mdr->auth_pin(object);
382     }
383   }
384
385   // request remote auth_pins
386   if (!mustpin_remote.empty()) {
387     marker.message = "requesting remote authpins";
388     for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
389          p != mdr->remote_auth_pins.end();
390          ++p) {
391       if (mustpin.count(p->first)) {
392         assert(p->second == p->first->authority().first);
393         map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
394         if (q != mustpin_remote.end())
395           q->second.insert(p->first);
396       }
397     }
398     for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
399          p != mustpin_remote.end();
400          ++p) {
401       dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
402
403       // wait for active auth
404       if (mds->is_cluster_degraded() &&
405           !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
406         dout(10) << " mds." << p->first << " is not active" << dendl;
407         if (mdr->more()->waiting_on_slave.empty())
408           mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
409         return false;
410       }
411       
412       MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
413                                                    MMDSSlaveRequest::OP_AUTHPIN);
414       for (set<MDSCacheObject*>::iterator q = p->second.begin();
415            q != p->second.end();
416            ++q) {
417         dout(10) << " req remote auth_pin of " << **q << dendl;
418         MDSCacheObjectInfo info;
419         (*q)->set_object_info(info);
420         req->get_authpins().push_back(info);
421         if (*q == auth_pin_freeze)
422           (*q)->set_object_info(req->get_authpin_freeze());
423         mdr->pin(*q);
424       }
425       if (auth_pin_nonblock)
426         req->mark_nonblock();
427       mds->send_message_mds(req, p->first);
428
429       // put in waiting list
430       assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
431       mdr->more()->waiting_on_slave.insert(p->first);
432     }
433     return false;
434   }
435
436   // caps i'll need to issue
437   set<CInode*> issue_set;
438   bool result = false;
439
440   // acquire locks.
441   // make sure they match currently acquired locks.
442   set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
443   for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
444        p != sorted.end();
445        ++p) {
446     bool need_wrlock = !!wrlocks.count(*p);
447     bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
448
449     // already locked?
450     if (existing != mdr->locks.end() && *existing == *p) {
451       // right kind?
452       SimpleLock *have = *existing;
453       ++existing;
454       if (xlocks.count(have) && mdr->xlocks.count(have)) {
455         dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
456         continue;
457       }
458       if (mdr->remote_wrlocks.count(have)) {
459         if (!need_remote_wrlock ||
460             mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
461           dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
462                    << " " << *have << " " << *have->get_parent() << dendl;
463           remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
464         }
465       }
466       if (need_wrlock || need_remote_wrlock) {
467         if (need_wrlock == !!mdr->wrlocks.count(have) &&
468             need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
469           if (need_wrlock)
470             dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
471           if (need_remote_wrlock)
472             dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
473           continue;
474         }
475       }
476       if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
477         dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
478         continue;
479       }
480     }
481     
482     // hose any stray locks
483     if (existing != mdr->locks.end() && *existing == *p) {
484       assert(need_wrlock || need_remote_wrlock);
485       SimpleLock *lock = *existing;
486       if (mdr->wrlocks.count(lock)) {
487         if (!need_wrlock)
488           dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
489         else if (need_remote_wrlock) // acquire remote_wrlock first
490           dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
491         bool need_issue = false;
492         wrlock_finish(lock, mdr.get(), &need_issue);
493         if (need_issue)
494           issue_set.insert(static_cast<CInode*>(lock->get_parent()));
495       }
496       ++existing;
497     }
498     while (existing != mdr->locks.end()) {
499       SimpleLock *stray = *existing;
500       ++existing;
501       dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
502       bool need_issue = false;
503       if (mdr->xlocks.count(stray)) {
504         xlock_finish(stray, mdr.get(), &need_issue);
505       } else if (mdr->rdlocks.count(stray)) {
506         rdlock_finish(stray, mdr.get(), &need_issue);
507       } else {
508         // may have acquired both wrlock and remore wrlock
509         if (mdr->wrlocks.count(stray))
510           wrlock_finish(stray, mdr.get(), &need_issue);
511         if (mdr->remote_wrlocks.count(stray))
512           remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
513       }
514       if (need_issue)
515         issue_set.insert(static_cast<CInode*>(stray->get_parent()));
516     }
517
518     // lock
519     if (mdr->locking && *p != mdr->locking) {
520       cancel_locking(mdr.get(), &issue_set);
521     }
522     if (xlocks.count(*p)) {
523       marker.message = "failed to xlock, waiting";
524       if (!xlock_start(*p, mdr)) 
525         goto out;
526       dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
527     } else if (need_wrlock || need_remote_wrlock) {
528       if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
529         marker.message = "waiting for remote wrlocks";
530         remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
531         goto out;
532       }
533       if (need_wrlock && !mdr->wrlocks.count(*p)) {
534         marker.message = "failed to wrlock, waiting";
535         if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
536           marker.message = "failed to wrlock, dropping remote wrlock and waiting";
537           // can't take the wrlock because the scatter lock is gathering. need to
538           // release the remote wrlock, so that the gathering process can finish.
539           remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
540           remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
541           goto out;
542         }
543         // nowait if we have already gotten remote wrlock
544         if (!wrlock_start(*p, mdr, need_remote_wrlock))
545           goto out;
546         dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
547       }
548     } else {
549       assert(mdr->is_master());
550       if ((*p)->is_scatterlock()) {
551         ScatterLock *slock = static_cast<ScatterLock *>(*p);
552         if (slock->is_rejoin_mix()) {
553           // If there is a recovering mds who replcated an object when it failed
554           // and scatterlock in the object was in MIX state, It's possible that
555           // the recovering mds needs to take wrlock on the scatterlock when it
556           // replays unsafe requests. So this mds should delay taking rdlock on
557           // the scatterlock until the recovering mds finishes replaying unsafe.
558           // Otherwise unsafe requests may get replayed after current request.
559           //
560           // For example:
561           // The recovering mds is auth mds of a dirfrag, this mds is auth mds
562           // of correspinding inode. when 'rm -rf' the direcotry, this mds should
563           // delay the rmdir request until the recovering mds has replayed unlink
564           // requests.
565           if (mds->is_cluster_degraded()) {
566             if (!mdr->is_replay()) {
567               drop_locks(mdr.get());
568               mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
569               dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
570                        << ", waiting for cluster recovered" << dendl;
571               marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
572               return false;
573             }
574           } else {
575             slock->clear_rejoin_mix();
576           }
577         }
578       }
579
580       marker.message = "failed to rdlock, waiting";
581       if (!rdlock_start(*p, mdr)) 
582         goto out;
583       dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
584     }
585   }
586     
587   // any extra unneeded locks?
588   while (existing != mdr->locks.end()) {
589     SimpleLock *stray = *existing;
590     ++existing;
591     dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
592     bool need_issue = false;
593     if (mdr->xlocks.count(stray)) {
594       xlock_finish(stray, mdr.get(), &need_issue);
595     } else if (mdr->rdlocks.count(stray)) {
596       rdlock_finish(stray, mdr.get(), &need_issue);
597     } else {
598       // may have acquired both wrlock and remore wrlock
599       if (mdr->wrlocks.count(stray))
600         wrlock_finish(stray, mdr.get(), &need_issue);
601       if (mdr->remote_wrlocks.count(stray))
602         remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
603     }
604     if (need_issue)
605       issue_set.insert(static_cast<CInode*>(stray->get_parent()));
606   }
607
608   mdr->done_locking = true;
609   mdr->set_mds_stamp(ceph_clock_now());
610   result = true;
611   marker.message = "acquired locks";
612
613  out:
614   issue_caps_set(issue_set);
615   return result;
616 }
617
618 void Locker::notify_freeze_waiter(MDSCacheObject *o)
619 {
620   CDir *dir = NULL;
621   if (CInode *in = dynamic_cast<CInode*>(o)) {
622     if (!in->is_root())
623       dir = in->get_parent_dir();
624   } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
625     dir = dn->get_dir();
626   } else {
627     dir = dynamic_cast<CDir*>(o);
628     assert(dir);
629   }
630   if (dir) {
631     if (dir->is_freezing_dir())
632       mdcache->fragment_freeze_inc_num_waiters(dir);
633     if (dir->is_freezing_tree()) {
634       while (!dir->is_freezing_tree_root())
635         dir = dir->get_parent_dir();
636       mdcache->migrator->export_freeze_inc_num_waiters(dir);
637     }
638   }
639 }
640
641 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
642 {
643   for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
644        p != mut->xlocks.end();
645        ++p) {
646     MDSCacheObject *object = (*p)->get_parent();
647     assert(object->is_auth());
648     if (skip_dentry &&
649         ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
650       continue;
651     dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
652     (*p)->set_xlock_done();
653   }
654 }
655
656 void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
657 {
658   while (!mut->rdlocks.empty()) {
659     bool ni = false;
660     MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
661     rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
662     if (ni)
663       pneed_issue->insert(static_cast<CInode*>(p));
664   }
665 }
666
667 void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
668 {
669   set<mds_rank_t> slaves;
670
671   while (!mut->xlocks.empty()) {
672     SimpleLock *lock = *mut->xlocks.begin();
673     MDSCacheObject *p = lock->get_parent();
674     if (!p->is_auth()) {
675       assert(lock->get_sm()->can_remote_xlock);
676       slaves.insert(p->authority().first);
677       lock->put_xlock();
678       mut->locks.erase(lock);
679       mut->xlocks.erase(lock);
680       continue;
681     }
682     bool ni = false;
683     xlock_finish(lock, mut, &ni);
684     if (ni)
685       pneed_issue->insert(static_cast<CInode*>(p));
686   }
687
688   while (!mut->remote_wrlocks.empty()) {
689     map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
690     slaves.insert(p->second);
691     if (mut->wrlocks.count(p->first) == 0)
692       mut->locks.erase(p->first);
693     mut->remote_wrlocks.erase(p);
694   }
695
696   while (!mut->wrlocks.empty()) {
697     bool ni = false;
698     MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
699     wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
700     if (ni)
701       pneed_issue->insert(static_cast<CInode*>(p));
702   }
703
704   for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
705     if (!mds->is_cluster_degraded() ||
706         mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
707       dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
708       MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
709                                                         MMDSSlaveRequest::OP_DROPLOCKS);
710       mds->send_message_mds(slavereq, *p);
711     }
712   }
713 }
714
715 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
716 {
717   SimpleLock *lock = mut->locking;
718   assert(lock);
719   dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
720
721   if (lock->get_parent()->is_auth()) {
722     bool need_issue = false;
723     if (lock->get_state() == LOCK_PREXLOCK) {
724       _finish_xlock(lock, -1, &need_issue);
725     } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
726                lock->get_num_xlocks() == 0) {
727       lock->set_state(LOCK_XLOCKDONE);
728       eval_gather(lock, true, &need_issue);
729     }
730     if (need_issue)
731       pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
732   }
733   mut->finish_locking(lock);
734 }
735
736 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
737 {
738   // leftover locks
739   set<CInode*> my_need_issue;
740   if (!pneed_issue)
741     pneed_issue = &my_need_issue;
742
743   if (mut->locking)
744     cancel_locking(mut, pneed_issue);
745   _drop_non_rdlocks(mut, pneed_issue);
746   _drop_rdlocks(mut, pneed_issue);
747
748   if (pneed_issue == &my_need_issue)
749     issue_caps_set(*pneed_issue);
750   mut->done_locking = false;
751 }
752
753 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
754 {
755   set<CInode*> my_need_issue;
756   if (!pneed_issue)
757     pneed_issue = &my_need_issue;
758
759   _drop_non_rdlocks(mut, pneed_issue);
760
761   if (pneed_issue == &my_need_issue)
762     issue_caps_set(*pneed_issue);
763 }
764
765 void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
766 {
767   set<CInode*> my_need_issue;
768   if (!pneed_issue)
769     pneed_issue = &my_need_issue;
770
771   _drop_rdlocks(mut, pneed_issue);
772
773   if (pneed_issue == &my_need_issue)
774     issue_caps_set(*pneed_issue);
775 }
776
777
778 // generics
779
780 void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
781 {
782   dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
783   assert(!lock->is_stable());
784
785   int next = lock->get_next_state();
786
787   CInode *in = 0;
788   bool caps = lock->get_cap_shift();
789   if (lock->get_type() != CEPH_LOCK_DN)
790     in = static_cast<CInode *>(lock->get_parent());
791
792   bool need_issue = false;
793
794   int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
795   assert(!caps || in != NULL);
796   if (caps && in->is_head()) {
797     in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
798                         lock->get_cap_shift(), lock->get_cap_mask());
799     dout(10) << " next state is " << lock->get_state_name(next) 
800              << " issued/allows loner " << gcap_string(loner_issued)
801              << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
802              << " xlocker " << gcap_string(xlocker_issued)
803              << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
804              << " other " << gcap_string(other_issued)
805              << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
806              << dendl;
807
808     if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
809                   (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
810                   (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
811       need_issue = true;
812   }
813
814 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
815   bool auth = lock->get_parent()->is_auth();
816   if (!lock->is_gathering() &&
817       (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
818       (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
819       (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
820       (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
821       !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
822       (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
823                  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
824                  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
825       lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
826       lock->get_state() != LOCK_MIX_SYNC2
827       ) {
828     dout(7) << "eval_gather finished gather on " << *lock
829             << " on " << *lock->get_parent() << dendl;
830
831     if (lock->get_sm() == &sm_filelock) {
832       assert(in);
833       if (in->state_test(CInode::STATE_RECOVERING)) {
834         dout(7) << "eval_gather finished gather, but still recovering" << dendl;
835         return;
836       } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
837         dout(7) << "eval_gather finished gather, but need to recover" << dendl;
838         mds->mdcache->queue_file_recover(in);
839         mds->mdcache->do_file_recover();
840         return;
841       }
842     }
843
844     if (!lock->get_parent()->is_auth()) {
845       // replica: tell auth
846       mds_rank_t auth = lock->get_parent()->authority().first;
847
848       if (lock->get_parent()->is_rejoining() &&
849           mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
850         dout(7) << "eval_gather finished gather, but still rejoining "
851                 << *lock->get_parent() << dendl;
852         return;
853       }
854
855       if (!mds->is_cluster_degraded() ||
856           mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
857         switch (lock->get_state()) {
858         case LOCK_SYNC_LOCK:
859           mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
860                                 auth);
861           break;
862
863         case LOCK_MIX_SYNC:
864           {
865             MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
866             lock->encode_locked_state(reply->get_data());
867             mds->send_message_mds(reply, auth);
868             next = LOCK_MIX_SYNC2;
869             (static_cast<ScatterLock *>(lock))->start_flush();
870           }
871           break;
872
873         case LOCK_MIX_SYNC2:
874           (static_cast<ScatterLock *>(lock))->finish_flush();
875           (static_cast<ScatterLock *>(lock))->clear_flushed();
876
877         case LOCK_SYNC_MIX2:
878           // do nothing, we already acked
879           break;
880           
881         case LOCK_SYNC_MIX:
882           { 
883             MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
884             mds->send_message_mds(reply, auth);
885             next = LOCK_SYNC_MIX2;
886           }
887           break;
888
889         case LOCK_MIX_LOCK:
890           {
891             bufferlist data;
892             lock->encode_locked_state(data);
893             mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
894             (static_cast<ScatterLock *>(lock))->start_flush();
895             // we'll get an AC_LOCKFLUSHED to complete
896           }
897           break;
898
899         default:
900           ceph_abort();
901         }
902       }
903     } else {
904       // auth
905
906       // once the first (local) stage of mix->lock gather complete we can
907       // gather from replicas
908       if (lock->get_state() == LOCK_MIX_LOCK &&
909           lock->get_parent()->is_replicated()) {
910         dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
911         send_lock_message(lock, LOCK_AC_LOCK);
912         lock->init_gather();
913         lock->set_state(LOCK_MIX_LOCK2);
914         return;
915       }
916
917       if (lock->is_dirty() && !lock->is_flushed()) {
918         scatter_writebehind(static_cast<ScatterLock *>(lock));
919         mds->mdlog->flush();
920         return;
921       }
922       lock->clear_flushed();
923       
924       switch (lock->get_state()) {
925         // to mixed
926       case LOCK_TSYN_MIX:
927       case LOCK_SYNC_MIX:
928       case LOCK_EXCL_MIX:
929         in->start_scatter(static_cast<ScatterLock *>(lock));
930         if (lock->get_parent()->is_replicated()) {
931           bufferlist softdata;
932           lock->encode_locked_state(softdata);
933           send_lock_message(lock, LOCK_AC_MIX, softdata);
934         }
935         (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
936         break;
937
938       case LOCK_XLOCK:
939       case LOCK_XLOCKDONE:
940         if (next != LOCK_SYNC)
941           break;
942         // fall-thru
943
944         // to sync
945       case LOCK_EXCL_SYNC:
946       case LOCK_LOCK_SYNC:
947       case LOCK_MIX_SYNC:
948       case LOCK_XSYN_SYNC:
949         if (lock->get_parent()->is_replicated()) {
950           bufferlist softdata;
951           lock->encode_locked_state(softdata);
952           send_lock_message(lock, LOCK_AC_SYNC, softdata);
953         }
954         break;
955       }
956
957     }
958
959     lock->set_state(next);
960     
961     if (lock->get_parent()->is_auth() &&
962         lock->is_stable())
963       lock->get_parent()->auth_unpin(lock);
964
965     // drop loner before doing waiters
966     if (caps &&
967         in->is_head() &&
968         in->is_auth() &&
969         in->get_wanted_loner() != in->get_loner()) {
970       dout(10) << "  trying to drop loner" << dendl;
971       if (in->try_drop_loner()) {
972         dout(10) << "  dropped loner" << dendl;
973         need_issue = true;
974       }
975     }
976
977     if (pfinishers)
978       lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
979                          *pfinishers);
980     else
981       lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
982     
983     if (caps && in->is_head())
984       need_issue = true;
985
986     if (lock->get_parent()->is_auth() &&
987         lock->is_stable())
988       try_eval(lock, &need_issue);
989   }
990
991   if (need_issue) {
992     if (pneed_issue)
993       *pneed_issue = true;
994     else if (in->is_head())
995       issue_caps(in);
996   }
997
998 }
999
1000 bool Locker::eval(CInode *in, int mask, bool caps_imported)
1001 {
1002   bool need_issue = caps_imported;
1003   list<MDSInternalContextBase*> finishers;
1004   
1005   dout(10) << "eval " << mask << " " << *in << dendl;
1006
1007   // choose loner?
1008   if (in->is_auth() && in->is_head()) {
1009     if (in->choose_ideal_loner() >= 0) {
1010       if (in->try_set_loner()) {
1011         dout(10) << "eval set loner to client." << in->get_loner() << dendl;
1012         need_issue = true;
1013         mask = -1;
1014       } else
1015         dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
1016     } else
1017       dout(10) << "eval doesn't want loner" << dendl;
1018   }
1019
1020  retry:
1021   if (mask & CEPH_LOCK_IFILE)
1022     eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
1023   if (mask & CEPH_LOCK_IAUTH)
1024     eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
1025   if (mask & CEPH_LOCK_ILINK)
1026     eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
1027   if (mask & CEPH_LOCK_IXATTR)
1028     eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
1029   if (mask & CEPH_LOCK_INEST)
1030     eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
1031   if (mask & CEPH_LOCK_IFLOCK)
1032     eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
1033   if (mask & CEPH_LOCK_IPOLICY)
1034     eval_any(&in->policylock, &need_issue, &finishers, caps_imported);
1035
1036   // drop loner?
1037   if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
1038     dout(10) << "  trying to drop loner" << dendl;
1039     if (in->try_drop_loner()) {
1040       dout(10) << "  dropped loner" << dendl;
1041       need_issue = true;
1042
1043       if (in->get_wanted_loner() >= 0) {
1044         if (in->try_set_loner()) {
1045           dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
1046           mask = -1;
1047           goto retry;
1048         } else {
1049           dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
1050         }
1051       }
1052     }
1053   }
1054
1055   finish_contexts(g_ceph_context, finishers);
1056
1057   if (need_issue && in->is_head())
1058     issue_caps(in);
1059
1060   dout(10) << "eval done" << dendl;
1061   return need_issue;
1062 }
1063
1064 class C_Locker_Eval : public LockerContext {
1065   MDSCacheObject *p;
1066   int mask;
1067 public:
1068   C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1069     // We are used as an MDSCacheObject waiter, so should
1070     // only be invoked by someone already holding the big lock.
1071     assert(locker->mds->mds_lock.is_locked_by_me());
1072     p->get(MDSCacheObject::PIN_PTRWAITER);    
1073   }
1074   void finish(int r) override {
1075     locker->try_eval(p, mask);
1076     p->put(MDSCacheObject::PIN_PTRWAITER);
1077   }
1078 };
1079
1080 void Locker::try_eval(MDSCacheObject *p, int mask)
1081 {
1082   // unstable and ambiguous auth?
1083   if (p->is_ambiguous_auth()) {
1084     dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1085     p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1086     return;
1087   }
1088
1089   if (p->is_auth() && p->is_frozen()) {
1090     dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1091     p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1092     return;
1093   }
1094
1095   if (mask & CEPH_LOCK_DN) {
1096     assert(mask == CEPH_LOCK_DN);
1097     bool need_issue = false;  // ignore this, no caps on dentries
1098     CDentry *dn = static_cast<CDentry *>(p);
1099     eval_any(&dn->lock, &need_issue);
1100   } else {
1101     CInode *in = static_cast<CInode *>(p);
1102     eval(in, mask);
1103   }
1104 }
1105
1106 void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
1107 {
1108   MDSCacheObject *p = lock->get_parent();
1109
1110   // unstable and ambiguous auth?
1111   if (p->is_ambiguous_auth()) {
1112     dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
1113     p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
1114     return;
1115   }
1116   
1117   if (!p->is_auth()) {
1118     dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
1119     return;
1120   }
1121
1122   if (p->is_frozen()) {
1123     dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
1124     p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1125     return;
1126   }
1127
1128   /*
1129    * We could have a situation like:
1130    *
1131    * - mds A authpins item on mds B
1132    * - mds B starts to freeze tree containing item
1133    * - mds A tries wrlock_start on A, sends REQSCATTER to B
1134    * - mds B lock is unstable, sets scatter_wanted
1135    * - mds B lock stabilizes, calls try_eval.
1136    *
1137    * We can defer while freezing without causing a deadlock.  Honor
1138    * scatter_wanted flag here.  This will never get deferred by the
1139    * checks above due to the auth_pin held by the master.
1140    */
1141   if (lock->is_scatterlock()) {
1142     ScatterLock *slock = static_cast<ScatterLock *>(lock);
1143     if (slock->get_scatter_wanted() &&
1144         slock->get_state() != LOCK_MIX) {
1145       scatter_mix(slock, pneed_issue);
1146       if (!lock->is_stable())
1147         return;
1148     } else if (slock->get_unscatter_wanted() &&
1149         slock->get_state() != LOCK_LOCK) {
1150       simple_lock(slock, pneed_issue);
1151       if (!lock->is_stable()) {
1152         return;
1153       }
1154     }
1155   }
1156
1157   if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
1158     dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
1159     p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1160     return;
1161   }
1162
1163   eval(lock, pneed_issue);
1164 }
1165
1166 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1167 {
1168   bool need_issue = false;
1169   list<MDSInternalContextBase*> finishers;
1170
1171   // kick locks now
1172   if (!in->filelock.is_stable())
1173     eval_gather(&in->filelock, false, &need_issue, &finishers);
1174   if (!in->authlock.is_stable())
1175     eval_gather(&in->authlock, false, &need_issue, &finishers);
1176   if (!in->linklock.is_stable())
1177     eval_gather(&in->linklock, false, &need_issue, &finishers);
1178   if (!in->xattrlock.is_stable())
1179     eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1180
1181   if (need_issue && in->is_head()) {
1182     if (issue_set)
1183       issue_set->insert(in);
1184     else
1185       issue_caps(in);
1186   }
1187
1188   finish_contexts(g_ceph_context, finishers);
1189 }
1190
1191 void Locker::eval_scatter_gathers(CInode *in)
1192 {
1193   bool need_issue = false;
1194   list<MDSInternalContextBase*> finishers;
1195
1196   dout(10) << "eval_scatter_gathers " << *in << dendl;
1197
1198   // kick locks now
1199   if (!in->filelock.is_stable())
1200     eval_gather(&in->filelock, false, &need_issue, &finishers);
1201   if (!in->nestlock.is_stable())
1202     eval_gather(&in->nestlock, false, &need_issue, &finishers);
1203   if (!in->dirfragtreelock.is_stable())
1204     eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1205   
1206   if (need_issue && in->is_head())
1207     issue_caps(in);
1208   
1209   finish_contexts(g_ceph_context, finishers);
1210 }
1211
1212 void Locker::eval(SimpleLock *lock, bool *need_issue)
1213 {
1214   switch (lock->get_type()) {
1215   case CEPH_LOCK_IFILE:
1216     return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1217   case CEPH_LOCK_IDFT:
1218   case CEPH_LOCK_INEST:
1219     return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1220   default:
1221     return simple_eval(lock, need_issue);
1222   }
1223 }
1224
1225
1226 // ------------------
1227 // rdlock
1228
1229 bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
1230 {
1231   // kick the lock
1232   if (lock->is_stable()) {
1233     if (lock->get_parent()->is_auth()) {
1234       if (lock->get_sm() == &sm_scatterlock) {
1235         // not until tempsync is fully implemented
1236         //if (lock->get_parent()->is_replicated())
1237         //scatter_tempsync((ScatterLock*)lock);
1238         //else
1239         simple_sync(lock);
1240       } else if (lock->get_sm() == &sm_filelock) {
1241         CInode *in = static_cast<CInode*>(lock->get_parent());
1242         if (lock->get_state() == LOCK_EXCL &&
1243             in->get_target_loner() >= 0 &&
1244             !in->is_dir() && !as_anon)   // as_anon => caller wants SYNC, not XSYN
1245           file_xsyn(lock);
1246         else
1247           simple_sync(lock);
1248       } else
1249         simple_sync(lock);
1250       return true;
1251     } else {
1252       // request rdlock state change from auth
1253       mds_rank_t auth = lock->get_parent()->authority().first;
1254       if (!mds->is_cluster_degraded() ||
1255           mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1256         dout(10) << "requesting rdlock from auth on "
1257                  << *lock << " on " << *lock->get_parent() << dendl;
1258         mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
1259       }
1260       return false;
1261     }
1262   }
1263   if (lock->get_type() == CEPH_LOCK_IFILE) {
1264     CInode *in = static_cast<CInode *>(lock->get_parent());
1265     if (in->state_test(CInode::STATE_RECOVERING)) {
1266       mds->mdcache->recovery_queue.prioritize(in);
1267     }
1268   }
1269
1270   return false;
1271 }
1272
1273 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1274 {
1275   dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;  
1276
1277   // can read?  grab ref.
1278   if (lock->can_rdlock(client)) 
1279     return true;
1280   
1281   _rdlock_kick(lock, false);
1282
1283   if (lock->can_rdlock(client)) 
1284     return true;
1285
1286   // wait!
1287   if (con) {
1288     dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1289     lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1290   }
1291   return false;
1292 }
1293
1294 bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
1295 {
1296   dout(7) << "rdlock_start  on " << *lock << " on " << *lock->get_parent() << dendl;  
1297
1298   // client may be allowed to rdlock the same item it has xlocked.
1299   //  UNLESS someone passes in as_anon, or we're reading snapped version here.
1300   if (mut->snapid != CEPH_NOSNAP)
1301     as_anon = true;
1302   client_t client = as_anon ? -1 : mut->get_client();
1303
1304   CInode *in = 0;
1305   if (lock->get_type() != CEPH_LOCK_DN)
1306     in = static_cast<CInode *>(lock->get_parent());
1307
1308   /*
1309   if (!lock->get_parent()->is_auth() &&
1310       lock->fw_rdlock_to_auth()) {
1311     mdcache->request_forward(mut, lock->get_parent()->authority().first);
1312     return false;
1313   }
1314   */
1315
1316   while (1) {
1317     // can read?  grab ref.
1318     if (lock->can_rdlock(client)) {
1319       lock->get_rdlock();
1320       mut->rdlocks.insert(lock);
1321       mut->locks.insert(lock);
1322       return true;
1323     }
1324
1325     // hmm, wait a second.
1326     if (in && !in->is_head() && in->is_auth() &&
1327         lock->get_state() == LOCK_SNAP_SYNC) {
1328       // okay, we actually need to kick the head's lock to get ourselves synced up.
1329       CInode *head = mdcache->get_inode(in->ino());
1330       assert(head);
1331       SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
1332       if (hlock->get_state() == LOCK_SYNC)
1333         hlock = head->get_lock(lock->get_type());
1334
1335       if (hlock->get_state() != LOCK_SYNC) {
1336         dout(10) << "rdlock_start trying head inode " << *head << dendl;
1337         if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
1338           return false;
1339         // oh, check our lock again then
1340       }
1341     }
1342
1343     if (!_rdlock_kick(lock, as_anon))
1344       break;
1345   }
1346
1347   // wait!
1348   int wait_on;
1349   if (lock->get_parent()->is_auth() && lock->is_stable())
1350     wait_on = SimpleLock::WAIT_RD;
1351   else
1352     wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1353   dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1354   lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
1355   nudge_log(lock);
1356   return false;
1357 }
1358
1359 void Locker::nudge_log(SimpleLock *lock)
1360 {
1361   dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1362   if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked())    // as with xlockdone, or cap flush
1363     mds->mdlog->flush();
1364 }
1365
1366 void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1367 {
1368   // drop ref
1369   lock->put_rdlock();
1370   if (mut) {
1371     mut->rdlocks.erase(lock);
1372     mut->locks.erase(lock);
1373   }
1374
1375   dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1376   
1377   // last one?
1378   if (!lock->is_rdlocked()) {
1379     if (!lock->is_stable())
1380       eval_gather(lock, false, pneed_issue);
1381     else if (lock->get_parent()->is_auth())
1382       try_eval(lock, pneed_issue);
1383   }
1384 }
1385
1386
1387 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1388 {
1389   dout(10) << "can_rdlock_set " << locks << dendl;
1390   for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1391     if (!(*p)->can_rdlock(-1)) {
1392       dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1393       return false;
1394     }
1395   return true;
1396 }
1397
1398 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1399 {
1400   dout(10) << "rdlock_try_set " << locks << dendl;
1401   for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1402     if (!rdlock_try(*p, -1, NULL)) {
1403       dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1404       return false;
1405     }
1406   return true;
1407 }
1408
1409 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1410 {
1411   dout(10) << "rdlock_take_set " << locks << dendl;
1412   for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1413     (*p)->get_rdlock();
1414     mut->rdlocks.insert(*p);
1415     mut->locks.insert(*p);
1416   }
1417 }
1418
1419 // ------------------
1420 // wrlock
1421
1422 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1423 {
1424   if (lock->get_type() == CEPH_LOCK_IVERSION ||
1425       lock->get_type() == CEPH_LOCK_DVERSION)
1426     return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1427
1428   dout(7) << "wrlock_force  on " << *lock
1429           << " on " << *lock->get_parent() << dendl;  
1430   lock->get_wrlock(true);
1431   mut->wrlocks.insert(lock);
1432   mut->locks.insert(lock);
1433 }
1434
1435 bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
1436 {
1437   if (lock->get_type() == CEPH_LOCK_IVERSION ||
1438       lock->get_type() == CEPH_LOCK_DVERSION)
1439     return local_wrlock_start(static_cast<LocalLock*>(lock), mut);
1440
1441   dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
1442
1443   CInode *in = static_cast<CInode *>(lock->get_parent());
1444   client_t client = mut->get_client();
1445   bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
1446                       (in->has_subtree_or_exporting_dirfrag() ||
1447                        static_cast<ScatterLock*>(lock)->get_scatter_wanted());
1448
1449   while (1) {
1450     // wrlock?
1451     if (lock->can_wrlock(client) &&
1452         (!want_scatter || lock->get_state() == LOCK_MIX)) {
1453       lock->get_wrlock();
1454       mut->wrlocks.insert(lock);
1455       mut->locks.insert(lock);
1456       return true;
1457     }
1458
1459     if (lock->get_type() == CEPH_LOCK_IFILE &&
1460         in->state_test(CInode::STATE_RECOVERING)) {
1461       mds->mdcache->recovery_queue.prioritize(in);
1462     }
1463
1464     if (!lock->is_stable())
1465       break;
1466
1467     if (in->is_auth()) {
1468       // don't do nested lock state change if we have dirty scatterdata and
1469       // may scatter_writebehind or start_scatter, because nowait==true implies
1470       // that the caller already has a log entry open!
1471       if (nowait && lock->is_dirty())
1472         return false;
1473
1474       if (want_scatter)
1475         scatter_mix(static_cast<ScatterLock*>(lock));
1476       else
1477         simple_lock(lock);
1478
1479       if (nowait && !lock->can_wrlock(client))
1480         return false;
1481       
1482     } else {
1483       // replica.
1484       // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1485       mds_rank_t auth = lock->get_parent()->authority().first;
1486       if (!mds->is_cluster_degraded() ||
1487           mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1488         dout(10) << "requesting scatter from auth on "
1489                  << *lock << " on " << *lock->get_parent() << dendl;
1490         mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
1491       }
1492       break;
1493     }
1494   }
1495
1496   if (!nowait) {
1497     dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1498     lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1499     nudge_log(lock);
1500   }
1501     
1502   return false;
1503 }
1504
1505 void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1506 {
1507   if (lock->get_type() == CEPH_LOCK_IVERSION ||
1508       lock->get_type() == CEPH_LOCK_DVERSION)
1509     return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1510
1511   dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1512   lock->put_wrlock();
1513   if (mut) {
1514     mut->wrlocks.erase(lock);
1515     if (mut->remote_wrlocks.count(lock) == 0)
1516       mut->locks.erase(lock);
1517   }
1518
1519   if (!lock->is_wrlocked()) {
1520     if (!lock->is_stable())
1521       eval_gather(lock, false, pneed_issue);
1522     else if (lock->get_parent()->is_auth())
1523       try_eval(lock, pneed_issue);
1524   }
1525 }
1526
1527
1528 // remote wrlock
1529
1530 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1531 {
1532   dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1533
1534   // wait for active target
1535   if (mds->is_cluster_degraded() &&
1536       !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1537     dout(7) << " mds." << target << " is not active" << dendl;
1538     if (mut->more()->waiting_on_slave.empty())
1539       mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1540     return;
1541   }
1542
1543   // send lock request
1544   mut->start_locking(lock, target);
1545   mut->more()->slaves.insert(target);
1546   MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1547                                              MMDSSlaveRequest::OP_WRLOCK);
1548   r->set_lock_type(lock->get_type());
1549   lock->get_parent()->set_object_info(r->get_object_info());
1550   mds->send_message_mds(r, target);
1551
1552   assert(mut->more()->waiting_on_slave.count(target) == 0);
1553   mut->more()->waiting_on_slave.insert(target);
1554 }
1555
1556 void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1557                                   MutationImpl *mut)
1558 {
1559   // drop ref
1560   mut->remote_wrlocks.erase(lock);
1561   if (mut->wrlocks.count(lock) == 0)
1562     mut->locks.erase(lock);
1563   
1564   dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1565           << " " << *lock->get_parent()  << dendl;
1566   if (!mds->is_cluster_degraded() ||
1567       mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1568     MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1569                                                       MMDSSlaveRequest::OP_UNWRLOCK);
1570     slavereq->set_lock_type(lock->get_type());
1571     lock->get_parent()->set_object_info(slavereq->get_object_info());
1572     mds->send_message_mds(slavereq, target);
1573   }
1574 }
1575
1576
1577 // ------------------
1578 // xlock
1579
1580 bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
1581 {
1582   if (lock->get_type() == CEPH_LOCK_IVERSION ||
1583       lock->get_type() == CEPH_LOCK_DVERSION)
1584     return local_xlock_start(static_cast<LocalLock*>(lock), mut);
1585
1586   dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1587   client_t client = mut->get_client();
1588
1589   // auth?
1590   if (lock->get_parent()->is_auth()) {
1591     // auth
1592     while (1) {
1593       if (lock->can_xlock(client)) {
1594         lock->set_state(LOCK_XLOCK);
1595         lock->get_xlock(mut, client);
1596         mut->xlocks.insert(lock);
1597         mut->locks.insert(lock);
1598         mut->finish_locking(lock);
1599         return true;
1600       }
1601       
1602       if (lock->get_type() == CEPH_LOCK_IFILE) {
1603         CInode *in = static_cast<CInode*>(lock->get_parent());
1604         if (in->state_test(CInode::STATE_RECOVERING)) {
1605           mds->mdcache->recovery_queue.prioritize(in);
1606         }
1607       }
1608
1609       if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
1610                                  lock->get_xlock_by_client() != client ||
1611                                  lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
1612         break;
1613
1614       if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
1615         mut->start_locking(lock);
1616         simple_xlock(lock);
1617       } else {
1618         simple_lock(lock);
1619       }
1620     }
1621     
1622     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1623     nudge_log(lock);
1624     return false;
1625   } else {
1626     // replica
1627     assert(lock->get_sm()->can_remote_xlock);
1628     assert(!mut->slave_request);
1629     
1630     // wait for single auth
1631     if (lock->get_parent()->is_ambiguous_auth()) {
1632       lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
1633                                      new C_MDS_RetryRequest(mdcache, mut));
1634       return false;
1635     }
1636     
1637     // wait for active auth
1638     mds_rank_t auth = lock->get_parent()->authority().first;
1639     if (mds->is_cluster_degraded() &&
1640         !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1641       dout(7) << " mds." << auth << " is not active" << dendl;
1642       if (mut->more()->waiting_on_slave.empty())
1643         mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
1644       return false;
1645     }
1646
1647     // send lock request
1648     mut->more()->slaves.insert(auth);
1649     mut->start_locking(lock, auth);
1650     MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1651                                                MMDSSlaveRequest::OP_XLOCK);
1652     r->set_lock_type(lock->get_type());
1653     lock->get_parent()->set_object_info(r->get_object_info());
1654     mds->send_message_mds(r, auth);
1655
1656     assert(mut->more()->waiting_on_slave.count(auth) == 0);
1657     mut->more()->waiting_on_slave.insert(auth);
1658
1659     return false;
1660   }
1661 }
1662
1663 void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
1664 {
1665   assert(!lock->is_stable());
1666   if (lock->get_num_rdlocks() == 0 &&
1667       lock->get_num_wrlocks() == 0 &&
1668       lock->get_num_client_lease() == 0 &&
1669       lock->get_state() != LOCK_XLOCKSNAP &&
1670       lock->get_type() != CEPH_LOCK_DN) {
1671     CInode *in = static_cast<CInode*>(lock->get_parent());
1672     client_t loner = in->get_target_loner();
1673     if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
1674       lock->set_state(LOCK_EXCL);
1675       lock->get_parent()->auth_unpin(lock);
1676       lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
1677       if (lock->get_cap_shift())
1678         *pneed_issue = true;
1679       if (lock->get_parent()->is_auth() &&
1680           lock->is_stable())
1681         try_eval(lock, pneed_issue);
1682       return;
1683     }
1684   }
1685   // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1686   eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
1687 }
1688
1689 void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1690 {
1691   if (lock->get_type() == CEPH_LOCK_IVERSION ||
1692       lock->get_type() == CEPH_LOCK_DVERSION)
1693     return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
1694
1695   dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
1696
1697   client_t xlocker = lock->get_xlock_by_client();
1698
1699   // drop ref
1700   lock->put_xlock();
1701   assert(mut);
1702   mut->xlocks.erase(lock);
1703   mut->locks.erase(lock);
1704   
1705   bool do_issue = false;
1706
1707   // remote xlock?
1708   if (!lock->get_parent()->is_auth()) {
1709     assert(lock->get_sm()->can_remote_xlock);
1710
1711     // tell auth
1712     dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
1713     mds_rank_t auth = lock->get_parent()->authority().first;
1714     if (!mds->is_cluster_degraded() ||
1715         mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
1716       MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1717                                                         MMDSSlaveRequest::OP_UNXLOCK);
1718       slavereq->set_lock_type(lock->get_type());
1719       lock->get_parent()->set_object_info(slavereq->get_object_info());
1720       mds->send_message_mds(slavereq, auth);
1721     }
1722     // others waiting?
1723     lock->finish_waiters(SimpleLock::WAIT_STABLE |
1724                          SimpleLock::WAIT_WR | 
1725                          SimpleLock::WAIT_RD, 0); 
1726   } else {
1727     if (lock->get_num_xlocks() == 0) {
1728       if (lock->get_state() == LOCK_LOCK_XLOCK)
1729         lock->set_state(LOCK_XLOCKDONE);
1730       _finish_xlock(lock, xlocker, &do_issue);
1731     }
1732   }
1733   
1734   if (do_issue) {
1735     CInode *in = static_cast<CInode*>(lock->get_parent());
1736     if (in->is_head()) {
1737       if (pneed_issue)
1738         *pneed_issue = true;
1739       else
1740         issue_caps(in);
1741     }
1742   }
1743 }
1744
1745 void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1746 {
1747   dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1748
1749   lock->put_xlock();
1750   mut->xlocks.erase(lock);
1751   mut->locks.erase(lock);
1752
1753   MDSCacheObject *p = lock->get_parent();
1754   assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
1755
1756   if (!lock->is_stable())
1757     lock->get_parent()->auth_unpin(lock);
1758
1759   lock->set_state(LOCK_LOCK);
1760 }
1761
1762 void Locker::xlock_import(SimpleLock *lock)
1763 {
1764   dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1765   lock->get_parent()->auth_pin(lock);
1766 }
1767
1768
1769
1770 // file i/o -----------------------------------------
1771
1772 version_t Locker::issue_file_data_version(CInode *in)
1773 {
1774   dout(7) << "issue_file_data_version on " << *in << dendl;
1775   return in->inode.file_data_version;
1776 }
1777
1778 class C_Locker_FileUpdate_finish : public LockerLogContext {
1779   CInode *in;
1780   MutationRef mut;
1781   bool share_max;
1782   bool need_issue;
1783   client_t client;
1784   MClientCaps *ack;
1785 public:
1786   C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
1787                                 bool sm=false, bool ni=false, client_t c=-1,
1788                                 MClientCaps *ac = 0)
1789     : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
1790       client(c), ack(ac) {
1791     in->get(CInode::PIN_PTRWAITER);
1792   }
1793   void finish(int r) override {
1794     locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
1795     in->put(CInode::PIN_PTRWAITER);
1796   }
1797 };
1798
1799 void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
1800                                 client_t client, MClientCaps *ack)
1801 {
1802   dout(10) << "file_update_finish on " << *in << dendl;
1803   in->pop_and_dirty_projected_inode(mut->ls);
1804
1805   mut->apply();
1806   
1807   if (ack) {
1808     Session *session = mds->get_session(client);
1809     if (session) {
1810       // "oldest flush tid" > 0 means client uses unique TID for each flush
1811       if (ack->get_oldest_flush_tid() > 0)
1812         session->add_completed_flush(ack->get_client_tid());
1813       mds->send_message_client_counted(ack, session);
1814     } else {
1815       dout(10) << " no session for client." << client << " " << *ack << dendl;
1816       ack->put();
1817     }
1818   }
1819
1820   set<CInode*> need_issue;
1821   drop_locks(mut.get(), &need_issue);
1822
1823   if (!in->is_head() && !in->client_snap_caps.empty()) {
1824     dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
1825     // check for snap writeback completion
1826     bool gather = false;
1827     compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
1828     while (p != in->client_snap_caps.end()) {
1829       SimpleLock *lock = in->get_lock(p->first);
1830       assert(lock);
1831       dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
1832                << " lock " << *lock << " on " << *in << dendl;
1833       lock->put_wrlock();
1834
1835       p->second.erase(client);
1836       if (p->second.empty()) {
1837         gather = true;
1838         in->client_snap_caps.erase(p++);
1839       } else
1840         ++p;
1841     }
1842     if (gather) {
1843       if (in->client_snap_caps.empty())
1844         in->item_open_file.remove_myself();
1845       eval_cap_gather(in, &need_issue);
1846     }
1847   } else {
1848     if (issue_client_cap && need_issue.count(in) == 0) {
1849       Capability *cap = in->get_client_cap(client);
1850       if (cap && (cap->wanted() & ~cap->pending()))
1851         issue_caps(in, cap);
1852     }
1853   
1854     if (share_max && in->is_auth() &&
1855         (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
1856       share_inode_max_size(in);
1857   }
1858   issue_caps_set(need_issue);
1859
1860   // auth unpin after issuing caps
1861   mut->cleanup();
1862 }
1863
1864 Capability* Locker::issue_new_caps(CInode *in,
1865                                    int mode,
1866                                    Session *session,
1867                                    SnapRealm *realm,
1868                                    bool is_replay)
1869 {
1870   dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
1871   bool is_new;
1872
1873   // if replay, try to reconnect cap, and otherwise do nothing.
1874   if (is_replay) {
1875     mds->mdcache->try_reconnect_cap(in, session);
1876     return 0;
1877   }
1878
1879   // my needs
1880   assert(session->info.inst.name.is_client());
1881   client_t my_client = session->info.inst.name.num();
1882   int my_want = ceph_caps_for_mode(mode);
1883
1884   // register a capability
1885   Capability *cap = in->get_client_cap(my_client);
1886   if (!cap) {
1887     // new cap
1888     cap = in->add_client_cap(my_client, session, realm);
1889     cap->set_wanted(my_want);
1890     cap->mark_new();
1891     cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1892     is_new = true;
1893   } else {
1894     is_new = false;
1895     // make sure it wants sufficient caps
1896     if (my_want & ~cap->wanted()) {
1897       // augment wanted caps for this client
1898       cap->set_wanted(cap->wanted() | my_want);
1899     }
1900   }
1901
1902   if (in->is_auth()) {
1903     // [auth] twiddle mode?
1904     eval(in, CEPH_CAP_LOCKS);
1905
1906     if (_need_flush_mdlog(in, my_want))
1907       mds->mdlog->flush();
1908
1909   } else {
1910     // [replica] tell auth about any new caps wanted
1911     request_inode_file_caps(in);
1912   }
1913
1914   // issue caps (pot. incl new one)
1915   //issue_caps(in);  // note: _eval above may have done this already...
1916
1917   // re-issue whatever we can
1918   //cap->issue(cap->pending());
1919
1920   if (is_new)
1921     cap->dec_suppress();
1922
1923   return cap;
1924 }
1925
1926
1927 void Locker::issue_caps_set(set<CInode*>& inset)
1928 {
1929   for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1930     issue_caps(*p);
1931 }
1932
1933 bool Locker::issue_caps(CInode *in, Capability *only_cap)
1934 {
1935   // allowed caps are determined by the lock mode.
1936   int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
1937   int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
1938   int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
1939
1940   client_t loner = in->get_loner();
1941   if (loner >= 0) {
1942     dout(7) << "issue_caps loner client." << loner
1943             << " allowed=" << ccap_string(loner_allowed) 
1944             << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1945             << ", others allowed=" << ccap_string(all_allowed)
1946             << " on " << *in << dendl;
1947   } else {
1948     dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) 
1949             << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1950             << " on " << *in << dendl;
1951   }
1952
1953   assert(in->is_head());
1954
1955   // count conflicts with
1956   int nissued = 0;        
1957
1958   // client caps
1959   map<client_t, Capability*>::iterator it;
1960   if (only_cap)
1961     it = in->client_caps.find(only_cap->get_client());
1962   else
1963     it = in->client_caps.begin();
1964   for (; it != in->client_caps.end(); ++it) {
1965     Capability *cap = it->second;
1966     if (cap->is_stale())
1967       continue;
1968
1969     // do not issue _new_ bits when size|mtime is projected
1970     int allowed;
1971     if (loner == it->first)
1972       allowed = loner_allowed;
1973     else
1974       allowed = all_allowed;
1975
1976     // add in any xlocker-only caps (for locks this client is the xlocker for)
1977     allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
1978
1979     Session *session = mds->get_session(it->first);
1980     if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
1981         !(session && session->connection &&
1982           session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
1983       allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
1984
1985     int pending = cap->pending();
1986     int wanted = cap->wanted();
1987
1988     dout(20) << " client." << it->first
1989              << " pending " << ccap_string(pending) 
1990              << " allowed " << ccap_string(allowed) 
1991              << " wanted " << ccap_string(wanted)
1992              << dendl;
1993
1994     if (!(pending & ~allowed)) {
1995       // skip if suppress or new, and not revocation
1996       if (cap->is_new() || cap->is_suppress()) {
1997         dout(20) << "  !revoke and new|suppressed, skipping client." << it->first << dendl;
1998         continue;
1999       }
2000     }
2001
2002     // notify clients about deleted inode, to make sure they release caps ASAP.
2003     if (in->inode.nlink == 0)
2004       wanted |= CEPH_CAP_LINK_SHARED;
2005
2006     // are there caps that the client _wants_ and can have, but aren't pending?
2007     // or do we need to revoke?
2008     if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
2009         (pending & ~allowed)) {             // need to revoke ~allowed caps.
2010       // issue
2011       nissued++;
2012
2013       // include caps that clients generally like, while we're at it.
2014       int likes = in->get_caps_liked();      
2015       int before = pending;
2016       long seq;
2017       if (pending & ~allowed)
2018         seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
2019       else
2020         seq = cap->issue((wanted|likes) & allowed);
2021       int after = cap->pending();
2022
2023       if (cap->is_new()) {
2024         // haven't send caps to client yet
2025         if (before & ~after)
2026           cap->confirm_receipt(seq, after);
2027       } else {
2028         dout(7) << "   sending MClientCaps to client." << it->first
2029                 << " seq " << cap->get_last_seq()
2030                 << " new pending " << ccap_string(after) << " was " << ccap_string(before) 
2031                 << dendl;
2032
2033         int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
2034         if (op == CEPH_CAP_OP_REVOKE) {
2035                 revoking_caps.push_back(&cap->item_revoking_caps);
2036                 revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
2037                 cap->set_last_revoke_stamp(ceph_clock_now());
2038                 cap->reset_num_revoke_warnings();
2039         }
2040
2041         MClientCaps *m = new MClientCaps(op, in->ino(),
2042                                          in->find_snaprealm()->inode->ino(),
2043                                          cap->get_cap_id(), cap->get_last_seq(),
2044                                          after, wanted, 0,
2045                                          cap->get_mseq(),
2046                                          mds->get_osd_epoch_barrier());
2047         in->encode_cap_message(m, cap);
2048
2049         mds->send_message_client_counted(m, it->first);
2050       }
2051     }
2052
2053     if (only_cap)
2054       break;
2055   }
2056
2057   return (nissued == 0);  // true if no re-issued, no callbacks
2058 }
2059
2060 void Locker::issue_truncate(CInode *in)
2061 {
2062   dout(7) << "issue_truncate on " << *in << dendl;
2063   
2064   for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2065        it != in->client_caps.end();
2066        ++it) {
2067     Capability *cap = it->second;
2068     MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2069                                      in->ino(),
2070                                      in->find_snaprealm()->inode->ino(),
2071                                      cap->get_cap_id(), cap->get_last_seq(),
2072                                      cap->pending(), cap->wanted(), 0,
2073                                      cap->get_mseq(),
2074                                      mds->get_osd_epoch_barrier());
2075     in->encode_cap_message(m, cap);                          
2076     mds->send_message_client_counted(m, it->first);
2077   }
2078
2079   // should we increase max_size?
2080   if (in->is_auth() && in->is_file())
2081     check_inode_max_size(in);
2082 }
2083
2084
2085 void Locker::revoke_stale_caps(Capability *cap)
2086 {
2087   CInode *in = cap->get_inode();
2088   if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2089     // if export succeeds, the cap will be removed. if export fails, we need to
2090     // revoke the cap if it's still stale.
2091     in->state_set(CInode::STATE_EVALSTALECAPS);
2092     return;
2093   }
2094
2095   int issued = cap->issued();
2096   if (issued & ~CEPH_CAP_PIN) {
2097     dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
2098     cap->revoke();
2099
2100     if (in->is_auth() &&
2101         in->inode.client_ranges.count(cap->get_client()))
2102       in->state_set(CInode::STATE_NEEDSRECOVER);
2103
2104     if (!in->filelock.is_stable()) eval_gather(&in->filelock);
2105     if (!in->linklock.is_stable()) eval_gather(&in->linklock);
2106     if (!in->authlock.is_stable()) eval_gather(&in->authlock);
2107     if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
2108
2109     if (in->is_auth()) {
2110       try_eval(in, CEPH_CAP_LOCKS);
2111     } else {
2112       request_inode_file_caps(in);
2113     }
2114   }
2115 }
2116
2117 void Locker::revoke_stale_caps(Session *session)
2118 {
2119   dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2120
2121   for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2122     Capability *cap = *p;
2123     cap->mark_stale();
2124     revoke_stale_caps(cap);
2125   }
2126 }
2127
2128 void Locker::resume_stale_caps(Session *session)
2129 {
2130   dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2131
2132   for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2133     Capability *cap = *p;
2134     CInode *in = cap->get_inode();
2135     assert(in->is_head());
2136     if (cap->is_stale()) {
2137       dout(10) << " clearing stale flag on " << *in << dendl;
2138       cap->clear_stale();
2139
2140       if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2141         // if export succeeds, the cap will be removed. if export fails,
2142         // we need to re-issue the cap if it's not stale.
2143         in->state_set(CInode::STATE_EVALSTALECAPS);
2144         continue;
2145       }
2146
2147       if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2148         issue_caps(in, cap);
2149     }
2150   }
2151 }
2152
2153 void Locker::remove_stale_leases(Session *session)
2154 {
2155   dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2156   xlist<ClientLease*>::iterator p = session->leases.begin();
2157   while (!p.end()) {
2158     ClientLease *l = *p;
2159     ++p;
2160     CDentry *parent = static_cast<CDentry*>(l->parent);
2161     dout(15) << " removing lease on " << *parent << dendl;
2162     parent->remove_client_lease(l, this);
2163   }
2164 }
2165
2166
2167 class C_MDL_RequestInodeFileCaps : public LockerContext {
2168   CInode *in;
2169 public:
2170   C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
2171     in->get(CInode::PIN_PTRWAITER);
2172   }
2173   void finish(int r) override {
2174     if (!in->is_auth())
2175       locker->request_inode_file_caps(in);
2176     in->put(CInode::PIN_PTRWAITER);
2177   }
2178 };
2179
2180 void Locker::request_inode_file_caps(CInode *in)
2181 {
2182   assert(!in->is_auth());
2183
2184   int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
2185   if (wanted != in->replica_caps_wanted) {
2186     // wait for single auth
2187     if (in->is_ambiguous_auth()) {
2188       in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
2189                      new C_MDL_RequestInodeFileCaps(this, in));
2190       return;
2191     }
2192
2193     mds_rank_t auth = in->authority().first;
2194     if (mds->is_cluster_degraded() &&
2195         mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
2196       mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
2197       return;
2198     }
2199
2200     dout(7) << "request_inode_file_caps " << ccap_string(wanted)
2201             << " was " << ccap_string(in->replica_caps_wanted) 
2202             << " on " << *in << " to mds." << auth << dendl;
2203
2204     in->replica_caps_wanted = wanted;
2205
2206     if (!mds->is_cluster_degraded() ||
2207         mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
2208       mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
2209                             auth);
2210   }
2211 }
2212
2213 /* This function DOES put the passed message before returning */
2214 void Locker::handle_inode_file_caps(MInodeFileCaps *m)
2215 {
2216   // nobody should be talking to us during recovery.
2217   assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
2218
2219   // ok
2220   CInode *in = mdcache->get_inode(m->get_ino());
2221   mds_rank_t from = mds_rank_t(m->get_source().num());
2222
2223   assert(in);
2224   assert(in->is_auth());
2225
2226   dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2227
2228   if (m->get_caps())
2229     in->mds_caps_wanted[from] = m->get_caps();
2230   else
2231     in->mds_caps_wanted.erase(from);
2232
2233   try_eval(in, CEPH_CAP_LOCKS);
2234   m->put();
2235 }
2236
2237
2238 class C_MDL_CheckMaxSize : public LockerContext {
2239   CInode *in;
2240   uint64_t new_max_size;
2241   uint64_t newsize;
2242   utime_t mtime;
2243
2244 public:
2245   C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
2246                      uint64_t _newsize, utime_t _mtime) :
2247     LockerContext(l), in(i),
2248     new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
2249   {
2250     in->get(CInode::PIN_PTRWAITER);
2251   }
2252   void finish(int r) override {
2253     if (in->is_auth())
2254       locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
2255     in->put(CInode::PIN_PTRWAITER);
2256   }
2257 };
2258
2259 uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
2260 {
2261   uint64_t new_max = (size + 1) << 1;
2262   uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2263   if (max_inc > 0) {
2264     max_inc *= pi->get_layout_size_increment();
2265     new_max = MIN(new_max, size + max_inc);
2266   }
2267   return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2268 }
2269
2270 void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
2271                                     map<client_t,client_writeable_range_t> *new_ranges,
2272                                     bool *max_increased)
2273 {
2274   inode_t *latest = in->get_projected_inode();
2275   uint64_t ms;
2276   if(latest->has_layout()) {
2277     ms = calc_new_max_size(latest, size);
2278   } else {
2279     // Layout-less directories like ~mds0/, have zero size
2280     ms = 0;
2281   }
2282
2283   // increase ranges as appropriate.
2284   // shrink to 0 if no WR|BUFFER caps issued.
2285   for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
2286        p != in->client_caps.end();
2287        ++p) {
2288     if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2289       client_writeable_range_t& nr = (*new_ranges)[p->first];
2290       nr.range.first = 0;
2291       if (latest->client_ranges.count(p->first)) {
2292         client_writeable_range_t& oldr = latest->client_ranges[p->first];
2293         if (ms > oldr.range.last)
2294           *max_increased = true;
2295         nr.range.last = MAX(ms, oldr.range.last);
2296         nr.follows = oldr.follows;
2297       } else {
2298         *max_increased = true;
2299         nr.range.last = ms;
2300         nr.follows = in->first - 1;
2301       }
2302     }
2303   }
2304 }
2305
2306 bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
2307                                   uint64_t new_max_size, uint64_t new_size,
2308                                   utime_t new_mtime)
2309 {
2310   assert(in->is_auth());
2311   assert(in->is_file());
2312
2313   inode_t *latest = in->get_projected_inode();
2314   map<client_t, client_writeable_range_t> new_ranges;
2315   uint64_t size = latest->size;
2316   bool update_size = new_size > 0;
2317   bool update_max = false;
2318   bool max_increased = false;
2319
2320   if (update_size) {
2321     new_size = size = MAX(size, new_size);
2322     new_mtime = MAX(new_mtime, latest->mtime);
2323     if (latest->size == new_size && latest->mtime == new_mtime)
2324       update_size = false;
2325   }
2326
2327   calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);
2328
2329   if (max_increased || latest->client_ranges != new_ranges)
2330     update_max = true;
2331
2332   if (!update_size && !update_max) {
2333     dout(20) << "check_inode_max_size no-op on " << *in << dendl;
2334     return false;
2335   }
2336
2337   dout(10) << "check_inode_max_size new_ranges " << new_ranges
2338            << " update_size " << update_size
2339            << " on " << *in << dendl;
2340
2341   if (in->is_frozen()) {
2342     dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
2343     C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
2344                                                      new_max_size,
2345                                                      new_size,
2346                                                      new_mtime);
2347     in->add_waiter(CInode::WAIT_UNFREEZE, cms);
2348     return false;
2349   }
2350   if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
2351     // lock?
2352     if (in->filelock.is_stable()) {
2353       if (in->get_target_loner() >= 0)
2354         file_excl(&in->filelock);
2355       else
2356         simple_lock(&in->filelock);
2357     }
2358     if (!in->filelock.can_wrlock(in->get_loner())) {
2359       // try again later
2360       C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
2361                                                        new_max_size,
2362                                                        new_size,
2363                                                        new_mtime);
2364
2365       in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
2366       dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
2367       return false;    
2368     }
2369   }
2370
2371   MutationRef mut(new MutationImpl());
2372   mut->ls = mds->mdlog->get_current_segment();
2373     
2374   inode_t *pi = in->project_inode();
2375   pi->version = in->pre_dirty();
2376
2377   if (update_max) {
2378     dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
2379     pi->client_ranges = new_ranges;
2380   }
2381
2382   if (update_size) {
2383     dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
2384     pi->size = new_size;
2385     pi->rstat.rbytes = new_size;
2386     dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
2387     pi->mtime = new_mtime;
2388   }
2389
2390   // use EOpen if the file is still open; otherwise, use EUpdate.
2391   // this is just an optimization to push open files forward into
2392   // newer log segments.
2393   LogEvent *le;
2394   EMetaBlob *metablob;
2395   if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {   
2396     EOpen *eo = new EOpen(mds->mdlog);
2397     eo->add_ino(in->ino());
2398     metablob = &eo->metablob;
2399     le = eo;
2400     mut->ls->open_files.push_back(&in->item_open_file);
2401   } else {
2402     EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
2403     metablob = &eu->metablob;
2404     le = eu;
2405   }
2406   mds->mdlog->start_entry(le);
2407   if (update_size) {  // FIXME if/when we do max_size nested accounting
2408     mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
2409     // no cow, here!
2410     CDentry *parent = in->get_projected_parent_dn();
2411     metablob->add_primary_dentry(parent, in, true);
2412   } else {
2413     metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
2414     mdcache->journal_dirty_inode(mut.get(), metablob, in);
2415   }
2416   mds->mdlog->submit_entry(le,
2417           new C_Locker_FileUpdate_finish(this, in, mut, true));
2418   wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
2419   mut->auth_pin(in);
2420
2421   // make max_size _increase_ timely
2422   if (max_increased)
2423     mds->mdlog->flush();
2424
2425   return true;
2426 }
2427
2428
2429 void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2430 {
2431   /*
2432    * only share if currently issued a WR cap.  if client doesn't have it,
2433    * file_max doesn't matter, and the client will get it if/when they get
2434    * the cap later.
2435    */
2436   dout(10) << "share_inode_max_size on " << *in << dendl;
2437   map<client_t, Capability*>::iterator it;
2438   if (only_cap)
2439     it = in->client_caps.find(only_cap->get_client());
2440   else
2441     it = in->client_caps.begin();
2442   for (; it != in->client_caps.end(); ++it) {
2443     const client_t client = it->first;
2444     Capability *cap = it->second;
2445     if (cap->is_suppress())
2446       continue;
2447     if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2448       dout(10) << "share_inode_max_size with client." << client << dendl;
2449       cap->inc_last_seq();
2450       MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
2451                                        in->ino(),
2452                                        in->find_snaprealm()->inode->ino(),
2453                                        cap->get_cap_id(), cap->get_last_seq(),
2454                                        cap->pending(), cap->wanted(), 0,
2455                                        cap->get_mseq(),
2456                                        mds->get_osd_epoch_barrier());
2457       in->encode_cap_message(m, cap);
2458       mds->send_message_client_counted(m, client);
2459     }
2460     if (only_cap)
2461       break;
2462   }
2463 }
2464
2465 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2466 {
2467   /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2468    * pending mutations. */
2469   if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2470        in->filelock.is_unstable_and_locked()) ||
2471       ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2472        in->authlock.is_unstable_and_locked()) ||
2473       ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2474        in->linklock.is_unstable_and_locked()) ||
2475       ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2476        in->xattrlock.is_unstable_and_locked()))
2477     return true;
2478   return false;
2479 }
2480
2481 void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
2482 {
2483   if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
2484     dout(10) << " wanted " << ccap_string(cap->wanted())
2485              << " -> " << ccap_string(wanted) << dendl;
2486     cap->set_wanted(wanted);
2487   } else if (wanted & ~cap->wanted()) {
2488     dout(10) << " wanted " << ccap_string(cap->wanted())
2489              << " -> " << ccap_string(wanted)
2490              << " (added caps even though we had seq mismatch!)" << dendl;
2491     cap->set_wanted(wanted | cap->wanted());
2492   } else {
2493     dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
2494              << " -> " << ccap_string(wanted)
2495              << " (issue_seq " << issue_seq << " != last_issue "
2496              << cap->get_last_issue() << ")" << dendl;
2497     return;
2498   }
2499
2500   CInode *cur = cap->get_inode();
2501   if (!cur->is_auth()) {
2502     request_inode_file_caps(cur);
2503     return;
2504   }
2505
2506   if (cap->wanted() == 0) {
2507     if (cur->item_open_file.is_on_list() &&
2508         !cur->is_any_caps_wanted()) {
2509       dout(10) << " removing unwanted file from open file list " << *cur << dendl;
2510       cur->item_open_file.remove_myself();
2511     }
2512   } else {
2513     if (cur->state_test(CInode::STATE_RECOVERING) &&
2514         (cap->wanted() & (CEPH_CAP_FILE_RD |
2515                           CEPH_CAP_FILE_WR))) {
2516       mds->mdcache->recovery_queue.prioritize(cur);
2517     }
2518
2519     if (!cur->item_open_file.is_on_list()) {
2520       dout(10) << " adding to open file list " << *cur << dendl;
2521       assert(cur->last == CEPH_NOSNAP);
2522       LogSegment *ls = mds->mdlog->get_current_segment();
2523       EOpen *le = new EOpen(mds->mdlog);
2524       mds->mdlog->start_entry(le);
2525       le->add_clean_inode(cur);
2526       ls->open_files.push_back(&cur->item_open_file);
2527       mds->mdlog->submit_entry(le);
2528     }
2529   }
2530 }
2531
2532
2533
2534 void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
2535 {
2536   dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
2537   for (auto p = head_in->client_need_snapflush.begin();
2538        p != head_in->client_need_snapflush.end() && p->first < last; ) {
2539     snapid_t snapid = p->first;
2540     set<client_t>& clients = p->second;
2541     ++p;  // be careful, q loop below depends on this
2542
2543     if (clients.count(client)) {
2544       dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
2545       CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
2546       if (!sin) {
2547         // hrm, look forward until we find the inode. 
2548         //  (we can only look it up by the last snapid it is valid for)
2549         dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
2550         for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
2551              q != head_in->client_need_snapflush.end();
2552              ++q) {
2553           dout(10) << " trying snapid " << q->first << dendl;
2554           sin = mdcache->get_inode(head_in->ino(), q->first);
2555           if (sin) {
2556             assert(sin->first <= snapid);
2557             break;
2558           }
2559           dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
2560         }
2561         if (!sin && head_in->is_multiversion())
2562           sin = head_in;
2563         assert(sin);
2564       }
2565       _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
2566       head_in->remove_need_snapflush(sin, snapid, client);
2567     }
2568   }
2569 }
2570
2571
2572 bool Locker::should_defer_client_cap_frozen(CInode *in)
2573 {
2574   /*
2575    * This policy needs to be AT LEAST as permissive as allowing a client request
2576    * to go forward, or else a client request can release something, the release
2577    * gets deferred, but the request gets processed and deadlocks because when the
2578    * caps can't get revoked.
2579    *
2580    * Currently, a request wait if anything locked is freezing (can't
2581    * auth_pin), which would avoid any deadlock with cap release.  Thus @in
2582    * _MUST_ be in the lock/auth_pin set.
2583    *
2584    * auth_pins==0 implies no unstable lock and not auth pinnned by
2585    * client request, otherwise continue even it's freezing.
2586    */
2587   return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2588 }
2589
2590 /*
2591  * This function DOES put the passed message before returning
2592  */
2593 void Locker::handle_client_caps(MClientCaps *m)
2594 {
2595   Session *session = static_cast<Session *>(m->get_connection()->get_priv());
2596   client_t client = m->get_source().num();
2597
2598   snapid_t follows = m->get_snap_follows();
2599   dout(7) << "handle_client_caps "
2600           << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
2601           << " on " << m->get_ino()
2602           << " tid " << m->get_client_tid() << " follows " << follows
2603           << " op " << ceph_cap_op_name(m->get_op()) << dendl;
2604
2605   if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
2606     if (!session) {
2607       dout(5) << " no session, dropping " << *m << dendl;
2608       m->put();
2609       return;
2610     }
2611     if (session->is_closed() ||
2612         session->is_closing() ||
2613         session->is_killing()) {
2614       dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
2615       m->put();
2616       return;
2617     }
2618     if (mds->is_reconnect() &&
2619         m->get_dirty() && m->get_client_tid() > 0 &&
2620         !session->have_completed_flush(m->get_client_tid())) {
2621       mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
2622     }
2623     mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
2624     return;
2625   }
2626
2627   if (m->get_client_tid() > 0 && session &&
2628       session->have_completed_flush(m->get_client_tid())) {
2629     dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
2630             << " for client." << client << dendl;
2631     MClientCaps *ack;
2632     if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2633       ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
2634                             m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2635     } else {
2636       ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
2637                             m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
2638                             mds->get_osd_epoch_barrier());
2639     }
2640     ack->set_snap_follows(follows);
2641     ack->set_client_tid(m->get_client_tid());
2642     mds->send_message_client_counted(ack, m->get_connection());
2643     if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2644       m->put();
2645       return;
2646     } else {
2647       // fall-thru because the message may release some caps
2648       m->clear_dirty();
2649       m->set_op(CEPH_CAP_OP_UPDATE);
2650     }
2651   }
2652
2653   // "oldest flush tid" > 0 means client uses unique TID for each flush
2654   if (m->get_oldest_flush_tid() > 0 && session) {
2655     if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
2656       mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
2657
2658       if (session->get_num_trim_flushes_warnings() > 0 &&
2659           session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
2660         session->reset_num_trim_flushes_warnings();
2661     } else {
2662       if (session->get_num_completed_flushes() >=
2663           (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
2664         session->inc_num_trim_flushes_warnings();
2665         stringstream ss;
2666         ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
2667            << m->get_oldest_flush_tid() << "), "
2668            << session->get_num_completed_flushes()
2669            << " completed flushes recorded in session";
2670         mds->clog->warn() << ss.str();
2671         dout(20) << __func__ << " " << ss.str() << dendl;
2672       }
2673     }
2674   }
2675
2676   CInode *head_in = mdcache->get_inode(m->get_ino());
2677   if (!head_in) {
2678     if (mds->is_clientreplay()) {
2679       dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
2680         << ", will try again after replayed client requests" << dendl;
2681       mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
2682       return;
2683     }
2684     dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
2685     m->put();
2686     return;
2687   }
2688
2689   if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
2690     // Pause RADOS operations until we see the required epoch
2691     mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
2692   }
2693
2694   if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
2695     // Record the barrier so that we will retransmit it to clients
2696     mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
2697   }
2698
2699   CInode *in = head_in;
2700   if (follows > 0) {
2701     in = mdcache->pick_inode_snap(head_in, follows);
2702     if (in != head_in)
2703       dout(10) << " head inode " << *head_in << dendl;
2704   }
2705   dout(10) << "  cap inode " << *in << dendl;
2706
2707   Capability *cap = 0;
2708   cap = in->get_client_cap(client);
2709   if (!cap && in != head_in)
2710     cap = head_in->get_client_cap(client);
2711   if (!cap) {
2712     dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
2713     m->put();
2714     return;
2715   }  
2716   assert(cap);
2717
2718   // freezing|frozen?
2719   if (should_defer_client_cap_frozen(in)) {
2720     dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
2721     in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
2722     return;
2723   }
2724   if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
2725     dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
2726             << ", dropping" << dendl;
2727     m->put();
2728     return;
2729   }
2730
2731   int op = m->get_op();
2732
2733   // flushsnap?
2734   if (op == CEPH_CAP_OP_FLUSHSNAP) {
2735     if (!in->is_auth()) {
2736       dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
2737       goto out;
2738     }
2739
2740     SnapRealm *realm = in->find_snaprealm();
2741     snapid_t snap = realm->get_snap_following(follows);
2742     dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;
2743
2744     // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2745     // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
2746     // case we get a dup response, so whatever.)
2747     MClientCaps *ack = 0;
2748     if (m->get_dirty()) {
2749       ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2750       ack->set_snap_follows(follows);
2751       ack->set_client_tid(m->get_client_tid());
2752       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2753     }
2754
2755     if (in == head_in ||
2756         (head_in->client_need_snapflush.count(snap) &&
2757          head_in->client_need_snapflush[snap].count(client))) {
2758       dout(7) << " flushsnap snap " << snap
2759               << " client." << client << " on " << *in << dendl;
2760
2761       // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2762       if (in == head_in)
2763         cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
2764       else if (head_in->client_need_snapflush.begin()->first < snap)
2765         _do_null_snapflush(head_in, client, snap);
2766    
2767       _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
2768
2769       if (in != head_in)
2770         head_in->remove_need_snapflush(in, snap, client);
2771       
2772     } else {
2773       dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
2774       if (ack)
2775         mds->send_message_client_counted(ack, m->get_connection());
2776     }
2777     goto out;
2778   }
2779
2780   if (cap->get_cap_id() != m->get_cap_id()) {
2781     dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
2782   } else {
2783     // intermediate snap inodes
2784     while (in != head_in) {
2785       assert(in->last != CEPH_NOSNAP);
2786       if (in->is_auth() && m->get_dirty()) {
2787         dout(10) << " updating intermediate snapped inode " << *in << dendl;
2788         _do_cap_update(in, NULL, m->get_dirty(), follows, m);
2789       }
2790       in = mdcache->pick_inode_snap(head_in, in->last);
2791     }
2792  
2793     // head inode, and cap
2794     MClientCaps *ack = 0;
2795
2796     int caps = m->get_caps();
2797     if (caps & ~cap->issued()) {
2798       dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2799       caps &= cap->issued();
2800     }
2801     
2802     cap->confirm_receipt(m->get_seq(), caps);
2803     dout(10) << " follows " << follows
2804              << " retains " << ccap_string(m->get_caps())
2805              << " dirty " << ccap_string(m->get_dirty())
2806              << " on " << *in << dendl;
2807
2808
2809     // missing/skipped snapflush?
2810     //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
2811     //  presently only does so when it has actual dirty metadata.  But, we
2812     //  set up the need_snapflush stuff based on the issued caps.
2813     //  We can infer that the client WONT send a FLUSHSNAP once they have
2814     //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2815     //  update/release).
2816     if (!head_in->client_need_snapflush.empty()) {
2817       if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
2818         _do_null_snapflush(head_in, client);
2819       } else {
2820         dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
2821       }
2822     }
2823     
2824     if (m->get_dirty() && in->is_auth()) {
2825       dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty()) 
2826               << " seq " << m->get_seq() << " on " << *in << dendl;
2827       ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
2828                             m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2829       ack->set_client_tid(m->get_client_tid());
2830       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2831     }
2832
2833     // filter wanted based on what we could ever give out (given auth/replica status)
2834     bool need_flush = m->flags & CLIENT_CAPS_SYNC;
2835     int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
2836     if (new_wanted != cap->wanted()) {
2837       if (!need_flush && (new_wanted & ~cap->pending())) {
2838         // exapnding caps.  make sure we aren't waiting for a log flush
2839         need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
2840       }
2841
2842       adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
2843     }
2844       
2845     if (in->is_auth() &&
2846         _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
2847       // updated
2848       eval(in, CEPH_CAP_LOCKS);
2849
2850       if (!need_flush && (cap->wanted() & ~cap->pending()))
2851         need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
2852     } else {
2853       // no update, ack now.
2854       if (ack)
2855         mds->send_message_client_counted(ack, m->get_connection());
2856       
2857       bool did_issue = eval(in, CEPH_CAP_LOCKS);
2858       if (!did_issue && (cap->wanted() & ~cap->pending()))
2859         issue_caps(in, cap);
2860
2861       if (cap->get_last_seq() == 0 &&
2862           (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
2863         cap->issue_norevoke(cap->issued());
2864         share_inode_max_size(in, cap);
2865       }
2866     }
2867
2868     if (need_flush)
2869       mds->mdlog->flush();
2870   }
2871
2872  out:
2873   m->put();
2874 }
2875
2876
2877 class C_Locker_RetryRequestCapRelease : public LockerContext {
2878   client_t client;
2879   ceph_mds_request_release item;
2880 public:
2881   C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
2882     LockerContext(l), client(c), item(it) { }
2883   void finish(int r) override {
2884     string dname;
2885     MDRequestRef null_ref;
2886     locker->process_request_cap_release(null_ref, client, item, dname);
2887   }
2888 };
2889
2890 void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
2891                                          const string &dname)
2892 {
2893   inodeno_t ino = (uint64_t)item.ino;
2894   uint64_t cap_id = item.cap_id;
2895   int caps = item.caps;
2896   int wanted = item.wanted;
2897   int seq = item.seq;
2898   int issue_seq = item.issue_seq;
2899   int mseq = item.mseq;
2900
2901   CInode *in = mdcache->get_inode(ino);
2902   if (!in)
2903     return;
2904
2905   if (dname.length()) {
2906     frag_t fg = in->pick_dirfrag(dname);
2907     CDir *dir = in->get_dirfrag(fg);
2908     if (dir) {
2909       CDentry *dn = dir->lookup(dname);
2910       if (dn) {
2911         ClientLease *l = dn->get_client_lease(client);
2912         if (l) {
2913           dout(10) << "process_cap_release removing lease on " << *dn << dendl;
2914           dn->remove_client_lease(l, this);
2915         } else {
2916           dout(7) << "process_cap_release client." << client
2917                   << " doesn't have lease on " << *dn << dendl;
2918         }
2919       } else {
2920         dout(7) << "process_cap_release client." << client << " released lease on dn "
2921                 << dir->dirfrag() << "/" << dname << " which dne" << dendl;
2922       }
2923     }
2924   }
2925
2926   Capability *cap = in->get_client_cap(client);
2927   if (!cap)
2928     return;
2929
2930   dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
2931            << (mdr ? "" : " (DEFERRED, no mdr)")
2932            << dendl;
2933     
2934   if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
2935     dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
2936     return;
2937   }
2938
2939   if (cap->get_cap_id() != cap_id) {
2940     dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
2941     return;
2942   }
2943
2944   if (should_defer_client_cap_frozen(in)) {
2945     dout(7) << " frozen, deferring" << dendl;
2946     in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
2947     return;
2948   }
2949     
2950   if (caps & ~cap->issued()) {
2951     dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2952     caps &= cap->issued();
2953   }
2954   cap->confirm_receipt(seq, caps);
2955
2956   if (!in->client_need_snapflush.empty() &&
2957       (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
2958     _do_null_snapflush(in, client);
2959   }
2960
2961   adjust_cap_wanted(cap, wanted, issue_seq);
2962   
2963   if (mdr)
2964     cap->inc_suppress();
2965   eval(in, CEPH_CAP_LOCKS);
2966   if (mdr)
2967     cap->dec_suppress();
2968   
2969   // take note; we may need to reissue on this cap later
2970   if (mdr)
2971     mdr->cap_releases[in->vino()] = cap->get_last_seq();
2972 }
2973
2974 class C_Locker_RetryKickIssueCaps : public LockerContext {
2975   CInode *in;
2976   client_t client;
2977   ceph_seq_t seq;
2978 public:
2979   C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2980     LockerContext(l), in(i), client(c), seq(s) {
2981     in->get(CInode::PIN_PTRWAITER);
2982   }
2983   void finish(int r) override {
2984     locker->kick_issue_caps(in, client, seq);
2985     in->put(CInode::PIN_PTRWAITER);
2986   }
2987 };
2988
2989 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
2990 {
2991   Capability *cap = in->get_client_cap(client);
2992   if (!cap || cap->get_last_sent() != seq)
2993     return;
2994   if (in->is_frozen()) {
2995     dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
2996     in->add_waiter(CInode::WAIT_UNFREEZE,
2997         new C_Locker_RetryKickIssueCaps(this, in, client, seq));
2998     return;
2999   }
3000   dout(10) << "kick_issue_caps released at current seq " << seq
3001     << ", reissuing" << dendl;
3002   issue_caps(in, cap);
3003 }
3004
3005 void Locker::kick_cap_releases(MDRequestRef& mdr)
3006 {
3007   client_t client = mdr->get_client();
3008   for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3009        p != mdr->cap_releases.end();
3010        ++p) {
3011     CInode *in = mdcache->get_inode(p->first);
3012     if (!in)
3013       continue;
3014     kick_issue_caps(in, client, p->second);
3015   }
3016 }
3017
3018 /**
3019  * m and ack might be NULL, so don't dereference them unless dirty != 0
3020  */
3021 void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
3022 {
3023   dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
3024            << " follows " << follows << " snap " << snap
3025            << " on " << *in << dendl;
3026
3027   if (snap == CEPH_NOSNAP) {
3028     // hmm, i guess snap was already deleted?  just ack!
3029     dout(10) << " wow, the snap following " << follows
3030              << " was already deleted.  nothing to record, just ack." << dendl;
3031     if (ack)
3032       mds->send_message_client_counted(ack, m->get_connection());
3033     return;
3034   }
3035
3036   EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
3037   mds->mdlog->start_entry(le);
3038   MutationRef mut = new MutationImpl();
3039   mut->ls = mds->mdlog->get_current_segment();
3040
3041   // normal metadata updates that we can apply to the head as well.
3042
3043   // update xattrs?
3044   bool xattrs = false;
3045   map<string,bufferptr> *px = 0;
3046   if ((dirty & CEPH_CAP_XATTR_EXCL) && 
3047       m->xattrbl.length() &&
3048       m->head.xattr_version > in->get_projected_inode()->xattr_version)
3049     xattrs = true;
3050
3051   old_inode_t *oi = 0;
3052   if (in->is_multiversion()) {
3053     oi = in->pick_old_inode(snap);
3054   }
3055
3056   inode_t *pi;
3057   if (oi) {
3058     dout(10) << " writing into old inode" << dendl;
3059     pi = in->project_inode();
3060     pi->version = in->pre_dirty();
3061     if (snap > oi->first)
3062       in->split_old_inode(snap);
3063     pi = &oi->inode;
3064     if (xattrs)
3065       px = &oi->xattrs;
3066   } else {
3067     if (xattrs)
3068       px = new map<string,bufferptr>;
3069     pi = in->project_inode(px);
3070     pi->version = in->pre_dirty();
3071   }
3072
3073   _update_cap_fields(in, dirty, m, pi);
3074
3075   // xattr
3076   if (px) {
3077     dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
3078             << " len " << m->xattrbl.length() << dendl;
3079     pi->xattr_version = m->head.xattr_version;
3080     bufferlist::iterator p = m->xattrbl.begin();
3081     ::decode(*px, p);
3082   }
3083
3084   if (pi->client_ranges.count(client)) {
3085     if (in->last == snap) {
3086       dout(10) << "  removing client_range entirely" << dendl;
3087       pi->client_ranges.erase(client);
3088     } else {
3089       dout(10) << "  client_range now follows " << snap << dendl;
3090       pi->client_ranges[client].follows = snap;
3091     }
3092   }
3093
3094   mut->auth_pin(in);
3095   mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3096   mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3097
3098   // "oldest flush tid" > 0 means client uses unique TID for each flush
3099   if (ack && ack->get_oldest_flush_tid() > 0)
3100     le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3101                                   ack->get_oldest_flush_tid());
3102
3103   mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
3104                                                               client, ack));
3105 }
3106
3107 void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
3108 {
3109   if (dirty == 0)
3110     return;
3111
3112   /* m must be valid if there are dirty caps */
3113   assert(m);
3114   uint64_t features = m->get_connection()->get_features();
3115
3116   if (m->get_ctime() > pi->ctime) {
3117     dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
3118             << " for " << *in << dendl;
3119     pi->ctime = m->get_ctime();
3120   }
3121
3122   if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
3123       m->get_change_attr() > pi->change_attr) {
3124     dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
3125             << " for " << *in << dendl;
3126     pi->change_attr = m->get_change_attr();
3127   }
3128
3129   // file
3130   if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
3131     utime_t atime = m->get_atime();
3132     utime_t mtime = m->get_mtime();
3133     uint64_t size = m->get_size();
3134     version_t inline_version = m->inline_version;
3135     
3136     if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
3137         ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
3138       dout(7) << "  mtime " << pi->mtime << " -> " << mtime
3139               << " for " << *in << dendl;
3140       pi->mtime = mtime;
3141     }
3142     if (in->inode.is_file() &&   // ONLY if regular file
3143         size > pi->size) {
3144       dout(7) << "  size " << pi->size << " -> " << size
3145               << " for " << *in << dendl;
3146       pi->size = size;
3147       pi->rstat.rbytes = size;
3148     }
3149     if (in->inode.is_file() &&
3150         (dirty & CEPH_CAP_FILE_WR) &&
3151         inline_version > pi->inline_data.version) {
3152       pi->inline_data.version = inline_version;
3153       if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
3154         pi->inline_data.get_data() = m->inline_data;
3155       else
3156         pi->inline_data.free_data();
3157     }
3158     if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
3159       dout(7) << "  atime " << pi->atime << " -> " << atime
3160               << " for " << *in << dendl;
3161       pi->atime = atime;
3162     }
3163     if ((dirty & CEPH_CAP_FILE_EXCL) &&
3164         ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
3165       dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
3166               << " for " << *in << dendl;
3167       pi->time_warp_seq = m->get_time_warp_seq();
3168     }
3169   }
3170   // auth
3171   if (dirty & CEPH_CAP_AUTH_EXCL) {
3172     if (m->head.uid != pi->uid) {
3173       dout(7) << "  uid " << pi->uid
3174               << " -> " << m->head.uid
3175               << " for " << *in << dendl;
3176       pi->uid = m->head.uid;
3177     }
3178     if (m->head.gid != pi->gid) {
3179       dout(7) << "  gid " << pi->gid
3180               << " -> " << m->head.gid
3181               << " for " << *in << dendl;
3182       pi->gid = m->head.gid;
3183     }
3184     if (m->head.mode != pi->mode) {
3185       dout(7) << "  mode " << oct << pi->mode
3186               << " -> " << m->head.mode << dec
3187               << " for " << *in << dendl;
3188       pi->mode = m->head.mode;
3189     }
3190     if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
3191       dout(7) << "  btime " << oct << pi->btime
3192               << " -> " << m->get_btime() << dec
3193               << " for " << *in << dendl;
3194       pi->btime = m->get_btime();
3195     }
3196   }
3197 }
3198
3199 /*
3200  * update inode based on cap flush|flushsnap|wanted.
3201  *  adjust max_size, if needed.
3202  * if we update, return true; otherwise, false (no updated needed).
3203  */
3204 bool Locker::_do_cap_update(CInode *in, Capability *cap,
3205                             int dirty, snapid_t follows,
3206                             MClientCaps *m, MClientCaps *ack,
3207                             bool *need_flush)
3208 {
3209   dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
3210            << " issued " << ccap_string(cap ? cap->issued() : 0)
3211            << " wanted " << ccap_string(cap ? cap->wanted() : 0)
3212            << " on " << *in << dendl;
3213   assert(in->is_auth());
3214   client_t client = m->get_source().num();
3215   inode_t *latest = in->get_projected_inode();
3216
3217   // increase or zero max_size?
3218   uint64_t size = m->get_size();
3219   bool change_max = false;
3220   uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
3221   uint64_t new_max = old_max;
3222   
3223   if (in->is_file()) {
3224     bool forced_change_max = false;
3225     dout(20) << "inode is file" << dendl;
3226     if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
3227       dout(20) << "client has write caps; m->get_max_size="
3228                << m->get_max_size() << "; old_max=" << old_max << dendl;
3229       if (m->get_max_size() > new_max) {
3230         dout(10) << "client requests file_max " << m->get_max_size()
3231                  << " > max " << old_max << dendl;
3232         change_max = true;
3233         forced_change_max = true;
3234         new_max = calc_new_max_size(latest, m->get_max_size());
3235       } else {
3236         new_max = calc_new_max_size(latest, size);
3237
3238         if (new_max > old_max)
3239           change_max = true;
3240         else
3241           new_max = old_max;
3242       }
3243     } else {
3244       if (old_max) {
3245         change_max = true;
3246         new_max = 0;
3247       }
3248     }
3249
3250     if (in->last == CEPH_NOSNAP &&
3251         change_max &&
3252         !in->filelock.can_wrlock(client) &&
3253         !in->filelock.can_force_wrlock(client)) {
3254       dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
3255       if (in->filelock.is_stable()) {
3256         bool need_issue = false;
3257         if (cap)
3258           cap->inc_suppress();
3259         if (in->mds_caps_wanted.empty() &&
3260             (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
3261           if (in->filelock.get_state() != LOCK_EXCL)
3262             file_excl(&in->filelock, &need_issue);
3263         } else
3264           simple_lock(&in->filelock, &need_issue);
3265         if (need_issue)
3266           issue_caps(in);
3267         if (cap)
3268           cap->dec_suppress();
3269       }
3270       if (!in->filelock.can_wrlock(client) &&
3271           !in->filelock.can_force_wrlock(client)) {
3272         C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
3273                                                          forced_change_max ? new_max : 0,
3274                                                          0, utime_t());
3275
3276         in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
3277         change_max = false;
3278       }
3279     }
3280   }
3281
3282   if (m->flockbl.length()) {
3283     int32_t num_locks;
3284     bufferlist::iterator bli = m->flockbl.begin();
3285     ::decode(num_locks, bli);
3286     for ( int i=0; i < num_locks; ++i) {
3287       ceph_filelock decoded_lock;
3288       ::decode(decoded_lock, bli);
3289       in->get_fcntl_lock_state()->held_locks.
3290         insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3291       ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3292     }
3293     ::decode(num_locks, bli);
3294     for ( int i=0; i < num_locks; ++i) {
3295       ceph_filelock decoded_lock;
3296       ::decode(decoded_lock, bli);
3297       in->get_flock_lock_state()->held_locks.
3298         insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3299       ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3300     }
3301   }
3302
3303   if (!dirty && !change_max)
3304     return false;
3305
3306   Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3307   if (session->check_access(in, MAY_WRITE,
3308                             m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
3309     session->put();
3310     dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
3311     return false;
3312   }
3313   session->put();
3314
3315   // do the update.
3316   EUpdate *le = new EUpdate(mds->mdlog, "cap update");
3317   mds->mdlog->start_entry(le);
3318
3319   // xattrs update?
3320   map<string,bufferptr> *px = 0;
3321   if ((dirty & CEPH_CAP_XATTR_EXCL) && 
3322       m->xattrbl.length() &&
3323       m->head.xattr_version > in->get_projected_inode()->xattr_version)
3324     px = new map<string,bufferptr>;
3325
3326   inode_t *pi = in->project_inode(px);
3327   pi->version = in->pre_dirty();
3328
3329   MutationRef mut(new MutationImpl());
3330   mut->ls = mds->mdlog->get_current_segment();
3331
3332   _update_cap_fields(in, dirty, m, pi);
3333
3334   if (change_max) {
3335     dout(7) << "  max_size " << old_max << " -> " << new_max
3336             << " for " << *in << dendl;
3337     if (new_max) {
3338       pi->client_ranges[client].range.first = 0;
3339       pi->client_ranges[client].range.last = new_max;
3340       pi->client_ranges[client].follows = in->first - 1;
3341     } else 
3342       pi->client_ranges.erase(client);
3343   }
3344     
3345   if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) 
3346     wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
3347
3348   // auth
3349   if (dirty & CEPH_CAP_AUTH_EXCL)
3350     wrlock_force(&in->authlock, mut);
3351
3352   // xattr
3353   if (px) {
3354     dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
3355     pi->xattr_version = m->head.xattr_version;
3356     bufferlist::iterator p = m->xattrbl.begin();
3357     ::decode(*px, p);
3358
3359     wrlock_force(&in->xattrlock, mut);
3360   }
3361   
3362   mut->auth_pin(in);
3363   mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3364   mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3365
3366   // "oldest flush tid" > 0 means client uses unique TID for each flush
3367   if (ack && ack->get_oldest_flush_tid() > 0)
3368     le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3369                                   ack->get_oldest_flush_tid());
3370
3371   mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
3372                                                               change_max, !!cap,
3373                                                               client, ack));
3374   if (need_flush && !*need_flush &&
3375       ((change_max && new_max) || // max INCREASE
3376        _need_flush_mdlog(in, dirty)))
3377     *need_flush = true;
3378
3379   return true;
3380 }
3381
3382 /* This function DOES put the passed message before returning */
3383 void Locker::handle_client_cap_release(MClientCapRelease *m)
3384 {
3385   client_t client = m->get_source().num();
3386   dout(10) << "handle_client_cap_release " << *m << dendl;
3387
3388   if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3389     mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3390     return;
3391   }
3392
3393   if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3394     // Pause RADOS operations until we see the required epoch
3395     mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3396   }
3397
3398   if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3399     // Record the barrier so that we will retransmit it to clients
3400     mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3401   }
3402
3403   Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3404
3405   for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3406     _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3407   }
3408
3409   if (session) {
3410     session->notify_cap_release(m->caps.size());
3411   }
3412
3413   m->put();
3414 }
3415
3416 class C_Locker_RetryCapRelease : public LockerContext {
3417   client_t client;
3418   inodeno_t ino;
3419   uint64_t cap_id;
3420   ceph_seq_t migrate_seq;
3421   ceph_seq_t issue_seq;
3422 public:
3423   C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3424                            ceph_seq_t mseq, ceph_seq_t seq) :
3425     LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3426   void finish(int r) override {
3427     locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3428   }
3429 };
3430
3431 void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
3432                              ceph_seq_t mseq, ceph_seq_t seq)
3433 {
3434   CInode *in = mdcache->get_inode(ino);
3435   if (!in) {
3436     dout(7) << "_do_cap_release missing ino " << ino << dendl;
3437     return;
3438   }
3439   Capability *cap = in->get_client_cap(client);
3440   if (!cap) {
3441     dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
3442     return;
3443   }
3444
3445   dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
3446   if (cap->get_cap_id() != cap_id) {
3447     dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
3448     return;
3449   }
3450   if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
3451     dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
3452     return;
3453   }
3454   if (should_defer_client_cap_frozen(in)) {
3455     dout(7) << " freezing|frozen, deferring" << dendl;
3456     in->add_waiter(CInode::WAIT_UNFREEZE,
3457                   new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
3458     return;
3459   }
3460   if (seq != cap->get_last_issue()) {
3461     dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
3462     // clean out any old revoke history
3463     cap->clean_revoke_from(seq);
3464     eval_cap_gather(in);
3465     return;
3466   }
3467   remove_client_cap(in, client);
3468 }
3469
3470 /* This function DOES put the passed message before returning */
3471
3472 void Locker::remove_client_cap(CInode *in, client_t client)
3473 {
3474   // clean out any pending snapflush state
3475   if (!in->client_need_snapflush.empty())
3476     _do_null_snapflush(in, client);
3477
3478   in->remove_client_cap(client);
3479
3480   if (in->is_auth()) {
3481     // make sure we clear out the client byte range
3482     if (in->get_projected_inode()->client_ranges.count(client) &&
3483         !(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
3484       check_inode_max_size(in);
3485   } else {
3486     request_inode_file_caps(in);
3487   }
3488   
3489   try_eval(in, CEPH_CAP_LOCKS);
3490 }
3491
3492
3493 /**
3494  * Return true if any currently revoking caps exceed the
3495  * mds_revoke_cap_timeout threshold.
3496  */
3497 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3498 {
3499     xlist<Capability*>::const_iterator p = revoking.begin();
3500     if (p.end()) {
3501       // No revoking caps at the moment
3502       return false;
3503     } else {
3504       utime_t now = ceph_clock_now();
3505       utime_t age = now - (*p)->get_last_revoke_stamp();
3506       if (age <= g_conf->mds_revoke_cap_timeout) {
3507           return false;
3508       } else {
3509           return true;
3510       }
3511     }
3512 }
3513
3514
3515 void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3516 {
3517   if (!any_late_revoking_caps(revoking_caps)) {
3518     // Fast path: no misbehaving clients, execute in O(1)
3519     return;
3520   }
3521
3522   // Slow path: execute in O(N_clients)
3523   std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3524   for (client_rc_iter = revoking_caps_by_client.begin();
3525        client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3526     xlist<Capability*> const &client_rc = client_rc_iter->second;
3527     bool any_late = any_late_revoking_caps(client_rc);
3528     if (any_late) {
3529         result->push_back(client_rc_iter->first);
3530     }
3531   }
3532 }
3533
3534 // Hard-code instead of surfacing a config settings because this is
3535 // really a hack that should go away at some point when we have better
3536 // inspection tools for getting at detailed cap state (#7316)
3537 #define MAX_WARN_CAPS 100
3538
3539 void Locker::caps_tick()
3540 {
3541   utime_t now = ceph_clock_now();
3542
3543   dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3544
3545   int i = 0;
3546   for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3547     Capability *cap = *p;
3548
3549     utime_t age = now - cap->get_last_revoke_stamp();
3550     dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3551     if (age <= g_conf->mds_revoke_cap_timeout) {
3552       dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
3553       break;
3554     } else {
3555       ++i;
3556       if (i > MAX_WARN_CAPS) {
3557         dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3558           << "revoking, ignoring subsequent caps" << dendl;
3559         break;
3560       }
3561     }
3562     // exponential backoff of warning intervals
3563     if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
3564       cap->inc_num_revoke_warnings();
3565       stringstream ss;
3566       ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3567          << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3568          << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3569       mds->clog->warn() << ss.str();
3570       dout(20) << __func__ << " " << ss.str() << dendl;
3571     } else {
3572       dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3573     }
3574   }
3575 }
3576
3577
3578 void Locker::handle_client_lease(MClientLease *m)
3579 {
3580   dout(10) << "handle_client_lease " << *m << dendl;
3581
3582   assert(m->get_source().is_client());
3583   client_t client = m->get_source().num();
3584
3585   CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
3586   if (!in) {
3587     dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
3588     m->put();
3589     return;
3590   }
3591   CDentry *dn = 0;
3592
3593   frag_t fg = in->pick_dirfrag(m->dname);
3594   CDir *dir = in->get_dirfrag(fg);
3595   if (dir) 
3596     dn = dir->lookup(m->dname);
3597   if (!dn) {
3598     dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
3599     m->put();
3600     return;
3601   }
3602   dout(10) << " on " << *dn << dendl;
3603
3604   // replica and lock
3605   ClientLease *l = dn->get_client_lease(client);
3606   if (!l) {
3607     dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
3608     m->put();
3609     return;
3610   } 
3611
3612   switch (m->get_action()) {
3613   case CEPH_MDS_LEASE_REVOKE_ACK:
3614   case CEPH_MDS_LEASE_RELEASE:
3615     if (l->seq != m->get_seq()) {
3616       dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
3617     } else {
3618       dout(7) << "handle_client_lease client." << client
3619               << " on " << *dn << dendl;
3620       dn->remove_client_lease(l, this);
3621     }
3622     m->put();
3623     break;
3624
3625   case CEPH_MDS_LEASE_RENEW:
3626     {
3627       dout(7) << "handle_client_lease client." << client << " renew on " << *dn
3628               << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
3629       if (dn->lock.can_lease(client)) {
3630         int pool = 1;   // fixme.. do something smart!
3631         m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
3632         m->h.seq = ++l->seq;
3633         m->clear_payload();
3634
3635         utime_t now = ceph_clock_now();
3636         now += mdcache->client_lease_durations[pool];
3637         mdcache->touch_client_lease(l, pool, now);
3638
3639         mds->send_message_client_counted(m, m->get_connection());
3640       }
3641     }
3642     break;
3643
3644   default:
3645     ceph_abort(); // implement me
3646     break;
3647   }
3648 }
3649
3650
3651 void Locker::issue_client_lease(CDentry *dn, client_t client,
3652                                bufferlist &bl, utime_t now, Session *session)
3653 {
3654   CInode *diri = dn->get_dir()->get_inode();
3655   if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
3656       ((!diri->filelock.can_lease(client) &&
3657         (diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
3658       dn->lock.can_lease(client)) {
3659     int pool = 1;   // fixme.. do something smart!
3660     // issue a dentry lease
3661     ClientLease *l = dn->add_client_lease(client, session);
3662     session->touch_lease(l);
3663     
3664     now += mdcache->client_lease_durations[pool];
3665     mdcache->touch_client_lease(l, pool, now);
3666
3667     LeaseStat e;
3668     e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
3669     e.seq = ++l->seq;
3670     e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
3671     ::encode(e, bl);
3672     dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
3673              << " on " << *dn << dendl;
3674   } else {
3675     // null lease
3676     LeaseStat e;
3677     e.mask = 0;
3678     e.seq = 0;
3679     e.duration_ms = 0;
3680     ::encode(e, bl);
3681     dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
3682   }
3683 }
3684
3685
3686 void Locker::revoke_client_leases(SimpleLock *lock)
3687 {
3688   int n = 0;
3689   CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3690   for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3691        p != dn->client_lease_map.end();
3692        ++p) {
3693     ClientLease *l = p->second;
3694     
3695     n++;
3696     assert(lock->get_type() == CEPH_LOCK_DN);
3697
3698     CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3699     int mask = 1 | CEPH_LOCK_DN; // old and new bits
3700     
3701     // i should also revoke the dir ICONTENT lease, if they have it!
3702     CInode *diri = dn->get_dir()->get_inode();
3703     mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3704                                               mask,
3705                                               diri->ino(),
3706                                               diri->first, CEPH_NOSNAP,
3707                                               dn->get_name()),
3708                              l->client);
3709   }
3710   assert(n == lock->get_num_client_lease());
3711 }
3712
3713
3714
3715 // locks ----------------------------------------------------------------
3716
3717 SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info) 
3718 {
3719   switch (lock_type) {
3720   case CEPH_LOCK_DN:
3721     {
3722       // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3723       CInode *diri = mdcache->get_inode(info.dirfrag.ino);
3724       frag_t fg;
3725       CDir *dir = 0;
3726       CDentry *dn = 0;
3727       if (diri) {
3728         fg = diri->pick_dirfrag(info.dname);
3729         dir = diri->get_dirfrag(fg);
3730         if (dir) 
3731           dn = dir->lookup(info.dname, info.snapid);
3732       }
3733       if (!dn) {
3734         dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
3735         return 0;
3736       }
3737       return &dn->lock;
3738     }
3739
3740   case CEPH_LOCK_IAUTH:
3741   case CEPH_LOCK_ILINK:
3742   case CEPH_LOCK_IDFT:
3743   case CEPH_LOCK_IFILE:
3744   case CEPH_LOCK_INEST:
3745   case CEPH_LOCK_IXATTR:
3746   case CEPH_LOCK_ISNAP:
3747   case CEPH_LOCK_IFLOCK:
3748   case CEPH_LOCK_IPOLICY:
3749     {
3750       CInode *in = mdcache->get_inode(info.ino, info.snapid);
3751       if (!in) {
3752         dout(7) << "get_lock don't have ino " << info.ino << dendl;
3753         return 0;
3754       }
3755       switch (lock_type) {
3756       case CEPH_LOCK_IAUTH: return &in->authlock;
3757       case CEPH_LOCK_ILINK: return &in->linklock;
3758       case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
3759       case CEPH_LOCK_IFILE: return &in->filelock;
3760       case CEPH_LOCK_INEST: return &in->nestlock;
3761       case CEPH_LOCK_IXATTR: return &in->xattrlock;
3762       case CEPH_LOCK_ISNAP: return &in->snaplock;
3763       case CEPH_LOCK_IFLOCK: return &in->flocklock;
3764       case CEPH_LOCK_IPOLICY: return &in->policylock;
3765       }
3766     }
3767
3768   default:
3769     dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
3770     ceph_abort();
3771     break;
3772   }
3773
3774   return 0;  
3775 }
3776
3777 /* This function DOES put the passed message before returning */
3778 void Locker::handle_lock(MLock *m)
3779 {
3780   // nobody should be talking to us during recovery.
3781   assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
3782
3783   SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
3784   if (!lock) {
3785     dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
3786     m->put();
3787     return;
3788   }
3789
3790   switch (lock->get_type()) {
3791   case CEPH_LOCK_DN:
3792   case CEPH_LOCK_IAUTH:
3793   case CEPH_LOCK_ILINK:
3794   case CEPH_LOCK_ISNAP:
3795   case CEPH_LOCK_IXATTR:
3796   case CEPH_LOCK_IFLOCK:
3797   case CEPH_LOCK_IPOLICY:
3798     handle_simple_lock(lock, m);
3799     break;
3800     
3801   case CEPH_LOCK_IDFT:
3802   case CEPH_LOCK_INEST:
3803     //handle_scatter_lock((ScatterLock*)lock, m);
3804     //break;
3805
3806   case CEPH_LOCK_IFILE:
3807     handle_file_lock(static_cast<ScatterLock*>(lock), m);
3808     break;
3809     
3810   default:
3811     dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
3812     ceph_abort();
3813     break;
3814   }
3815 }
3816  
3817
3818
3819
3820
3821 // ==========================================================================
3822 // simple lock
3823
3824 /** This function may take a reference to m if it needs one, but does
3825  * not put references. */
3826 void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
3827 {
3828   MDSCacheObject *parent = lock->get_parent();
3829   if (parent->is_auth() &&
3830       lock->get_state() != LOCK_SYNC &&
3831       !parent->is_frozen()) {
3832     dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3833             << " on " << *parent << dendl;
3834     assert(parent->is_auth()); // replica auth pinned if they're doing this!
3835     if (lock->is_stable()) {
3836       simple_sync(lock);
3837     } else {
3838       dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
3839       lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
3840                        new C_MDS_RetryMessage(mds, m->get()));
3841     }
3842   } else {
3843     dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3844             << " on " << *parent << dendl;
3845     // replica should retry
3846   }
3847 }
3848
3849 /* This function DOES put the passed message before returning */
3850 void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
3851 {
3852   int from = m->get_asker();
3853   
3854   dout(10) << "handle_simple_lock " << *m
3855            << " on " << *lock << " " << *lock->get_parent() << dendl;
3856
3857   if (mds->is_rejoin()) {
3858     if (lock->get_parent()->is_rejoining()) {
3859       dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
3860               << ", dropping " << *m << dendl;
3861       m->put();
3862       return;
3863     }
3864   }
3865
3866   switch (m->get_action()) {
3867     // -- replica --
3868   case LOCK_AC_SYNC:
3869     assert(lock->get_state() == LOCK_LOCK);
3870     lock->decode_locked_state(m->get_data());
3871     lock->set_state(LOCK_SYNC);
3872     lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
3873     break;
3874     
3875   case LOCK_AC_LOCK:
3876     assert(lock->get_state() == LOCK_SYNC);
3877     lock->set_state(LOCK_SYNC_LOCK);
3878     if (lock->is_leased())
3879       revoke_client_leases(lock);
3880     eval_gather(lock, true);
3881     if (lock->is_unstable_and_locked())
3882       mds->mdlog->flush();
3883     break;
3884
3885
3886     // -- auth --
3887   case LOCK_AC_LOCKACK:
3888     assert(lock->get_state() == LOCK_SYNC_LOCK ||
3889            lock->get_state() == LOCK_SYNC_EXCL);
3890     assert(lock->is_gathering(from));
3891     lock->remove_gather(from);
3892     
3893     if (lock->is_gathering()) {
3894       dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
3895               << ", still gathering " << lock->get_gather_set() << dendl;
3896     } else {
3897       dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
3898               << ", last one" << dendl;
3899       eval_gather(lock);
3900     }
3901     break;
3902
3903   case LOCK_AC_REQRDLOCK:
3904     handle_reqrdlock(lock, m);
3905     break;
3906
3907   }
3908
3909   m->put();
3910 }
3911
3912 /* unused, currently.
3913
3914 class C_Locker_SimpleEval : public Context {
3915   Locker *locker;
3916   SimpleLock *lock;
3917 public:
3918   C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3919   void finish(int r) {
3920     locker->try_simple_eval(lock);
3921   }
3922 };
3923
3924 void Locker::try_simple_eval(SimpleLock *lock)
3925 {
3926   // unstable and ambiguous auth?
3927   if (!lock->is_stable() &&
3928       lock->get_parent()->is_ambiguous_auth()) {
3929     dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3930     //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3931     lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3932     return;
3933   }
3934
3935   if (!lock->get_parent()->is_auth()) {
3936     dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3937     return;
3938   }
3939
3940   if (!lock->get_parent()->can_auth_pin()) {
3941     dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3942     //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3943     lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3944     return;
3945   }
3946
3947   if (lock->is_stable())
3948     simple_eval(lock);
3949 }
3950 */
3951
3952
3953 void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
3954 {
3955   dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
3956
3957   assert(lock->get_parent()->is_auth());
3958   assert(lock->is_stable());
3959
3960   if (lock->get_parent()->is_freezing_or_frozen()) {
3961     // dentry lock in unreadable state can block path traverse
3962     if ((lock->get_type() != CEPH_LOCK_DN ||
3963          lock->get_state() == LOCK_SYNC ||
3964          lock->get_parent()->is_frozen()))
3965       return;
3966   }
3967
3968   if (mdcache->is_readonly()) {
3969     if (lock->get_state() != LOCK_SYNC) {
3970       dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
3971       simple_sync(lock, need_issue);
3972     }
3973     return;
3974   }
3975
3976   CInode *in = 0;
3977   int wanted = 0;
3978   if (lock->get_type() != CEPH_LOCK_DN) {
3979     in = static_cast<CInode*>(lock->get_parent());
3980     in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
3981   }
3982   
3983   // -> excl?
3984   if (lock->get_state() != LOCK_EXCL &&
3985       in && in->get_target_loner() >= 0 &&
3986       (wanted & CEPH_CAP_GEXCL)) {
3987     dout(7) << "simple_eval stable, going to excl " << *lock 
3988             << " on " << *lock->get_parent() << dendl;
3989     simple_excl(lock, need_issue);
3990   }
3991
3992   // stable -> sync?
3993   else if (lock->get_state() != LOCK_SYNC &&
3994            !lock->is_wrlocked() &&
3995            ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
3996             (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
3997     dout(7) << "simple_eval stable, syncing " << *lock 
3998             << " on " << *lock->get_parent() << dendl;
3999     simple_sync(lock, need_issue);
4000   }
4001 }
4002
4003
4004 // mid
4005
4006 bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
4007 {
4008   dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
4009   assert(lock->get_parent()->is_auth());
4010   assert(lock->is_stable());
4011
4012   CInode *in = 0;
4013   if (lock->get_cap_shift())
4014     in = static_cast<CInode *>(lock->get_parent());
4015
4016   int old_state = lock->get_state();
4017
4018   if (old_state != LOCK_TSYN) {
4019
4020     switch (lock->get_state()) {
4021     case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
4022     case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
4023     case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
4024     case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
4025     default: ceph_abort();
4026     }
4027
4028     int gather = 0;
4029     if (lock->is_wrlocked())
4030       gather++;
4031     
4032     if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
4033       send_lock_message(lock, LOCK_AC_SYNC);
4034       lock->init_gather();
4035       gather++;
4036     }
4037     
4038     if (in && in->is_head()) {
4039       if (in->issued_caps_need_gather(lock)) {
4040         if (need_issue)
4041           *need_issue = true;
4042         else
4043           issue_caps(in);
4044         gather++;
4045       }
4046     }
4047     
4048     bool need_recover = false;
4049     if (lock->get_type() == CEPH_LOCK_IFILE) {
4050       assert(in);
4051       if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4052         mds->mdcache->queue_file_recover(in);
4053         need_recover = true;
4054         gather++;
4055       }
4056     }
4057     
4058     if (!gather && lock->is_dirty()) {
4059       lock->get_parent()->auth_pin(lock);
4060       scatter_writebehind(static_cast<ScatterLock*>(lock));
4061       mds->mdlog->flush();
4062       return false;
4063     }
4064
4065     if (gather) {
4066       lock->get_parent()->auth_pin(lock);
4067       if (need_recover)
4068         mds->mdcache->do_file_recover();
4069       return false;
4070     }
4071   }
4072
4073   if (lock->get_parent()->is_replicated()) {    // FIXME
4074     bufferlist data;
4075     lock->encode_locked_state(data);
4076     send_lock_message(lock, LOCK_AC_SYNC, data);
4077   }
4078   lock->set_state(LOCK_SYNC);
4079   lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
4080   if (in && in->is_head()) {
4081     if (need_issue)
4082       *need_issue = true;
4083     else
4084       issue_caps(in);
4085   }
4086   return true;
4087 }
4088
4089 void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
4090 {
4091   dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
4092   assert(lock->get_parent()->is_auth());
4093   assert(lock->is_stable());
4094
4095   CInode *in = 0;
4096   if (lock->get_cap_shift())
4097     in = static_cast<CInode *>(lock->get_parent());
4098
4099   switch (lock->get_state()) {
4100   case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4101   case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4102   case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4103   default: ceph_abort();
4104   }
4105   
4106   int gather = 0;
4107   if (lock->is_rdlocked())
4108     gather++;
4109   if (lock->is_wrlocked())
4110     gather++;
4111
4112   if (lock->get_parent()->is_replicated() && 
4113       lock->get_state() != LOCK_LOCK_EXCL &&
4114       lock->get_state() != LOCK_XSYN_EXCL) {
4115     send_lock_message(lock, LOCK_AC_LOCK);
4116     lock->init_gather();
4117     gather++;
4118   }
4119   
4120   if (in && in->is_head()) {
4121     if (in->issued_caps_need_gather(lock)) {
4122       if (need_issue)
4123         *need_issue = true;
4124       else
4125         issue_caps(in);
4126       gather++;
4127     }
4128   }
4129   
4130   if (gather) {
4131     lock->get_parent()->auth_pin(lock);
4132   } else {
4133     lock->set_state(LOCK_EXCL);
4134     lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
4135     if (in) {
4136       if (need_issue)
4137         *need_issue = true;
4138       else
4139         issue_caps(in);
4140     }
4141   }
4142 }
4143
4144 void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
4145 {
4146   dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
4147   assert(lock->get_parent()->is_auth());
4148   assert(lock->is_stable());
4149   assert(lock->get_state() != LOCK_LOCK);
4150   
4151   CInode *in = 0;
4152   if (lock->get_cap_shift())
4153     in = static_cast<CInode *>(lock->get_parent());
4154
4155   int old_state = lock->get_state();
4156
4157   switch (lock->get_state()) {
4158   case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
4159   case LOCK_XSYN:
4160     file_excl(static_cast<ScatterLock*>(lock), need_issue);
4161     if (lock->get_state() != LOCK_EXCL)
4162       return;
4163     // fall-thru
4164   case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
4165   case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
4166     (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
4167     break;
4168   case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
4169   default: ceph_abort();
4170   }
4171
4172   int gather = 0;
4173   if (lock->is_leased()) {
4174     gather++;
4175     revoke_client_leases(lock);
4176   }
4177   if (lock->is_rdlocked())
4178     gather++;
4179   if (in && in->is_head()) {
4180     if (in->issued_caps_need_gather(lock)) {
4181       if (need_issue)
4182         *need_issue = true;
4183       else
4184         issue_caps(in);
4185       gather++;
4186     }
4187   }
4188
4189   bool need_recover = false;
4190   if (lock->get_type() == CEPH_LOCK_IFILE) {
4191     assert(in);
4192     if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
4193       mds->mdcache->queue_file_recover(in);
4194       need_recover = true;
4195       gather++;
4196     }
4197   }
4198
4199   if (lock->get_parent()->is_replicated() &&
4200       lock->get_state() == LOCK_MIX_LOCK &&
4201       gather) {
4202     dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
4203   } else {
4204     // move to second stage of gather now, so we don't send the lock action later.
4205     if (lock->get_state() == LOCK_MIX_LOCK)
4206       lock->set_state(LOCK_MIX_LOCK2);
4207
4208     if (lock->get_parent()->is_replicated() &&
4209         lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
4210       gather++;
4211       send_lock_message(lock, LOCK_AC_LOCK);
4212       lock->init_gather();
4213     }
4214   }
4215
4216   if (!gather && lock->is_dirty()) {
4217     lock->get_parent()->auth_pin(lock);
4218     scatter_writebehind(static_cast<ScatterLock*>(lock));
4219     mds->mdlog->flush();
4220     return;
4221   }
4222
4223   if (gather) {
4224     lock->get_parent()->auth_pin(lock);
4225     if (need_recover)
4226       mds->mdcache->do_file_recover();
4227   } else {
4228     lock->set_state(LOCK_LOCK);
4229     lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
4230   }
4231 }
4232
4233
4234 void Locker::simple_xlock(SimpleLock *lock)
4235 {
4236   dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
4237   assert(lock->get_parent()->is_auth());
4238   //assert(lock->is_stable());
4239   assert(lock->get_state() != LOCK_XLOCK);
4240   
4241   CInode *in = 0;
4242   if (lock->get_cap_shift())
4243     in = static_cast<CInode *>(lock->get_parent());
4244
4245   if (lock->is_stable())
4246     lock->get_parent()->auth_pin(lock);
4247
4248   switch (lock->get_state()) {
4249   case LOCK_LOCK: 
4250   case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
4251   default: ceph_abort();
4252   }
4253
4254   int gather = 0;
4255   if (lock->is_rdlocked())
4256     gather++;
4257   if (lock->is_wrlocked())
4258     gather++;
4259   
4260   if (in && in->is_head()) {
4261     if (in->issued_caps_need_gather(lock)) {
4262       issue_caps(in);
4263       gather++;
4264     }
4265   }
4266
4267   if (!gather) {
4268     lock->set_state(LOCK_PREXLOCK);
4269     //assert("shouldn't be called if we are already xlockable" == 0);
4270   }
4271 }
4272
4273
4274
4275
4276
4277 // ==========================================================================
4278 // scatter lock
4279
4280 /*
4281
4282 Some notes on scatterlocks.
4283
4284  - The scatter/gather is driven by the inode lock.  The scatter always
4285    brings in the latest metadata from the fragments.
4286
4287  - When in a scattered/MIX state, fragments are only allowed to
4288    update/be written to if the accounted stat matches the inode's
4289    current version.
4290
4291  - That means, on gather, we _only_ assimilate diffs for frag metadata
4292    that match the current version, because those are the only ones
4293    written during this scatter/gather cycle.  (Others didn't permit
4294    it.)  We increment the version and journal this to disk.
4295
4296  - When possible, we also simultaneously update our local frag
4297    accounted stats to match.
4298
4299  - On scatter, the new inode info is broadcast to frags, both local
4300    and remote.  If possible (auth and !frozen), the dirfrag auth
4301    should update the accounted state (if it isn't already up to date).
4302    Note that this may occur on both the local inode auth node and
4303    inode replicas, so there are two potential paths. If it is NOT
4304    possible, they need to mark_stale to prevent any possible writes.
4305
4306  - A scatter can be to MIX (potentially writeable) or to SYNC (read
4307    only).  Both are opportunities to update the frag accounted stats,
4308    even though only the MIX case is affected by a stale dirfrag.
4309
4310  - Because many scatter/gather cycles can potentially go by without a
4311    frag being able to update its accounted stats (due to being frozen
4312    by exports/refragments in progress), the frag may have (even very)
4313    old stat versions.  That's fine.  If when we do want to update it,
4314    we can update accounted_* and the version first.
4315
4316 */
4317
4318 class C_Locker_ScatterWB : public LockerLogContext {
4319   ScatterLock *lock;
4320   MutationRef mut;
4321 public:
4322   C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
4323     LockerLogContext(l), lock(sl), mut(m) {}
4324   void finish(int r) override { 
4325     locker->scatter_writebehind_finish(lock, mut); 
4326   }
4327 };
4328
4329 void Locker::scatter_writebehind(ScatterLock *lock)
4330 {
4331   CInode *in = static_cast<CInode*>(lock->get_parent());
4332   dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;
4333
4334   // journal
4335   MutationRef mut(new MutationImpl());
4336   mut->ls = mds->mdlog->get_current_segment();
4337
4338   // forcefully take a wrlock
4339   lock->get_wrlock(true);
4340   mut->wrlocks.insert(lock);
4341   mut->locks.insert(lock);
4342
4343   in->pre_cow_old_inode();  // avoid cow mayhem
4344
4345   inode_t *pi = in->project_inode();
4346   pi->version = in->pre_dirty();
4347
4348   in->finish_scatter_gather_update(lock->get_type());
4349   lock->start_flush();
4350
4351   EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
4352   mds->mdlog->start_entry(le);
4353
4354   mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
4355   mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
4356   
4357   in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);
4358
4359   mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
4360 }
4361
4362 void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
4363 {
4364   CInode *in = static_cast<CInode*>(lock->get_parent());
4365   dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
4366   in->pop_and_dirty_projected_inode(mut->ls);
4367
4368   lock->finish_flush();
4369
4370   // if replicas may have flushed in a mix->lock state, send another
4371   // message so they can finish_flush().
4372   if (in->is_replicated()) {
4373     switch (lock->get_state()) {
4374     case LOCK_MIX_LOCK:
4375     case LOCK_MIX_LOCK2:
4376     case LOCK_MIX_EXCL:
4377     case LOCK_MIX_TSYN:
4378       send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
4379     }
4380   }
4381
4382   mut->apply();
4383   drop_locks(mut.get());
4384   mut->cleanup();
4385
4386   if (lock->is_stable())
4387     lock->finish_waiters(ScatterLock::WAIT_STABLE);
4388
4389   //scatter_eval_gather(lock);
4390 }
4391
4392 void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
4393 {
4394   dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
4395
4396   assert(lock->get_parent()->is_auth());
4397   assert(lock->is_stable());
4398
4399   if (lock->get_parent()->is_freezing_or_frozen()) {
4400     dout(20) << "  freezing|frozen" << dendl;
4401     return;
4402   }
4403
4404   if (mdcache->is_readonly()) {
4405     if (lock->get_state() != LOCK_SYNC) {
4406       dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
4407       simple_sync(lock, need_issue);
4408     }
4409     return;
4410   }
4411   
4412   if (!lock->is_rdlocked() &&
4413       lock->get_state() != LOCK_MIX &&
4414       lock->get_scatter_wanted()) {
4415     dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4416              << " on " << *lock->get_parent() << dendl;
4417     scatter_mix(lock, need_issue);
4418     return;
4419   }
4420
4421   if (lock->get_type() == CEPH_LOCK_INEST) {
4422     // in general, we want to keep INEST writable at all times.
4423     if (!lock->is_rdlocked()) {
4424       if (lock->get_parent()->is_replicated()) {
4425         if (lock->get_state() != LOCK_MIX)
4426           scatter_mix(lock, need_issue);
4427       } else {
4428         if (lock->get_state() != LOCK_LOCK)
4429           simple_lock(lock, need_issue);
4430       }
4431     }
4432     return;
4433   }
4434
4435   CInode *in = static_cast<CInode*>(lock->get_parent());
4436   if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
4437     // i _should_ be sync.
4438     if (!lock->is_wrlocked() &&
4439         lock->get_state() != LOCK_SYNC) {
4440       dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
4441       simple_sync(lock, need_issue);
4442     }
4443   }
4444 }
4445
4446
4447 /*
4448  * mark a scatterlock to indicate that the dir fnode has some dirty data
4449  */
4450 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4451 {
4452   lock->mark_dirty();
4453   if (lock->get_updated_item()->is_on_list()) {
4454     dout(10) << "mark_updated_scatterlock " << *lock
4455              << " - already on list since " << lock->get_update_stamp() << dendl;
4456   } else {
4457     updated_scatterlocks.push_back(lock->get_updated_item());
4458     utime_t now = ceph_clock_now();
4459     lock->set_update_stamp(now);
4460     dout(10) << "mark_updated_scatterlock " << *lock
4461              << " - added at " << now << dendl;
4462   }
4463 }
4464
4465 /*
4466  * this is called by scatter_tick and LogSegment::try_to_trim() when
4467  * trying to flush dirty scattered data (i.e. updated fnode) back to
4468  * the inode.
4469  *
4470  * we need to lock|scatter in order to push fnode changes into the
4471  * inode.dirstat.
4472  */
4473 void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
4474 {
4475   CInode *p = static_cast<CInode *>(lock->get_parent());
4476
4477   if (p->is_frozen() || p->is_freezing()) {
4478     dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
4479     if (c) 
4480       p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
4481     else
4482       // just requeue.  not ideal.. starvation prone..
4483       updated_scatterlocks.push_back(lock->get_updated_item());
4484     return;
4485   }
4486
4487   if (p->is_ambiguous_auth()) {
4488     dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
4489     if (c) 
4490       p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
4491     else
4492       // just requeue.  not ideal.. starvation prone..
4493       updated_scatterlocks.push_back(lock->get_updated_item());
4494     return;
4495   }
4496
4497   if (p->is_auth()) {
4498     int count = 0;
4499     while (true) {
4500       if (lock->is_stable()) {
4501         // can we do it now?
4502         //  (only if we're not replicated.. if we are, we really do need
4503         //   to nudge the lock state!)
4504         /*
4505           actually, even if we're not replicated, we can't stay in MIX, because another mds
4506           could discover and replicate us at any time.  if that happens while we're flushing,
4507           they end up in MIX but their inode has the old scatterstat version.
4508
4509         if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4510           dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4511           scatter_writebehind(lock);
4512           if (c)
4513             lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4514           return;
4515         }
4516         */
4517
4518         if (mdcache->is_readonly()) {
4519           if (lock->get_state() != LOCK_SYNC) {
4520             dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
4521             simple_sync(static_cast<ScatterLock*>(lock));
4522           }
4523           break;
4524         }
4525
4526         // adjust lock state
4527         dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
4528         switch (lock->get_type()) {
4529         case CEPH_LOCK_IFILE:
4530           if (p->is_replicated() && lock->get_state() != LOCK_MIX)
4531             scatter_mix(static_cast<ScatterLock*>(lock));
4532           else if (lock->get_state() != LOCK_LOCK)
4533             simple_lock(static_cast<ScatterLock*>(lock));
4534           else
4535             simple_sync(static_cast<ScatterLock*>(lock));
4536           break;
4537           
4538         case CEPH_LOCK_IDFT:
4539         case CEPH_LOCK_INEST:
4540           if (p->is_replicated() && lock->get_state() != LOCK_MIX)
4541             scatter_mix(lock);
4542           else if (lock->get_state() != LOCK_LOCK)
4543             simple_lock(lock);
4544           else
4545             simple_sync(lock);
4546           break;
4547         default:
4548           ceph_abort();
4549         }
4550         ++count;
4551         if (lock->is_stable() && count == 2) {
4552           dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
4553           // this should only realy happen when called via
4554           // handle_file_lock due to AC_NUDGE, because the rest of the
4555           // time we are replicated or have dirty data and won't get
4556           // called.  bailing here avoids an infinite loop.
4557           assert(!c); 
4558           break;
4559         }
4560       } else {
4561         dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
4562         if (c)
4563           lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4564         return;
4565       }
4566     }
4567   } else {
4568     dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " 
4569              << *lock << " on " << *p << dendl;
4570     // request unscatter?
4571     mds_rank_t auth = lock->get_parent()->authority().first;
4572     if (!mds->is_cluster_degraded() ||
4573         mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
4574       mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
4575
4576     // wait...
4577     if (c)
4578       lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4579
4580     // also, requeue, in case we had wrong auth or something
4581     updated_scatterlocks.push_back(lock->get_updated_item());
4582   }
4583 }
4584
4585 void Locker::scatter_tick()
4586 {
4587   dout(10) << "scatter_tick" << dendl;
4588   
4589   // updated
4590   utime_t now = ceph_clock_now();
4591   int n = updated_scatterlocks.size();
4592   while (!updated_scatterlocks.empty()) {
4593     ScatterLock *lock = updated_scatterlocks.front();
4594
4595     if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping
4596     
4597     if (!lock->is_dirty()) {
4598       updated_scatterlocks.pop_front();
4599       dout(10) << " removing from updated_scatterlocks " 
4600                << *lock << " " << *lock->get_parent() << dendl;
4601       continue;
4602     }
4603     if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
4604       break;
4605     updated_scatterlocks.pop_front();
4606     scatter_nudge(lock, 0);
4607   }
4608   mds->mdlog->flush();
4609 }
4610
4611
4612 void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
4613 {
4614   dout(10) << "scatter_tempsync " << *lock
4615            << " on " << *lock->get_parent() << dendl;
4616   assert(lock->get_parent()->is_auth());
4617   assert(lock->is_stable());
4618
4619   assert(0 == "not fully implemented, at least not for filelock");
4620
4621   CInode *in = static_cast<CInode *>(lock->get_parent());
4622
4623   switch (lock->get_state()) {
4624   case LOCK_SYNC: ceph_abort();   // this shouldn't happen
4625   case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
4626   case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
4627   default: ceph_abort();
4628   }
4629
4630   int gather = 0;
4631   if (lock->is_wrlocked())
4632     gather++;
4633
4634   if (lock->get_cap_shift() &&
4635       in->is_head() &&
4636       in->issued_caps_need_gather(lock)) {
4637     if (need_issue)
4638       *need_issue = true;
4639     else
4640       issue_caps(in);
4641     gather++;
4642   }
4643
4644   if (lock->get_state() == LOCK_MIX_TSYN &&
4645       in->is_replicated()) {
4646     lock->init_gather();
4647     send_lock_message(lock, LOCK_AC_LOCK);
4648     gather++;
4649   }
4650
4651   if (gather) {
4652     in->auth_pin(lock);
4653   } else {
4654     // do tempsync
4655     lock->set_state(LOCK_TSYN);
4656     lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
4657     if (lock->get_cap_shift()) {
4658       if (need_issue)
4659         *need_issue = true;
4660       else
4661         issue_caps(in);
4662     }
4663   }
4664 }
4665
4666
4667
4668 // ==========================================================================
4669 // local lock
4670
4671 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4672 {
4673   dout(7) << "local_wrlock_grab  on " << *lock
4674           << " on " << *lock->get_parent() << dendl;  
4675   
4676   assert(lock->get_parent()->is_auth());
4677   assert(lock->can_wrlock());
4678   assert(!mut->wrlocks.count(lock));
4679   lock->get_wrlock(mut->get_client());
4680   mut->wrlocks.insert(lock);
4681   mut->locks.insert(lock);
4682 }
4683
4684 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4685 {
4686   dout(7) << "local_wrlock_start  on " << *lock
4687           << " on " << *lock->get_parent() << dendl;  
4688   
4689   assert(lock->get_parent()->is_auth());
4690   if (lock->can_wrlock()) {
4691     assert(!mut->wrlocks.count(lock));
4692     lock->get_wrlock(mut->get_client());
4693     mut->wrlocks.insert(lock);
4694     mut->locks.insert(lock);
4695     return true;
4696   } else {
4697     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4698     return false;
4699   }
4700 }
4701
4702 void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4703 {
4704   dout(7) << "local_wrlock_finish  on " << *lock
4705           << " on " << *lock->get_parent() << dendl;  
4706   lock->put_wrlock();
4707   mut->wrlocks.erase(lock);
4708   mut->locks.erase(lock);
4709   if (lock->get_num_wrlocks() == 0) {
4710     lock->finish_waiters(SimpleLock::WAIT_STABLE |
4711                          SimpleLock::WAIT_WR |
4712                          SimpleLock::WAIT_RD);
4713   }
4714 }
4715
4716 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4717 {
4718   dout(7) << "local_xlock_start  on " << *lock
4719           << " on " << *lock->get_parent() << dendl;  
4720   
4721   assert(lock->get_parent()->is_auth());
4722   if (!lock->can_xlock_local()) {
4723     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4724     return false;
4725   }
4726
4727   lock->get_xlock(mut, mut->get_client());
4728   mut->xlocks.insert(lock);
4729   mut->locks.insert(lock);
4730   return true;
4731 }
4732
4733 void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4734 {
4735   dout(7) << "local_xlock_finish  on " << *lock
4736           << " on " << *lock->get_parent() << dendl;  
4737   lock->put_xlock();
4738   mut->xlocks.erase(lock);
4739   mut->locks.erase(lock);
4740
4741   lock->finish_waiters(SimpleLock::WAIT_STABLE | 
4742                        SimpleLock::WAIT_WR | 
4743                        SimpleLock::WAIT_RD);
4744 }
4745
4746
4747
4748 // ==========================================================================
4749 // file lock
4750
4751
4752 void Locker::file_eval(ScatterLock *lock, bool *need_issue)
4753 {
4754   CInode *in = static_cast<CInode*>(lock->get_parent());
4755   int loner_wanted, other_wanted;
4756   int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
4757   dout(7) << "file_eval wanted=" << gcap_string(wanted)
4758           << " loner_wanted=" << gcap_string(loner_wanted)
4759           << " other_wanted=" << gcap_string(other_wanted)
4760           << "  filelock=" << *lock << " on " << *lock->get_parent()
4761           << dendl;
4762
4763   assert(lock->get_parent()->is_auth());
4764   assert(lock->is_stable());
4765
4766   if (lock->get_parent()->is_freezing_or_frozen())
4767     return;
4768
4769   if (mdcache->is_readonly()) {
4770     if (lock->get_state() != LOCK_SYNC) {
4771       dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
4772       simple_sync(lock, need_issue);
4773     }
4774     return;
4775   }
4776
4777   // excl -> *?
4778   if (lock->get_state() == LOCK_EXCL) {
4779     dout(20) << " is excl" << dendl;
4780     int loner_issued, other_issued, xlocker_issued;
4781     in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
4782     dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
4783             << " other_issued=" << gcap_string(other_issued)
4784             << " xlocker_issued=" << gcap_string(xlocker_issued)
4785             << dendl;
4786     if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
4787          (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
4788         (in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
4789       dout(20) << " should lose it" << dendl;
4790       // we should lose it.
4791       //  loner  other   want
4792       //  R      R       SYNC
4793       //  R      R|W     MIX
4794       //  R      W       MIX
4795       //  R|W    R       MIX
4796       //  R|W    R|W     MIX
4797       //  R|W    W       MIX
4798       //  W      R       MIX
4799       //  W      R|W     MIX
4800       //  W      W       MIX
4801       // -> any writer means MIX; RD doesn't matter.
4802       if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
4803           lock->is_waiter_for(SimpleLock::WAIT_WR))
4804         scatter_mix(lock, need_issue);
4805       else if (!lock->is_wrlocked())   // let excl wrlocks drain first
4806         simple_sync(lock, need_issue);
4807       else
4808         dout(10) << " waiting for wrlock to drain" << dendl;
4809     }    
4810   }
4811
4812   // * -> excl?
4813   else if (lock->get_state() != LOCK_EXCL &&
4814            !lock->is_rdlocked() &&
4815            //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4816            ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
4817             (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
4818            in->get_target_loner() >= 0) {
4819     dout(7) << "file_eval stable, bump to loner " << *lock
4820             << " on " << *lock->get_parent() << dendl;
4821     file_excl(lock, need_issue);
4822   }
4823
4824   // * -> mixed?
4825   else if (lock->get_state() != LOCK_MIX &&
4826            !lock->is_rdlocked() &&
4827            //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4828            (lock->get_scatter_wanted() ||
4829             (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
4830     dout(7) << "file_eval stable, bump to mixed " << *lock
4831             << " on " << *lock->get_parent() << dendl;
4832     scatter_mix(lock, need_issue);
4833   }
4834   
4835   // * -> sync?
4836   else if (lock->get_state() != LOCK_SYNC &&
4837            !lock->is_wrlocked() &&   // drain wrlocks first!
4838            !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4839            !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
4840            !((lock->get_state() == LOCK_MIX) &&
4841              in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
4842            //((wanted & CEPH_CAP_RD) || 
4843            //in->is_replicated() || 
4844            //lock->get_num_client_lease() || 
4845            //(!loner && lock->get_state() == LOCK_EXCL)) &&
4846            ) {
4847     dout(7) << "file_eval stable, bump to sync " << *lock 
4848             << " on " << *lock->get_parent() << dendl;
4849     simple_sync(lock, need_issue);
4850   }
4851 }
4852
4853
4854
4855 void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
4856 {
4857   dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
4858
4859   CInode *in = static_cast<CInode*>(lock->get_parent());
4860   assert(in->is_auth());
4861   assert(lock->is_stable());
4862
4863   if (lock->get_state() == LOCK_LOCK) {
4864     in->start_scatter(lock);
4865     if (in->is_replicated()) {
4866       // data
4867       bufferlist softdata;
4868       lock->encode_locked_state(softdata);
4869
4870       // bcast to replicas
4871       send_lock_message(lock, LOCK_AC_MIX, softdata);
4872     }
4873
4874     // change lock
4875     lock->set_state(LOCK_MIX);
4876     lock->clear_scatter_wanted();
4877     if (lock->get_cap_shift()) {
4878       if (need_issue)
4879         *need_issue = true;
4880       else
4881         issue_caps(in);
4882     }
4883   } else {
4884     // gather?
4885     switch (lock->get_state()) {
4886     case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
4887     case LOCK_XSYN:
4888       file_excl(lock, need_issue);
4889       if (lock->get_state() != LOCK_EXCL)
4890         return;
4891       // fall-thru
4892     case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
4893     case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
4894     default: ceph_abort();
4895     }
4896
4897     int gather = 0;
4898     if (lock->is_rdlocked())
4899       gather++;
4900     if (in->is_replicated()) {
4901       if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
4902           lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
4903         send_lock_message(lock, LOCK_AC_MIX);
4904         lock->init_gather();
4905         gather++;
4906       }
4907     }
4908     if (lock->is_leased()) {
4909       revoke_client_leases(lock);
4910       gather++;
4911     }
4912     if (lock->get_cap_shift() &&
4913         in->is_head() &&
4914         in->issued_caps_need_gather(lock)) {
4915       if (need_issue)
4916         *need_issue = true;
4917       else
4918         issue_caps(in);
4919       gather++;
4920     }
4921     bool need_recover = false;
4922     if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4923       mds->mdcache->queue_file_recover(in);
4924       need_recover = true;
4925       gather++;
4926     }
4927
4928     if (gather) {
4929       lock->get_parent()->auth_pin(lock);
4930       if (need_recover)
4931         mds->mdcache->do_file_recover();
4932     } else {
4933       in->start_scatter(lock);
4934       lock->set_state(LOCK_MIX);
4935       lock->clear_scatter_wanted();
4936       if (in->is_replicated()) {
4937         bufferlist softdata;
4938         lock->encode_locked_state(softdata);
4939         send_lock_message(lock, LOCK_AC_MIX, softdata);
4940       }
4941       if (lock->get_cap_shift()) {
4942         if (need_issue)
4943           *need_issue = true;
4944         else
4945           issue_caps(in);
4946       }
4947     }
4948   }
4949 }
4950
4951
4952 void Locker::file_excl(ScatterLock *lock, bool *need_issue)
4953 {
4954   CInode *in = static_cast<CInode*>(lock->get_parent());
4955   dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;  
4956
4957   assert(in->is_auth());
4958   assert(lock->is_stable());
4959
4960   assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
4961          (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>
4962   
4963   switch (lock->get_state()) {
4964   case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4965   case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
4966   case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4967   case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4968   default: ceph_abort();
4969   }
4970   int gather = 0;
4971   
4972   if (lock->is_rdlocked())
4973     gather++;
4974   if (lock->is_wrlocked())
4975     gather++;
4976
4977   if (in->is_replicated() &&
4978       lock->get_state() != LOCK_LOCK_EXCL &&
4979       lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
4980     send_lock_message(lock, LOCK_AC_LOCK);
4981     lock->init_gather();
4982     gather++;
4983   }
4984   if (lock->is_leased()) {
4985     revoke_client_leases(lock);
4986     gather++;
4987   }
4988   if (in->is_head() &&
4989       in->issued_caps_need_gather(lock)) {
4990     if (need_issue)
4991       *need_issue = true;
4992     else
4993       issue_caps(in);
4994     gather++;
4995   }
4996   bool need_recover = false;
4997   if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4998     mds->mdcache->queue_file_recover(in);
4999     need_recover = true;
5000     gather++;
5001   }
5002   
5003   if (gather) {
5004     lock->get_parent()->auth_pin(lock);
5005     if (need_recover)
5006       mds->mdcache->do_file_recover();
5007   } else {
5008     lock->set_state(LOCK_EXCL);
5009     if (need_issue)
5010       *need_issue = true;
5011     else
5012       issue_caps(in);
5013   }
5014 }
5015
5016 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5017 {
5018   dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5019   CInode *in = static_cast<CInode *>(lock->get_parent());
5020   assert(in->is_auth());
5021   assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5022
5023   switch (lock->get_state()) {
5024   case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5025   default: ceph_abort();
5026   }
5027   
5028   int gather = 0;
5029   if (lock->is_wrlocked())
5030     gather++;
5031
5032   if (in->is_head() &&
5033       in->issued_caps_need_gather(lock)) {
5034     if (need_issue)
5035       *need_issue = true;
5036     else
5037       issue_caps(in);
5038     gather++;
5039   }
5040   
5041   if (gather) {
5042     lock->get_parent()->auth_pin(lock);
5043   } else {
5044     lock->set_state(LOCK_XSYN);
5045     lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5046     if (need_issue)
5047       *need_issue = true;
5048     else
5049       issue_caps(in);
5050   }
5051 }
5052
5053 void Locker::file_recover(ScatterLock *lock)
5054 {
5055   CInode *in = static_cast<CInode *>(lock->get_parent());
5056   dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5057
5058   assert(in->is_auth());
5059   //assert(lock->is_stable());
5060   assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5061
5062   int gather = 0;
5063   
5064   /*
5065   if (in->is_replicated()
5066       lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5067     send_lock_message(lock, LOCK_AC_LOCK);
5068     lock->init_gather();
5069     gather++;
5070   }
5071   */
5072   if (in->is_head() &&
5073       in->issued_caps_need_gather(lock)) {
5074     issue_caps(in);
5075     gather++;
5076   }
5077
5078   lock->set_state(LOCK_SCAN);
5079   if (gather)
5080     in->state_set(CInode::STATE_NEEDSRECOVER);
5081   else
5082     mds->mdcache->queue_file_recover(in);
5083 }
5084
5085
5086 // messenger
5087 /* This function DOES put the passed message before returning */
5088 void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
5089 {
5090   CInode *in = static_cast<CInode*>(lock->get_parent());
5091   int from = m->get_asker();
5092
5093   if (mds->is_rejoin()) {
5094     if (in->is_rejoining()) {
5095       dout(7) << "handle_file_lock still rejoining " << *in
5096               << ", dropping " << *m << dendl;
5097       m->put();
5098       return;
5099     }
5100   }
5101
5102   dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
5103           << " on " << *lock
5104           << " from mds." << from << " " 
5105           << *in << dendl;
5106
5107   bool caps = lock->get_cap_shift();
5108   
5109   switch (m->get_action()) {
5110     // -- replica --
5111   case LOCK_AC_SYNC:
5112     assert(lock->get_state() == LOCK_LOCK ||
5113            lock->get_state() == LOCK_MIX ||
5114            lock->get_state() == LOCK_MIX_SYNC2);
5115     
5116     if (lock->get_state() == LOCK_MIX) {
5117       lock->set_state(LOCK_MIX_SYNC);
5118       eval_gather(lock, true);
5119       if (lock->is_unstable_and_locked())
5120         mds->mdlog->flush();
5121       break;
5122     }
5123
5124     (static_cast<ScatterLock *>(lock))->finish_flush();
5125     (static_cast<ScatterLock *>(lock))->clear_flushed();
5126
5127     // ok
5128     lock->decode_locked_state(m->get_data());
5129     lock->set_state(LOCK_SYNC);
5130
5131     lock->get_rdlock();
5132     if (caps)
5133       issue_caps(in);
5134     lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5135     lock->put_rdlock();
5136     break;
5137     
5138   case LOCK_AC_LOCK:
5139     switch (lock->get_state()) {
5140     case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5141     case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5142     default: ceph_abort();
5143     }
5144
5145     eval_gather(lock, true);
5146     if (lock->is_unstable_and_locked())
5147       mds->mdlog->flush();
5148
5149     break;
5150
5151   case LOCK_AC_LOCKFLUSHED:
5152     (static_cast<ScatterLock *>(lock))->finish_flush();
5153     (static_cast<ScatterLock *>(lock))->clear_flushed();
5154     // wake up scatter_nudge waiters
5155     if (lock->is_stable())
5156       lock->finish_waiters(SimpleLock::WAIT_STABLE);
5157     break;
5158     
5159   case LOCK_AC_MIX:
5160     assert(lock->get_state() == LOCK_SYNC ||
5161            lock->get_state() == LOCK_LOCK ||
5162            lock->get_state() == LOCK_SYNC_MIX2);
5163     
5164     if (lock->get_state() == LOCK_SYNC) {
5165       // MIXED
5166       lock->set_state(LOCK_SYNC_MIX);
5167       eval_gather(lock, true);
5168       if (lock->is_unstable_and_locked())
5169         mds->mdlog->flush();
5170       break;
5171     } 
5172
5173     // ok
5174     lock->set_state(LOCK_MIX);
5175     lock->decode_locked_state(m->get_data());
5176
5177     if (caps)
5178       issue_caps(in);
5179     
5180     lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5181     break;
5182
5183
5184     // -- auth --
5185   case LOCK_AC_LOCKACK:
5186     assert(lock->get_state() == LOCK_SYNC_LOCK ||
5187            lock->get_state() == LOCK_MIX_LOCK ||
5188            lock->get_state() == LOCK_MIX_LOCK2 ||
5189            lock->get_state() == LOCK_MIX_EXCL ||
5190            lock->get_state() == LOCK_SYNC_EXCL ||
5191            lock->get_state() == LOCK_SYNC_MIX ||
5192            lock->get_state() == LOCK_MIX_TSYN);
5193     assert(lock->is_gathering(from));
5194     lock->remove_gather(from);
5195     
5196     if (lock->get_state() == LOCK_MIX_LOCK ||
5197         lock->get_state() == LOCK_MIX_LOCK2 ||
5198         lock->get_state() == LOCK_MIX_EXCL ||
5199         lock->get_state() == LOCK_MIX_TSYN) {
5200       lock->decode_locked_state(m->get_data());
5201       // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5202       // delay calling scatter_writebehind().
5203       lock->clear_flushed();
5204     }
5205
5206     if (lock->is_gathering()) {
5207       dout(7) << "handle_file_lock " << *in << " from " << from
5208               << ", still gathering " << lock->get_gather_set() << dendl;
5209     } else {
5210       dout(7) << "handle_file_lock " << *in << " from " << from
5211               << ", last one" << dendl;
5212       eval_gather(lock);
5213     }
5214     break;
5215     
5216   case LOCK_AC_SYNCACK:
5217     assert(lock->get_state() == LOCK_MIX_SYNC);
5218     assert(lock->is_gathering(from));
5219     lock->remove_gather(from);
5220     
5221     lock->decode_locked_state(m->get_data());
5222
5223     if (lock->is_gathering()) {
5224       dout(7) << "handle_file_lock " << *in << " from " << from
5225               << ", still gathering " << lock->get_gather_set() << dendl;
5226     } else {
5227       dout(7) << "handle_file_lock " << *in << " from " << from
5228               << ", last one" << dendl;
5229       eval_gather(lock);
5230     }
5231     break;
5232
5233   case LOCK_AC_MIXACK:
5234     assert(lock->get_state() == LOCK_SYNC_MIX);
5235     assert(lock->is_gathering(from));
5236     lock->remove_gather(from);
5237     
5238     if (lock->is_gathering()) {
5239       dout(7) << "handle_file_lock " << *in << " from " << from
5240               << ", still gathering " << lock->get_gather_set() << dendl;
5241     } else {
5242       dout(7) << "handle_file_lock " << *in << " from " << from
5243               << ", last one" << dendl;
5244       eval_gather(lock);
5245     }
5246     break;
5247
5248
5249     // requests....
5250   case LOCK_AC_REQSCATTER:
5251     if (lock->is_stable()) {
5252       /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5253        *  because the replica should be holding an auth_pin if they're
5254        *  doing this (and thus, we are freezing, not frozen, and indefinite
5255        *  starvation isn't an issue).
5256        */
5257       dout(7) << "handle_file_lock got scatter request on " << *lock
5258               << " on " << *lock->get_parent() << dendl;
5259       if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
5260         scatter_mix(lock);
5261     } else {
5262       dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5263               << " on " << *lock->get_parent() << dendl;
5264       lock->set_scatter_wanted();
5265     }
5266     break;
5267
5268   case LOCK_AC_REQUNSCATTER:
5269     if (lock->is_stable()) {
5270       /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5271        *  because the replica should be holding an auth_pin if they're
5272        *  doing this (and thus, we are freezing, not frozen, and indefinite
5273        *  starvation isn't an issue).
5274        */
5275       dout(7) << "handle_file_lock got unscatter request on " << *lock
5276               << " on " << *lock->get_parent() << dendl;
5277       if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
5278         simple_lock(lock);  // FIXME tempsync?
5279     } else {
5280       dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5281               << " on " << *lock->get_parent() << dendl;
5282       lock->set_unscatter_wanted();
5283     }
5284     break;
5285
5286   case LOCK_AC_REQRDLOCK:
5287     handle_reqrdlock(lock, m);
5288     break;
5289
5290   case LOCK_AC_NUDGE:
5291     if (!lock->get_parent()->is_auth()) {
5292       dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5293               << " on " << *lock->get_parent() << dendl;
5294     } else if (!lock->get_parent()->is_replicated()) {
5295       dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5296               << " on " << *lock->get_parent() << dendl;
5297     } else {
5298       dout(7) << "handle_file_lock trying nudge on " << *lock
5299               << " on " << *lock->get_parent() << dendl;
5300       scatter_nudge(lock, 0, true);
5301       mds->mdlog->flush();
5302     }
5303     break;
5304
5305   default:
5306     ceph_abort();
5307   }  
5308   
5309   m->put();
5310 }
5311
5312
5313
5314
5315
5316