Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / ldlm / ldlm_flock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28  * Developed under the sponsorship of the US Government under
29  * Subcontract No. B514193
30  *
31  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32  * Use is subject to license terms.
33  *
34  * Copyright (c) 2010, 2012, Intel Corporation.
35  */
36 /*
37  * This file is part of Lustre, http://www.lustre.org/
38  * Lustre is a trademark of Sun Microsystems, Inc.
39  */
40
41 /**
42  * This file implements POSIX lock type for Lustre.
43  * Its policy properties are start and end of extent and PID.
44  *
45  * These locks are handled only through the MDS because POSIX semantics
46  * require, e.g., that a lock can be partially released and thereby split
47  * into two parts, and that two adjacent locks from the same process may be
48  * merged into a single wider lock.
49  *
50  * Lock modes are mapped like this:
51  * PR and PW for READ and WRITE locks
52  * NL to request a releasing of a portion of the lock
53  *
54  * These flock locks never time out.
55  */
56
57 #define DEBUG_SUBSYSTEM S_LDLM
58
59 #include "../include/lustre_dlm.h"
60 #include "../include/obd_support.h"
61 #include "../include/obd_class.h"
62 #include "../include/lustre_lib.h"
63 #include <linux/list.h>
64 #include "ldlm_internal.h"
65
/*
 * Forward declaration: ldlm_process_flock_lock() stores this callback in
 * l_blocking_ast, and the definition only appears further down the file.
 */
int ldlm_flock_blocking_ast(struct ldlm_lock *lock,
			    struct ldlm_lock_desc *desc,
			    void *data, int flag);
68
/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *	      and safeguard against removal of the current list entry.
 * \param pos   the &struct list_head to use as a loop cursor. pos MUST
 *	      have been initialized prior to using it in this macro;
 *	      iteration starts AT pos (pos itself is the first entry visited).
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 *
 * The successor of \a pos is cached in \a n before the loop body runs, so
 * the body may unlink \a pos.  Every argument is evaluated more than once;
 * pass side-effect-free lvalue expressions for \a pos and \a n.
 */
#define list_for_remaining_safe(pos, n, head) \
	for ((n) = (pos)->next; (pos) != (head); \
	     (pos) = (n), (n) = (pos)->next)
79
80 static inline int
81 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
82 {
83         return((new->l_policy_data.l_flock.owner ==
84                 lock->l_policy_data.l_flock.owner) &&
85                (new->l_export == lock->l_export));
86 }
87
88 static inline int
89 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
90 {
91         return((new->l_policy_data.l_flock.start <=
92                 lock->l_policy_data.l_flock.end) &&
93                (new->l_policy_data.l_flock.end >=
94                 lock->l_policy_data.l_flock.start));
95 }
96
97 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
98                                             struct ldlm_lock *lock)
99 {
100         /* For server only */
101         if (req->l_export == NULL)
102                 return;
103
104         LASSERT(hlist_unhashed(&req->l_exp_flock_hash));
105
106         req->l_policy_data.l_flock.blocking_owner =
107                 lock->l_policy_data.l_flock.owner;
108         req->l_policy_data.l_flock.blocking_export =
109                 lock->l_export;
110         req->l_policy_data.l_flock.blocking_refs = 0;
111
112         cfs_hash_add(req->l_export->exp_flock_hash,
113                      &req->l_policy_data.l_flock.owner,
114                      &req->l_exp_flock_hash);
115 }
116
117 static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
118 {
119         /* For server only */
120         if (req->l_export == NULL)
121                 return;
122
123         check_res_locked(req->l_resource);
124         if (req->l_export->exp_flock_hash != NULL &&
125             !hlist_unhashed(&req->l_exp_flock_hash))
126                 cfs_hash_del(req->l_export->exp_flock_hash,
127                              &req->l_policy_data.l_flock.owner,
128                              &req->l_exp_flock_hash);
129 }
130
131 static inline void
132 ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
133 {
134         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
135                    mode, flags);
136
137         /* Safe to not lock here, since it should be empty anyway */
138         LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
139
140         list_del_init(&lock->l_res_link);
141         if (flags == LDLM_FL_WAIT_NOREPROC &&
142             !(lock->l_flags & LDLM_FL_FAILED)) {
143                 /* client side - set a flag to prevent sending a CANCEL */
144                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
145
146                 /* when reaching here, it is under lock_res_and_lock(). Thus,
147                    need call the nolock version of ldlm_lock_decref_internal*/
148                 ldlm_lock_decref_internal_nolock(lock, mode);
149         }
150
151         ldlm_lock_destroy_nolock(lock);
152 }
153
154 /**
155  * POSIX locks deadlock detection code.
156  *
157  * Given a new lock \a req and an existing lock \a bl_lock it conflicts
158  * with, we need to iterate through all blocked POSIX locks for this
159  * export and see if there is a deadlock condition arising. (i.e. when
160  * one client holds a lock on something and want a lock on something
161  * else and at the same time another client has the opposite situation).
162  */
163 static int
164 ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
165 {
166         struct obd_export *req_exp = req->l_export;
167         struct obd_export *bl_exp = bl_lock->l_export;
168         __u64 req_owner = req->l_policy_data.l_flock.owner;
169         __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
170
171         /* For server only */
172         if (req_exp == NULL)
173                 return 0;
174
175         class_export_get(bl_exp);
176         while (1) {
177                 struct obd_export *bl_exp_new;
178                 struct ldlm_lock *lock = NULL;
179                 struct ldlm_flock *flock;
180
181                 if (bl_exp->exp_flock_hash != NULL)
182                         lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
183                                                &bl_owner);
184                 if (lock == NULL)
185                         break;
186
187                 LASSERT(req != lock);
188                 flock = &lock->l_policy_data.l_flock;
189                 LASSERT(flock->owner == bl_owner);
190                 bl_owner = flock->blocking_owner;
191                 bl_exp_new = class_export_get(flock->blocking_export);
192                 class_export_put(bl_exp);
193
194                 cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
195                 bl_exp = bl_exp_new;
196
197                 if (bl_owner == req_owner && bl_exp == req_exp) {
198                         class_export_put(bl_exp);
199                         return 1;
200                 }
201         }
202         class_export_put(bl_exp);
203
204         return 0;
205 }
206
207 static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
208                                           struct list_head *work_list)
209 {
210         CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
211
212         if ((exp_connect_flags(lock->l_export) &
213                                 OBD_CONNECT_FLOCK_DEAD) == 0) {
214                 CERROR(
215                       "deadlock found, but client doesn't support flock canceliation\n");
216         } else {
217                 LASSERT(lock->l_completion_ast);
218                 LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
219                 lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
220                         LDLM_FL_FLOCK_DEADLOCK;
221                 ldlm_flock_blocking_unlink(lock);
222                 ldlm_resource_unlink_lock(lock);
223                 ldlm_add_ast_work_item(lock, NULL, work_list);
224         }
225 }
226
227 /**
228  * Process a granting attempt for flock lock.
229  * Must be called under ns lock held.
230  *
231  * This function looks for any conflicts for \a lock in the granted or
232  * waiting queues. The lock is granted if no conflicts are found in
233  * either queue.
234  *
235  * It is also responsible for splitting a lock if a portion of the lock
236  * is released.
237  *
238  * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
239  *   - blocking ASTs have already been sent
240  *
241  * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
242  *   - blocking ASTs have not been sent yet, so list of conflicting locks
243  *     would be collected and ASTs sent.
244  */
245 int
246 ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
247                         ldlm_error_t *err, struct list_head *work_list)
248 {
249         struct ldlm_resource *res = req->l_resource;
250         struct ldlm_namespace *ns = ldlm_res_to_ns(res);
251         struct list_head *tmp;
252         struct list_head *ownlocks = NULL;
253         struct ldlm_lock *lock = NULL;
254         struct ldlm_lock *new = req;
255         struct ldlm_lock *new2 = NULL;
256         ldlm_mode_t mode = req->l_req_mode;
257         int local = ns_is_client(ns);
258         int added = (mode == LCK_NL);
259         int overlaps = 0;
260         int splitted = 0;
261         const struct ldlm_callback_suite null_cbs = { NULL };
262
263         CDEBUG(D_DLMTRACE,
264                "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
265                *flags, new->l_policy_data.l_flock.owner,
266                new->l_policy_data.l_flock.pid, mode,
267                req->l_policy_data.l_flock.start,
268                req->l_policy_data.l_flock.end);
269
270         *err = ELDLM_OK;
271
272         if (local) {
273                 /* No blocking ASTs are sent to the clients for
274                  * Posix file & record locks */
275                 req->l_blocking_ast = NULL;
276         } else {
277                 /* Called on the server for lock cancels. */
278                 req->l_blocking_ast = ldlm_flock_blocking_ast;
279         }
280
281 reprocess:
282         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
283                 /* This loop determines where this processes locks start
284                  * in the resource lr_granted list. */
285                 list_for_each(tmp, &res->lr_granted) {
286                         lock = list_entry(tmp, struct ldlm_lock,
287                                               l_res_link);
288                         if (ldlm_same_flock_owner(lock, req)) {
289                                 ownlocks = tmp;
290                                 break;
291                         }
292                 }
293         } else {
294                 int reprocess_failed = 0;
295
296                 lockmode_verify(mode);
297
298                 /* This loop determines if there are existing locks
299                  * that conflict with the new lock request. */
300                 list_for_each(tmp, &res->lr_granted) {
301                         lock = list_entry(tmp, struct ldlm_lock,
302                                               l_res_link);
303
304                         if (ldlm_same_flock_owner(lock, req)) {
305                                 if (!ownlocks)
306                                         ownlocks = tmp;
307                                 continue;
308                         }
309
310                         /* locks are compatible, overlap doesn't matter */
311                         if (lockmode_compat(lock->l_granted_mode, mode))
312                                 continue;
313
314                         if (!ldlm_flocks_overlap(lock, req))
315                                 continue;
316
317                         if (!first_enq) {
318                                 reprocess_failed = 1;
319                                 if (ldlm_flock_deadlock(req, lock)) {
320                                         ldlm_flock_cancel_on_deadlock(req,
321                                                         work_list);
322                                         return LDLM_ITER_CONTINUE;
323                                 }
324                                 continue;
325                         }
326
327                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
328                                 ldlm_flock_destroy(req, mode, *flags);
329                                 *err = -EAGAIN;
330                                 return LDLM_ITER_STOP;
331                         }
332
333                         if (*flags & LDLM_FL_TEST_LOCK) {
334                                 ldlm_flock_destroy(req, mode, *flags);
335                                 req->l_req_mode = lock->l_granted_mode;
336                                 req->l_policy_data.l_flock.pid =
337                                         lock->l_policy_data.l_flock.pid;
338                                 req->l_policy_data.l_flock.start =
339                                         lock->l_policy_data.l_flock.start;
340                                 req->l_policy_data.l_flock.end =
341                                         lock->l_policy_data.l_flock.end;
342                                 *flags |= LDLM_FL_LOCK_CHANGED;
343                                 return LDLM_ITER_STOP;
344                         }
345
346                         /* add lock to blocking list before deadlock
347                          * check to prevent race */
348                         ldlm_flock_blocking_link(req, lock);
349
350                         if (ldlm_flock_deadlock(req, lock)) {
351                                 ldlm_flock_blocking_unlink(req);
352                                 ldlm_flock_destroy(req, mode, *flags);
353                                 *err = -EDEADLK;
354                                 return LDLM_ITER_STOP;
355                         }
356
357                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
358                         *flags |= LDLM_FL_BLOCK_GRANTED;
359                         return LDLM_ITER_STOP;
360                 }
361                 if (reprocess_failed)
362                         return LDLM_ITER_CONTINUE;
363         }
364
365         if (*flags & LDLM_FL_TEST_LOCK) {
366                 ldlm_flock_destroy(req, mode, *flags);
367                 req->l_req_mode = LCK_NL;
368                 *flags |= LDLM_FL_LOCK_CHANGED;
369                 return LDLM_ITER_STOP;
370         }
371
372         /* In case we had slept on this lock request take it off of the
373          * deadlock detection hash list. */
374         ldlm_flock_blocking_unlink(req);
375
376         /* Scan the locks owned by this process that overlap this request.
377          * We may have to merge or split existing locks. */
378
379         if (!ownlocks)
380                 ownlocks = &res->lr_granted;
381
382         list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
383                 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
384
385                 if (!ldlm_same_flock_owner(lock, new))
386                         break;
387
388                 if (lock->l_granted_mode == mode) {
389                         /* If the modes are the same then we need to process
390                          * locks that overlap OR adjoin the new lock. The extra
391                          * logic condition is necessary to deal with arithmetic
392                          * overflow and underflow. */
393                         if ((new->l_policy_data.l_flock.start >
394                              (lock->l_policy_data.l_flock.end + 1))
395                             && (lock->l_policy_data.l_flock.end !=
396                                 OBD_OBJECT_EOF))
397                                 continue;
398
399                         if ((new->l_policy_data.l_flock.end <
400                              (lock->l_policy_data.l_flock.start - 1))
401                             && (lock->l_policy_data.l_flock.start != 0))
402                                 break;
403
404                         if (new->l_policy_data.l_flock.start <
405                             lock->l_policy_data.l_flock.start) {
406                                 lock->l_policy_data.l_flock.start =
407                                         new->l_policy_data.l_flock.start;
408                         } else {
409                                 new->l_policy_data.l_flock.start =
410                                         lock->l_policy_data.l_flock.start;
411                         }
412
413                         if (new->l_policy_data.l_flock.end >
414                             lock->l_policy_data.l_flock.end) {
415                                 lock->l_policy_data.l_flock.end =
416                                         new->l_policy_data.l_flock.end;
417                         } else {
418                                 new->l_policy_data.l_flock.end =
419                                         lock->l_policy_data.l_flock.end;
420                         }
421
422                         if (added) {
423                                 ldlm_flock_destroy(lock, mode, *flags);
424                         } else {
425                                 new = lock;
426                                 added = 1;
427                         }
428                         continue;
429                 }
430
431                 if (new->l_policy_data.l_flock.start >
432                     lock->l_policy_data.l_flock.end)
433                         continue;
434
435                 if (new->l_policy_data.l_flock.end <
436                     lock->l_policy_data.l_flock.start)
437                         break;
438
439                 ++overlaps;
440
441                 if (new->l_policy_data.l_flock.start <=
442                     lock->l_policy_data.l_flock.start) {
443                         if (new->l_policy_data.l_flock.end <
444                             lock->l_policy_data.l_flock.end) {
445                                 lock->l_policy_data.l_flock.start =
446                                         new->l_policy_data.l_flock.end + 1;
447                                 break;
448                         }
449                         ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
450                         continue;
451                 }
452                 if (new->l_policy_data.l_flock.end >=
453                     lock->l_policy_data.l_flock.end) {
454                         lock->l_policy_data.l_flock.end =
455                                 new->l_policy_data.l_flock.start - 1;
456                         continue;
457                 }
458
459                 /* split the existing lock into two locks */
460
461                 /* if this is an F_UNLCK operation then we could avoid
462                  * allocating a new lock and use the req lock passed in
463                  * with the request but this would complicate the reply
464                  * processing since updates to req get reflected in the
465                  * reply. The client side replays the lock request so
466                  * it must see the original lock data in the reply. */
467
468                 /* XXX - if ldlm_lock_new() can sleep we should
469                  * release the lr_lock, allocate the new lock,
470                  * and restart processing this lock. */
471                 if (!new2) {
472                         unlock_res_and_lock(req);
473                         new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
474                                                 lock->l_granted_mode, &null_cbs,
475                                                 NULL, 0, LVB_T_NONE);
476                         lock_res_and_lock(req);
477                         if (!new2) {
478                                 ldlm_flock_destroy(req, lock->l_granted_mode,
479                                                    *flags);
480                                 *err = -ENOLCK;
481                                 return LDLM_ITER_STOP;
482                         }
483                         goto reprocess;
484                 }
485
486                 splitted = 1;
487
488                 new2->l_granted_mode = lock->l_granted_mode;
489                 new2->l_policy_data.l_flock.pid =
490                         new->l_policy_data.l_flock.pid;
491                 new2->l_policy_data.l_flock.owner =
492                         new->l_policy_data.l_flock.owner;
493                 new2->l_policy_data.l_flock.start =
494                         lock->l_policy_data.l_flock.start;
495                 new2->l_policy_data.l_flock.end =
496                         new->l_policy_data.l_flock.start - 1;
497                 lock->l_policy_data.l_flock.start =
498                         new->l_policy_data.l_flock.end + 1;
499                 new2->l_conn_export = lock->l_conn_export;
500                 if (lock->l_export != NULL) {
501                         new2->l_export = class_export_lock_get(lock->l_export,
502                                                                new2);
503                         if (new2->l_export->exp_lock_hash &&
504                             hlist_unhashed(&new2->l_exp_hash))
505                                 cfs_hash_add(new2->l_export->exp_lock_hash,
506                                              &new2->l_remote_handle,
507                                              &new2->l_exp_hash);
508                 }
509                 if (*flags == LDLM_FL_WAIT_NOREPROC)
510                         ldlm_lock_addref_internal_nolock(new2,
511                                                          lock->l_granted_mode);
512
513                 /* insert new2 at lock */
514                 ldlm_resource_add_lock(res, ownlocks, new2);
515                 LDLM_LOCK_RELEASE(new2);
516                 break;
517         }
518
519         /* if new2 is created but never used, destroy it*/
520         if (splitted == 0 && new2 != NULL)
521                 ldlm_lock_destroy_nolock(new2);
522
523         /* At this point we're granting the lock request. */
524         req->l_granted_mode = req->l_req_mode;
525
526         /* Add req to the granted queue before calling ldlm_reprocess_all(). */
527         if (!added) {
528                 list_del_init(&req->l_res_link);
529                 /* insert new lock before ownlocks in list. */
530                 ldlm_resource_add_lock(res, ownlocks, req);
531         }
532
533         if (*flags != LDLM_FL_WAIT_NOREPROC) {
534                 /* The only one possible case for client-side calls flock
535                  * policy function is ldlm_flock_completion_ast inside which
536                  * carries LDLM_FL_WAIT_NOREPROC flag. */
537                 CERROR("Illegal parameter for client-side-only module.\n");
538                 LBUG();
539         }
540
541         /* In case we're reprocessing the requested lock we can't destroy
542          * it until after calling ldlm_add_ast_work_item() above so that laawi()
543          * can bump the reference count on \a req. Otherwise \a req
544          * could be freed before the completion AST can be sent.  */
545         if (added)
546                 ldlm_flock_destroy(req, mode, *flags);
547
548         ldlm_resource_dump(D_INFO, res);
549         return LDLM_ITER_CONTINUE;
550 }
551
/* Context handed to ldlm_flock_interrupted_wait() by the completion AST. */
struct ldlm_flock_wait_data {
	/* lock the caller is sleeping on */
	struct ldlm_lock	*fwd_lock;
	/* imp_generation sampled before going to sleep */
	int			 fwd_generation;
};
556
557 static void
558 ldlm_flock_interrupted_wait(void *data)
559 {
560         struct ldlm_lock *lock;
561
562         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
563
564         /* take lock off the deadlock detection hash list. */
565         lock_res_and_lock(lock);
566         ldlm_flock_blocking_unlink(lock);
567
568         /* client side - set flag to prevent lock from being put on LRU list */
569         lock->l_flags |= LDLM_FL_CBPENDING;
570         unlock_res_and_lock(lock);
571 }
572
573 /**
574  * Flock completion callback function.
575  *
576  * \param lock [in,out]: A lock to be handled
577  * \param flags    [in]: flags
578  * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
579  *
580  * \retval 0    : success
581  * \retval <0   : failure
582  */
583 int
584 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
585 {
586         struct file_lock                *getlk = lock->l_ast_data;
587         struct obd_device             *obd;
588         struct obd_import             *imp = NULL;
589         struct ldlm_flock_wait_data     fwd;
590         struct l_wait_info            lwi;
591         ldlm_error_t                err;
592         int                          rc = 0;
593
594         CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
595                flags, data, getlk);
596
597         /* Import invalidation. We need to actually release the lock
598          * references being held, so that it can go away. No point in
599          * holding the lock even if app still believes it has it, since
600          * server already dropped it anyway. Only for granted locks too. */
601         if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
602             (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
603                 if (lock->l_req_mode == lock->l_granted_mode &&
604                     lock->l_granted_mode != LCK_NL &&
605                     NULL == data)
606                         ldlm_lock_decref_internal(lock, lock->l_req_mode);
607
608                 /* Need to wake up the waiter if we were evicted */
609                 wake_up(&lock->l_waitq);
610                 return 0;
611         }
612
613         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
614
615         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
616                        LDLM_FL_BLOCK_CONV))) {
617                 if (NULL == data)
618                         /* mds granted the lock in the reply */
619                         goto granted;
620                 /* CP AST RPC: lock get granted, wake it up */
621                 wake_up(&lock->l_waitq);
622                 return 0;
623         }
624
625         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
626         fwd.fwd_lock = lock;
627         obd = class_exp2obd(lock->l_conn_export);
628
629         /* if this is a local lock, there is no import */
630         if (NULL != obd)
631                 imp = obd->u.cli.cl_import;
632
633         if (NULL != imp) {
634                 spin_lock(&imp->imp_lock);
635                 fwd.fwd_generation = imp->imp_generation;
636                 spin_unlock(&imp->imp_lock);
637         }
638
639         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
640
641         /* Go to sleep until the lock is granted. */
642         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
643
644         if (rc) {
645                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
646                            rc);
647                 return rc;
648         }
649
650 granted:
651         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
652
653         if (lock->l_flags & LDLM_FL_DESTROYED) {
654                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
655                 return 0;
656         }
657
658         if (lock->l_flags & LDLM_FL_FAILED) {
659                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
660                 return -EIO;
661         }
662
663         if (rc) {
664                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
665                            rc);
666                 return rc;
667         }
668
669         LDLM_DEBUG(lock, "client-side enqueue granted");
670
671         lock_res_and_lock(lock);
672
673         /* take lock off the deadlock detection hash list. */
674         ldlm_flock_blocking_unlink(lock);
675
676         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
677         list_del_init(&lock->l_res_link);
678
679         if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
680                 LDLM_DEBUG(lock, "client-side enqueue deadlock received");
681                 rc = -EDEADLK;
682         } else if (flags & LDLM_FL_TEST_LOCK) {
683                 /* fcntl(F_GETLK) request */
684                 /* The old mode was saved in getlk->fl_type so that if the mode
685                  * in the lock changes we can decref the appropriate refcount.*/
686                 ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
687                 switch (lock->l_granted_mode) {
688                 case LCK_PR:
689                         getlk->fl_type = F_RDLCK;
690                         break;
691                 case LCK_PW:
692                         getlk->fl_type = F_WRLCK;
693                         break;
694                 default:
695                         getlk->fl_type = F_UNLCK;
696                 }
697                 getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
698                 getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
699                 getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
700         } else {
701                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
702
703                 /* We need to reprocess the lock to do merges or splits
704                  * with existing locks owned by this process. */
705                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
706         }
707         unlock_res_and_lock(lock);
708         return rc;
709 }
710 EXPORT_SYMBOL(ldlm_flock_completion_ast);
711
712 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
713                             void *data, int flag)
714 {
715         LASSERT(lock);
716         LASSERT(flag == LDLM_CB_CANCELING);
717
718         /* take lock off the deadlock detection hash list. */
719         lock_res_and_lock(lock);
720         ldlm_flock_blocking_unlink(lock);
721         unlock_res_and_lock(lock);
722         return 0;
723 }
724
725 void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
726                                        ldlm_policy_data_t *lpolicy)
727 {
728         memset(lpolicy, 0, sizeof(*lpolicy));
729         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
730         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
731         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
732         /* Compat code, old clients had no idea about owner field and
733          * relied solely on pid for ownership. Introduced in LU-104, 2.1,
734          * April 2011 */
735         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
736 }
737
738
739 void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
740                                        ldlm_policy_data_t *lpolicy)
741 {
742         memset(lpolicy, 0, sizeof(*lpolicy));
743         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
744         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
745         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
746         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
747 }
748
749 void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
750                                      ldlm_wire_policy_data_t *wpolicy)
751 {
752         memset(wpolicy, 0, sizeof(*wpolicy));
753         wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
754         wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
755         wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
756         wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
757 }
758
759 /*
760  * Export handle<->flock hash operations.
761  */
762 static unsigned
763 ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
764 {
765         return cfs_hash_u64_hash(*(__u64 *)key, mask);
766 }
767
768 static void *
769 ldlm_export_flock_key(struct hlist_node *hnode)
770 {
771         struct ldlm_lock *lock;
772
773         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
774         return &lock->l_policy_data.l_flock.owner;
775 }
776
777 static int
778 ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
779 {
780         return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
781 }
782
783 static void *
784 ldlm_export_flock_object(struct hlist_node *hnode)
785 {
786         return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
787 }
788
789 static void
790 ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
791 {
792         struct ldlm_lock *lock;
793         struct ldlm_flock *flock;
794
795         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
796         LDLM_LOCK_GET(lock);
797
798         flock = &lock->l_policy_data.l_flock;
799         LASSERT(flock->blocking_export != NULL);
800         class_export_get(flock->blocking_export);
801         flock->blocking_refs++;
802 }
803
804 static void
805 ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
806 {
807         struct ldlm_lock *lock;
808         struct ldlm_flock *flock;
809
810         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
811         LDLM_LOCK_RELEASE(lock);
812
813         flock = &lock->l_policy_data.l_flock;
814         LASSERT(flock->blocking_export != NULL);
815         class_export_put(flock->blocking_export);
816         if (--flock->blocking_refs == 0) {
817                 flock->blocking_owner = 0;
818                 flock->blocking_export = NULL;
819         }
820 }
821
822 static cfs_hash_ops_t ldlm_export_flock_ops = {
823         .hs_hash        = ldlm_export_flock_hash,
824         .hs_key  = ldlm_export_flock_key,
825         .hs_keycmp      = ldlm_export_flock_keycmp,
826         .hs_object      = ldlm_export_flock_object,
827         .hs_get  = ldlm_export_flock_get,
828         .hs_put  = ldlm_export_flock_put,
829         .hs_put_locked  = ldlm_export_flock_put,
830 };
831
832 int ldlm_init_flock_export(struct obd_export *exp)
833 {
834         if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
835                 return 0;
836
837         exp->exp_flock_hash =
838                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
839                                 HASH_EXP_LOCK_CUR_BITS,
840                                 HASH_EXP_LOCK_MAX_BITS,
841                                 HASH_EXP_LOCK_BKT_BITS, 0,
842                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
843                                 &ldlm_export_flock_ops,
844                                 CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
845         if (!exp->exp_flock_hash)
846                 return -ENOMEM;
847
848         return 0;
849 }
850 EXPORT_SYMBOL(ldlm_init_flock_export);
851
852 void ldlm_destroy_flock_export(struct obd_export *exp)
853 {
854         if (exp->exp_flock_hash) {
855                 cfs_hash_putref(exp->exp_flock_hash);
856                 exp->exp_flock_hash = NULL;
857         }
858 }
859 EXPORT_SYMBOL(ldlm_destroy_flock_export);