Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / flock.cc
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <errno.h>

#include "common/debug.h"
#include "mdstypes.h"
#include "mds/flock.h"

#define dout_subsys ceph_subsys_mds

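// Tracks every fcntl lock that is currently waiting, mapped to the
// ceph_lock_state_t (i.e. the file) it is queued on.  is_deadlock() walks
// this map to follow wait-for edges across files.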
static multimap<ceph_filelock, ceph_lock_state_t*> global_waiting_locks;

static void remove_global_waiting(ceph_filelock &fl, ceph_lock_state_t *lock_state)
{
  for (auto p = global_waiting_locks.find(fl);
       p != global_waiting_locks.end(); ) {
    if (p->first != fl)
      break;
    if (p->second == lock_state) {
      global_waiting_locks.erase(p);
      break;
    }
    ++p;
  }
}

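// When a lock state is destroyed, drop any entries it registered in
// global_waiting_locks so the map never holds dangling pointers.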
ceph_lock_state_t::~ceph_lock_state_t()
{
  if (type == CEPH_LOCK_FCNTL) {
    for (auto p = waiting_locks.begin(); p != waiting_locks.end(); ++p) {
      remove_global_waiting(p->second, this);
    }
  }
}

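// Return true if a waiting lock with the same start, length and owner as
// 'fl' is already queued.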
bool ceph_lock_state_t::is_waiting(const ceph_filelock &fl) const
{
  multimap<uint64_t, ceph_filelock>::const_iterator p = waiting_locks.find(fl.start);
  while (p != waiting_locks.end()) {
    if (p->second.start > fl.start)
      return false;
    if (p->second.length == fl.length &&
        ceph_filelock_owner_equal(p->second, fl))
      return true;
    ++p;
  }
  return false;
}

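// Remove the matching entry from waiting_locks (and, for fcntl locks, from
// the global map) and update the per-client waiter count.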
void ceph_lock_state_t::remove_waiting(const ceph_filelock& fl)
{
  for (auto p = waiting_locks.find(fl.start);
       p != waiting_locks.end(); ) {
    if (p->second.start > fl.start)
      break;
    if (p->second.length == fl.length &&
        ceph_filelock_owner_equal(p->second, fl)) {
      if (type == CEPH_LOCK_FCNTL) {
        remove_global_waiting(p->second, this);
      }
      waiting_locks.erase(p);
      --client_waiting_lock_counts[(client_t)fl.client];
      if (!client_waiting_lock_counts[(client_t)fl.client]) {
        client_waiting_lock_counts.erase((client_t)fl.client);
      }
      break;
    }
    ++p;
  }
}

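// Depth-limited cycle detection in the wait-for graph.  Starting from the
// owners of the locks that conflict with 'fl', follow global_waiting_locks
// to the locks those owners are themselves waiting on; if the chain ever
// leads back to the original requester, granting 'fl' would deadlock.
// Only fcntl (POSIX) locks participate.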
bool ceph_lock_state_t::is_deadlock(const ceph_filelock& fl,
                                    list<multimap<uint64_t, ceph_filelock>::iterator>&
                                      overlapping_locks,
                                    const ceph_filelock *first_fl, unsigned depth) const
{
  ldout(cct,15) << "is_deadlock " << fl << dendl;

  // only for posix locks
  if (type != CEPH_LOCK_FCNTL)
    return false;

  // collect the owners of the conflicting locks
  set<ceph_filelock> lock_owners;
  for (auto p = overlapping_locks.begin();
       p != overlapping_locks.end();
       ++p) {

    // shared locks don't conflict with each other
    if (fl.type == CEPH_LOCK_SHARED &&
        (*p)->second.type == CEPH_LOCK_SHARED)
      continue;

    // cycle detected: the chain leads back to the original requester
    if (first_fl && ceph_filelock_owner_equal(*first_fl, (*p)->second)) {
      ldout(cct,15) << " detect deadlock" << dendl;
      return true;
    }

    // keep only the owner fields (client/owner/pid) as the set key
    ceph_filelock tmp = (*p)->second;
    tmp.start = 0;
    tmp.length = 0;
    tmp.type = 0;
    lock_owners.insert(tmp);
  }

  if (depth >= MAX_DEADLK_DEPTH)
    return false;

  first_fl = first_fl ? first_fl : &fl;
  for (auto p = lock_owners.begin();
       p != lock_owners.end();
       ++p) {
    ldout(cct,15) << " conflict lock owner " << *p << dendl;
    // is the conflicting lock's owner itself waiting for another lock?
    for (auto q = global_waiting_locks.lower_bound(*p);
         q != global_waiting_locks.end();
         ++q) {
      if (!ceph_filelock_owner_equal(q->first, *p))
        break;

      list<multimap<uint64_t, ceph_filelock>::iterator>
        _overlapping_locks, _self_overlapping_locks;
      ceph_lock_state_t& state = *(q->second);
      if (state.get_overlapping_locks(q->first, _overlapping_locks)) {
        state.split_by_owner(q->first, _overlapping_locks, _self_overlapping_locks);
      }
      if (!_overlapping_locks.empty()) {
        if (is_deadlock(q->first, _overlapping_locks, first_fl, depth + 1))
          return true;
      }
    }
  }
  return false;
}

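// Queue 'fl' as a waiter on this file; fcntl waiters are also registered in
// the global map so deadlock detection can see them.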
void ceph_lock_state_t::add_waiting(const ceph_filelock& fl)
{
  waiting_locks.insert(pair<uint64_t, ceph_filelock>(fl.start, fl));
  ++client_waiting_lock_counts[(client_t)fl.client];
  if (type == CEPH_LOCK_FCNTL) {
    global_waiting_locks.insert(pair<ceph_filelock,ceph_lock_state_t*>(fl, this));
  }
}

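// Try to acquire new_lock.  Returns true if the lock is now held.  On
// conflict with another owner, and if wait_on_fail is set and this is not a
// replay, the lock is queued as a waiter unless doing so would deadlock, in
// which case *deadlock is set instead.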
bool ceph_lock_state_t::add_lock(ceph_filelock& new_lock,
                                 bool wait_on_fail, bool replay,
                                 bool *deadlock)
{
  ldout(cct,15) << "add_lock " << new_lock << dendl;
  bool ret = false;
  list<multimap<uint64_t, ceph_filelock>::iterator>
    overlapping_locks, self_overlapping_locks, neighbor_locks;

  // first, get any overlapping locks and split them into owned-by-us and not
  if (get_overlapping_locks(new_lock, overlapping_locks, &neighbor_locks)) {
    ldout(cct,15) << "got overlapping lock, splitting by owner" << dendl;
    split_by_owner(new_lock, overlapping_locks, self_overlapping_locks);
  }
  if (!overlapping_locks.empty()) { //overlapping locks owned by others :(
    if (CEPH_LOCK_EXCL == new_lock.type) {
      //can't set, we want an exclusive
      ldout(cct,15) << "overlapping lock, and this lock is exclusive, can't set"
              << dendl;
      if (wait_on_fail && !replay) {
        if (is_deadlock(new_lock, overlapping_locks))
          *deadlock = true;
        else
          add_waiting(new_lock);
      }
    } else { //shared lock, check for any exclusive locks blocking us
      if (contains_exclusive_lock(overlapping_locks)) { //blocked :(
        ldout(cct,15) << " blocked by exclusive lock in overlapping_locks" << dendl;
        if (wait_on_fail && !replay) {
          if (is_deadlock(new_lock, overlapping_locks))
            *deadlock = true;
          else
            add_waiting(new_lock);
        }
      } else {
        //yay, we can insert a shared lock
        ldout(cct,15) << "inserting shared lock" << dendl;
        remove_waiting(new_lock);
        adjust_locks(self_overlapping_locks, new_lock, neighbor_locks);
        held_locks.insert(pair<uint64_t, ceph_filelock>(new_lock.start, new_lock));
        ret = true;
      }
    }
  } else { //no overlapping locks except our own
    remove_waiting(new_lock);
    adjust_locks(self_overlapping_locks, new_lock, neighbor_locks);
    ldout(cct,15) << "no conflicts, inserting " << new_lock << dendl;
    held_locks.insert(pair<uint64_t, ceph_filelock>
                      (new_lock.start, new_lock));
    ret = true;
  }
  if (ret) {
    ++client_held_lock_counts[(client_t)new_lock.client];
  }
  return ret;
}

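// Query in the style of F_GETLK: if somebody else's lock would block
// testing_lock, overwrite testing_lock with that blocking lock; otherwise
// set its type to CEPH_LOCK_UNLOCK to report that it could be acquired.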
void ceph_lock_state_t::look_for_lock(ceph_filelock& testing_lock)
{
  list<multimap<uint64_t, ceph_filelock>::iterator> overlapping_locks,
    self_overlapping_locks;
  if (get_overlapping_locks(testing_lock, overlapping_locks)) {
    split_by_owner(testing_lock, overlapping_locks, self_overlapping_locks);
  }
  if (!overlapping_locks.empty()) { //somebody else owns overlapping lock
    if (CEPH_LOCK_EXCL == testing_lock.type) { //any lock blocks it
      testing_lock = (*overlapping_locks.begin())->second;
    } else {
      ceph_filelock *blocking_lock;
      if ((blocking_lock = contains_exclusive_lock(overlapping_locks))) {
        testing_lock = *blocking_lock;
      } else { //nothing blocking!
        testing_lock.type = CEPH_LOCK_UNLOCK;
      }
    }
    return;
  }
  //if we get here, only our own locks block
  testing_lock.type = CEPH_LOCK_UNLOCK;
}

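// Unlock the byte range described by removal_lock for its owner, shrinking,
// splitting or erasing any of the owner's held locks that overlap it.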
void ceph_lock_state_t::remove_lock(ceph_filelock removal_lock,
                 list<ceph_filelock>& activated_locks)
{
  list<multimap<uint64_t, ceph_filelock>::iterator> overlapping_locks,
    self_overlapping_locks;
  if (get_overlapping_locks(removal_lock, overlapping_locks)) {
    ldout(cct,15) << "splitting by owner" << dendl;
    split_by_owner(removal_lock, overlapping_locks, self_overlapping_locks);
  } else ldout(cct,15) << "attempt to remove lock at " << removal_lock.start
                 << " but no locks there!" << dendl;
  bool remove_to_end = (0 == removal_lock.length);
  uint64_t removal_start = removal_lock.start;
  uint64_t removal_end = removal_start + removal_lock.length - 1;
  __s64 old_lock_client = 0;
  ceph_filelock *old_lock;

  ldout(cct,15) << "examining " << self_overlapping_locks.size()
          << " self-overlapping locks for removal" << dendl;
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = self_overlapping_locks.begin();
       iter != self_overlapping_locks.end();
       ++iter) {
    ldout(cct,15) << "self overlapping lock " << (*iter)->second << dendl;
    old_lock = &(*iter)->second;
    bool old_lock_to_end = (0 == old_lock->length);
    uint64_t old_lock_end = old_lock->start + old_lock->length - 1;
    old_lock_client = old_lock->client;
    if (remove_to_end) {
      if (old_lock->start < removal_start) {
        old_lock->length = removal_start - old_lock->start;
      } else {
        ldout(cct,15) << "erasing " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      }
    } else if (old_lock_to_end) {
      ceph_filelock append_lock = *old_lock;
      append_lock.start = removal_end+1;
      held_locks.insert(pair<uint64_t, ceph_filelock>
                        (append_lock.start, append_lock));
      ++client_held_lock_counts[(client_t)old_lock->client];
      if (old_lock->start >= removal_start) {
        ldout(cct,15) << "erasing " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      } else old_lock->length = removal_start - old_lock->start;
    } else {
      if (old_lock_end > removal_end) {
        ceph_filelock append_lock = *old_lock;
        append_lock.start = removal_end + 1;
        append_lock.length = old_lock_end - append_lock.start + 1;
        held_locks.insert(pair<uint64_t, ceph_filelock>
                          (append_lock.start, append_lock));
        ++client_held_lock_counts[(client_t)old_lock->client];
      }
      if (old_lock->start < removal_start) {
        old_lock->length = removal_start - old_lock->start;
      } else {
        ldout(cct,15) << "erasing " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      }
    }
    if (!client_held_lock_counts[old_lock_client]) {
      client_held_lock_counts.erase(old_lock_client);
    }
  }
}

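// Drop every held and waiting lock owned by 'client'.  Returns true if any
// held locks were cleared.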
bool ceph_lock_state_t::remove_all_from (client_t client)
{
  bool cleared_any = false;
  if (client_held_lock_counts.count(client)) {
    multimap<uint64_t, ceph_filelock>::iterator iter = held_locks.begin();
    while (iter != held_locks.end()) {
      if ((client_t)iter->second.client == client) {
        held_locks.erase(iter++);
      } else
        ++iter;
    }
    client_held_lock_counts.erase(client);
    cleared_any = true;
  }

  if (client_waiting_lock_counts.count(client)) {
    multimap<uint64_t, ceph_filelock>::iterator iter = waiting_locks.begin();
    while (iter != waiting_locks.end()) {
      if ((client_t)iter->second.client != client) {
        ++iter;
        continue;
      }
      if (type == CEPH_LOCK_FCNTL) {
        remove_global_waiting(iter->second, this);
      }
      waiting_locks.erase(iter++);
    }
    client_waiting_lock_counts.erase(client);
  }
  return cleared_any;
}

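// Merge new_lock with the caller-supplied lists of this owner's locks:
// old_locks overlap new_lock and are unified (same type) or shrunk/split
// around it (different type); neighbor_locks merely touch new_lock and are
// coalesced into it when the types match.  new_lock itself is extended as
// needed here but inserted into held_locks by the caller.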
void ceph_lock_state_t::adjust_locks(list<multimap<uint64_t, ceph_filelock>::iterator> old_locks,
                  ceph_filelock& new_lock,
                  list<multimap<uint64_t, ceph_filelock>::iterator>
                  neighbor_locks)
{
  ldout(cct,15) << "adjust_locks" << dendl;
  bool new_lock_to_end = (0 == new_lock.length);
  __s64 old_lock_client = 0;
  ceph_filelock *old_lock;
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = old_locks.begin();
       iter != old_locks.end();
       ++iter) {
    old_lock = &(*iter)->second;
    ldout(cct,15) << "adjusting lock: " << *old_lock << dendl;
    bool old_lock_to_end = (0 == old_lock->length);
    uint64_t old_lock_start = old_lock->start;
    uint64_t old_lock_end = old_lock->start + old_lock->length - 1;
    uint64_t new_lock_start = new_lock.start;
    uint64_t new_lock_end = new_lock.start + new_lock.length - 1;
    old_lock_client = old_lock->client;
    if (new_lock_to_end || old_lock_to_end) {
      //special code path to deal with a length set at 0
      ldout(cct,15) << "one lock extends forever" << dendl;
      if (old_lock->type == new_lock.type) {
        //just unify them in new lock, remove old lock
        ldout(cct,15) << "same lock type, unifying" << dendl;
        new_lock.start = (new_lock_start < old_lock_start) ? new_lock_start :
          old_lock_start;
        new_lock.length = 0;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      } else { //not same type, have to keep any remains of old lock around
        ldout(cct,15) << "shrinking old lock" << dendl;
        if (new_lock_to_end) {
          if (old_lock_start < new_lock_start) {
            old_lock->length = new_lock_start - old_lock_start;
          } else {
            held_locks.erase(*iter);
            --client_held_lock_counts[old_lock_client];
          }
        } else { //old lock extends past end of new lock
          ceph_filelock appended_lock = *old_lock;
          appended_lock.start = new_lock_end + 1;
          held_locks.insert(pair<uint64_t, ceph_filelock>
                            (appended_lock.start, appended_lock));
          ++client_held_lock_counts[(client_t)old_lock->client];
          if (old_lock_start < new_lock_start) {
            old_lock->length = new_lock_start - old_lock_start;
          } else {
            held_locks.erase(*iter);
            --client_held_lock_counts[old_lock_client];
          }
        }
      }
    } else {
      if (old_lock->type == new_lock.type) { //just merge them!
        ldout(cct,15) << "merging locks, they're the same type" << dendl;
        new_lock.start = (old_lock_start < new_lock_start) ? old_lock_start :
          new_lock_start;
        // use a 64-bit end offset; a plain int would truncate large offsets
        uint64_t new_end = (new_lock_end > old_lock_end) ? new_lock_end :
          old_lock_end;
        new_lock.length = new_end - new_lock.start + 1;
        ldout(cct,15) << "erasing lock " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      } else { //we'll have to update sizes and maybe make new locks
        ldout(cct,15) << "locks aren't same type, changing sizes" << dendl;
        if (old_lock_end > new_lock_end) { //add extra lock after new_lock
          ceph_filelock appended_lock = *old_lock;
          appended_lock.start = new_lock_end + 1;
          appended_lock.length = old_lock_end - appended_lock.start + 1;
          held_locks.insert(pair<uint64_t, ceph_filelock>
                            (appended_lock.start, appended_lock));
          ++client_held_lock_counts[(client_t)old_lock->client];
        }
        if (old_lock_start < new_lock_start) {
          old_lock->length = new_lock_start - old_lock_start;
        } else { //old_lock starts inside new_lock, so remove it
          //if it extended past new_lock_end it's been replaced
          held_locks.erase(*iter);
          --client_held_lock_counts[old_lock_client];
        }
      }
    }
    if (!client_held_lock_counts[old_lock_client]) {
      client_held_lock_counts.erase(old_lock_client);
    }
  }

  //make sure to coalesce neighboring locks
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = neighbor_locks.begin();
       iter != neighbor_locks.end();
       ++iter) {
    old_lock = &(*iter)->second;
    old_lock_client = old_lock->client;
    ldout(cct,15) << "lock to coalesce: " << *old_lock << dendl;
    /* a neighboring lock only touches new_lock, so it cannot also appear in
       the self-overlapping list handled above */
    if (old_lock->type == new_lock.type) { //merge them
      if (0 == new_lock.length) {
        if (old_lock->start + old_lock->length == new_lock.start) {
          new_lock.start = old_lock->start;
        } else ceph_abort(); /* if there's no end to new_lock, the neighbor
                                HAS TO be on the left side */
      } else if (0 == old_lock->length) {
        if (new_lock.start + new_lock.length == old_lock->start) {
          new_lock.length = 0;
        } else ceph_abort(); //same as before, but reversed
      } else {
        if (old_lock->start + old_lock->length == new_lock.start) {
          new_lock.start = old_lock->start;
          new_lock.length = old_lock->length + new_lock.length;
        } else if (new_lock.start + new_lock.length == old_lock->start) {
          new_lock.length = old_lock->length + new_lock.length;
        }
      }
      held_locks.erase(*iter);
      --client_held_lock_counts[old_lock_client];
    }
    if (!client_held_lock_counts[old_lock_client]) {
      client_held_lock_counts.erase(old_lock_client);
    }
  }
}

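// Return an iterator positioned at, or just before, 'start' in lock_map:
// the first entry that could overlap a range beginning at 'start'.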
multimap<uint64_t, ceph_filelock>::iterator
ceph_lock_state_t::get_lower_bound(uint64_t start,
                                   multimap<uint64_t, ceph_filelock>& lock_map)
{
  multimap<uint64_t, ceph_filelock>::iterator lower_bound =
    lock_map.lower_bound(start);
  // step back one entry when there is no exact match, so a lock that starts
  // before 'start' but extends over it is not missed; check for end() before
  // dereferencing
  if ((lower_bound == lock_map.end() || lower_bound->first != start)
      && (start != 0)
      && (lower_bound != lock_map.begin())) --lower_bound;
  if (lock_map.end() == lower_bound)
    ldout(cct,15) << "get_lower_bound returning end()" << dendl;
  else ldout(cct,15) << "get_lower_bound returning iterator pointing to "
               << lower_bound->second << dendl;
  return lower_bound;
}

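// Return the last entry in lock_map whose start is at or before 'end'
// (end() if the map is empty; if every lock starts after 'end' the first
// entry is returned and later filtered out by share_space()).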
multimap<uint64_t, ceph_filelock>::iterator
ceph_lock_state_t::get_last_before(uint64_t end,
                                   multimap<uint64_t, ceph_filelock>& lock_map)
{
  multimap<uint64_t, ceph_filelock>::iterator last =
    lock_map.upper_bound(end);
  if (last != lock_map.begin()) --last;
  if (lock_map.end() == last)
    ldout(cct,15) << "get_last_before returning end()" << dendl;
  else ldout(cct,15) << "get_last_before returning iterator pointing to "
               << last->second << dendl;
  return last;
}

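// Does the lock at 'iter' cover any byte of [start, end]?  A lock length of
// zero means it extends to the end of the file.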
bool ceph_lock_state_t::share_space(
    multimap<uint64_t, ceph_filelock>::iterator& iter,
    uint64_t start, uint64_t end)
{
  bool ret = ((iter->first >= start && iter->first <= end) ||
              ((iter->first < start) &&
               (((iter->first + iter->second.length - 1) >= start) ||
                (0 == iter->second.length))));
  ldout(cct,15) << "share_space got start: " << start << ", end: " << end
          << ", lock: " << iter->second << ", returning " << ret << dendl;
  return ret;
}

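// Collect the held locks that overlap 'lock' into 'overlaps' (ordered by
// start, lowest first).  If self_neighbors is non-null, also collect
// same-owner locks that merely touch lock's boundaries so the caller can
// coalesce them.  Returns true if any overlapping locks were found.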
bool ceph_lock_state_t::get_overlapping_locks(const ceph_filelock& lock,
                           list<multimap<uint64_t,
                               ceph_filelock>::iterator> & overlaps,
                           list<multimap<uint64_t,
                               ceph_filelock>::iterator> *self_neighbors)
{
  ldout(cct,15) << "get_overlapping_locks" << dendl;
  // create a lock starting one earlier and ending one later
  // to check for neighbors
  ceph_filelock neighbor_check_lock = lock;
  if (neighbor_check_lock.start != 0) {
    neighbor_check_lock.start = neighbor_check_lock.start - 1;
    if (neighbor_check_lock.length)
      neighbor_check_lock.length = neighbor_check_lock.length + 2;
  } else {
    if (neighbor_check_lock.length)
      neighbor_check_lock.length = neighbor_check_lock.length + 1;
  }
  //find the last held lock starting at the point after lock
  uint64_t endpoint = lock.start;
  if (lock.length) {
    endpoint += lock.length;
  } else {
    endpoint = uint64_t(-1); // max offset
  }
  multimap<uint64_t, ceph_filelock>::iterator iter =
    get_last_before(endpoint, held_locks);
  bool cont = iter != held_locks.end();
  while(cont) {
    if (share_space(iter, lock)) {
      overlaps.push_front(iter);
    } else if (self_neighbors &&
               ceph_filelock_owner_equal(neighbor_check_lock, iter->second) &&
               share_space(iter, neighbor_check_lock)) {
      self_neighbors->push_front(iter);
    }
    if ((iter->first < lock.start) && (CEPH_LOCK_EXCL == iter->second.type)) {
      //can't be any more overlapping locks or they'd interfere with this one
      cont = false;
    } else if (held_locks.begin() == iter) cont = false;
    else --iter;
  }
  return !overlaps.empty();
}

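// Like get_overlapping_locks(), but searches waiting_locks and does not do
// neighbor detection.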
bool ceph_lock_state_t::get_waiting_overlaps(const ceph_filelock& lock,
                                             list<multimap<uint64_t,
                                               ceph_filelock>::iterator>&
                                               overlaps)
{
  ldout(cct,15) << "get_waiting_overlaps" << dendl;
  // a zero length means the lock extends to the maximum offset
  uint64_t endpoint = lock.length ? lock.start + lock.length - 1 : uint64_t(-1);
  multimap<uint64_t, ceph_filelock>::iterator iter =
    get_last_before(endpoint, waiting_locks);
  bool cont = iter != waiting_locks.end();
  while(cont) {
    if (share_space(iter, lock)) overlaps.push_front(iter);
    if (waiting_locks.begin() == iter) cont = false;
    else --iter;
  }
  return !overlaps.empty();
}

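// Move the locks in 'locks' that belong to the same owner as 'owner' into
// 'owned_locks', leaving only other owners' locks behind.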
void ceph_lock_state_t::split_by_owner(const ceph_filelock& owner,
                                       list<multimap<uint64_t,
                                           ceph_filelock>::iterator>& locks,
                                       list<multimap<uint64_t,
                                           ceph_filelock>::iterator>&
                                           owned_locks)
{
  list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
    iter = locks.begin();
  ldout(cct,15) << "owner lock: " << owner << dendl;
  while (iter != locks.end()) {
    ldout(cct,15) << "comparing to " << (*iter)->second << dendl;
    if (ceph_filelock_owner_equal((*iter)->second, owner)) {
      ldout(cct,15) << "success, pushing to owned_locks" << dendl;
      owned_locks.push_back(*iter);
      iter = locks.erase(iter);
    } else {
      ldout(cct,15) << "failure, something not equal in this group "
              << (*iter)->second.client << ":" << owner.client << ","
              << (*iter)->second.owner << ":" << owner.owner << ","
              << (*iter)->second.pid << ":" << owner.pid << dendl;
      ++iter;
    }
  }
}

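// Return a pointer to the first exclusive lock found in 'locks', or NULL if
// there is none.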
ceph_filelock *
ceph_lock_state_t::contains_exclusive_lock(list<multimap<uint64_t,
                                               ceph_filelock>::iterator>& locks)
{
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = locks.begin();
       iter != locks.end();
       ++iter) {
    if (CEPH_LOCK_EXCL == (*iter)->second.type) return &(*iter)->second;
  }
  return NULL;
}