4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28 * Developed under the sponsorship of the US Government under
29 * Subcontract No. B514193
31 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32 * Use is subject to license terms.
34 * Copyright (c) 2010, 2012, Intel Corporation.
37 * This file is part of Lustre, http://www.lustre.org/
38 * Lustre is a trademark of Sun Microsystems, Inc.
42 * This file implements POSIX lock type for Lustre.
43 * Its policy properties are start and end of extent and PID.
45 * These locks are only done through MDS due to POSIX semantics requiring
46 * e.g. that locks could be only partially released and as such split into
47 * two parts, and also that two adjacent locks from the same process may be
48 * merged into a single wider lock.
50 * Lock modes are mapped like this:
51 * PR and PW for READ and WRITE locks
52 * NL to request a releasing of a portion of the lock
54 * These flock locks never time out.
57 #define DEBUG_SUBSYSTEM S_LDLM
59 #include "../include/lustre_dlm.h"
60 #include "../include/obd_support.h"
61 #include "../include/obd_class.h"
62 #include "../include/lustre_lib.h"
63 #include <linux/list.h>
64 #include "ldlm_internal.h"
67 * list_for_remaining_safe - iterate over the remaining entries in a list
68 * and safeguard against removal of a list entry.
69 * \param pos the &struct list_head to use as a loop counter. pos MUST
70 * have been initialized prior to using it in this macro.
71 * \param n another &struct list_head to use as temporary storage
72 * \param head the head for your list.
/* Arguments are parenthesized so the macro stays safe if a caller ever
 * passes an expression rather than a plain variable. @pos must already
 * point into the list (see the comment block above). */
#define list_for_remaining_safe(pos, n, head) \
	for (n = (pos)->next; (pos) != (head); (pos) = (n), (n) = (pos)->next)
78 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
80 return((new->l_policy_data.l_flock.owner ==
81 lock->l_policy_data.l_flock.owner) &&
82 (new->l_export == lock->l_export));
86 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
88 return((new->l_policy_data.l_flock.start <=
89 lock->l_policy_data.l_flock.end) &&
90 (new->l_policy_data.l_flock.end >=
91 lock->l_policy_data.l_flock.start));
95 ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
97 LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
100 /* Safe to not lock here, since it should be empty anyway */
101 LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
103 list_del_init(&lock->l_res_link);
104 if (flags == LDLM_FL_WAIT_NOREPROC &&
105 !(lock->l_flags & LDLM_FL_FAILED)) {
106 /* client side - set a flag to prevent sending a CANCEL */
107 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
109 /* when reaching here, it is under lock_res_and_lock(). Thus,
110 need call the nolock version of ldlm_lock_decref_internal*/
111 ldlm_lock_decref_internal_nolock(lock, mode);
114 ldlm_lock_destroy_nolock(lock);
118 * Process a granting attempt for flock lock.
119 * Must be called under ns lock held.
121 * This function looks for any conflicts for \a lock in the granted or
122 * waiting queues. The lock is granted if no conflicts are found in
125 * It is also responsible for splitting a lock if a portion of the lock
128 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
129 * - blocking ASTs have already been sent
131 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
132 * - blocking ASTs have not been sent yet, so list of conflicting locks
133 * would be collected and ASTs sent.
/*
 * NOTE(review): this extract is missing interleaved lines (braces, blank
 * lines, some continuations); the comments added below describe only the
 * code that is visible here and hedge where the gaps matter.
 */
135 static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
136 int first_enq, ldlm_error_t *err,
137 struct list_head *work_list)
139 struct ldlm_resource *res = req->l_resource;
140 struct ldlm_namespace *ns = ldlm_res_to_ns(res);
141 struct list_head *tmp;
142 struct list_head *ownlocks = NULL;
143 struct ldlm_lock *lock = NULL;
144 struct ldlm_lock *new = req;
145 struct ldlm_lock *new2 = NULL;
146 ldlm_mode_t mode = req->l_req_mode;
/* An unlock request (LCK_NL) starts out "added": it never joins the list. */
147 int added = (mode == LCK_NL);
150 const struct ldlm_callback_suite null_cbs = { NULL };
153 "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
154 *flags, new->l_policy_data.l_flock.owner,
155 new->l_policy_data.l_flock.pid, mode,
156 req->l_policy_data.l_flock.start,
157 req->l_policy_data.l_flock.end);
161 /* No blocking ASTs are sent to the clients for
162 * Posix file & record locks */
163 req->l_blocking_ast = NULL;
166 if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
167 /* This loop determines where this processes locks start
168 * in the resource lr_granted list. */
169 list_for_each(tmp, &res->lr_granted) {
170 lock = list_entry(tmp, struct ldlm_lock,
172 if (ldlm_same_flock_owner(lock, req)) {
178 int reprocess_failed = 0;
180 lockmode_verify(mode);
182 /* This loop determines if there are existing locks
183 * that conflict with the new lock request. */
184 list_for_each(tmp, &res->lr_granted) {
185 lock = list_entry(tmp, struct ldlm_lock,
/* A process's own locks never conflict with its new request. */
188 if (ldlm_same_flock_owner(lock, req)) {
194 /* locks are compatible, overlap doesn't matter */
195 if (lockmode_compat(lock->l_granted_mode, mode))
198 if (!ldlm_flocks_overlap(lock, req))
202 reprocess_failed = 1;
/* LDLM_FL_BLOCK_NOWAIT: caller will not wait; give up on the first
 * conflict (an error return is presumably set in a line missing from
 * this extract - confirm against upstream). */
206 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
207 ldlm_flock_destroy(req, mode, *flags);
209 return LDLM_ITER_STOP;
/* LDLM_FL_TEST_LOCK (F_GETLK-style probe): report the conflicting
 * lock's mode/pid/extent back through req and stop. */
212 if (*flags & LDLM_FL_TEST_LOCK) {
213 ldlm_flock_destroy(req, mode, *flags);
214 req->l_req_mode = lock->l_granted_mode;
215 req->l_policy_data.l_flock.pid =
216 lock->l_policy_data.l_flock.pid;
217 req->l_policy_data.l_flock.start =
218 lock->l_policy_data.l_flock.start;
219 req->l_policy_data.l_flock.end =
220 lock->l_policy_data.l_flock.end;
221 *flags |= LDLM_FL_LOCK_CHANGED;
222 return LDLM_ITER_STOP;
/* A real conflict and the caller may block: park req on lr_waiting. */
225 ldlm_resource_add_lock(res, &res->lr_waiting, req);
226 *flags |= LDLM_FL_BLOCK_GRANTED;
227 return LDLM_ITER_STOP;
229 if (reprocess_failed)
230 return LDLM_ITER_CONTINUE;
/* No conflict found: a test request reports "unlocked" via LCK_NL. */
233 if (*flags & LDLM_FL_TEST_LOCK) {
234 ldlm_flock_destroy(req, mode, *flags);
235 req->l_req_mode = LCK_NL;
236 *flags |= LDLM_FL_LOCK_CHANGED;
237 return LDLM_ITER_STOP;
240 /* Scan the locks owned by this process that overlap this request.
241 * We may have to merge or split existing locks. */
244 ownlocks = &res->lr_granted;
246 list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
247 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
249 if (!ldlm_same_flock_owner(lock, new))
252 if (lock->l_granted_mode == mode) {
253 /* If the modes are the same then we need to process
254 * locks that overlap OR adjoin the new lock. The extra
255 * logic condition is necessary to deal with arithmetic
256 * overflow and underflow. */
257 if ((new->l_policy_data.l_flock.start >
258 (lock->l_policy_data.l_flock.end + 1))
259 && (lock->l_policy_data.l_flock.end !=
263 if ((new->l_policy_data.l_flock.end <
264 (lock->l_policy_data.l_flock.start - 1))
265 && (lock->l_policy_data.l_flock.start != 0))
/* Same mode and adjoining/overlapping: widen to the union of the
 * two extents (both branches below keep new and lock in agreement). */
268 if (new->l_policy_data.l_flock.start <
269 lock->l_policy_data.l_flock.start) {
270 lock->l_policy_data.l_flock.start =
271 new->l_policy_data.l_flock.start;
273 new->l_policy_data.l_flock.start =
274 lock->l_policy_data.l_flock.start;
277 if (new->l_policy_data.l_flock.end >
278 lock->l_policy_data.l_flock.end) {
279 lock->l_policy_data.l_flock.end =
280 new->l_policy_data.l_flock.end;
282 new->l_policy_data.l_flock.end =
283 lock->l_policy_data.l_flock.end;
287 ldlm_flock_destroy(lock, mode, *flags);
/* Different mode from here on: carve the new extent out of this
 * owner's overlapping locks, possibly splitting one in two. */
295 if (new->l_policy_data.l_flock.start >
296 lock->l_policy_data.l_flock.end)
299 if (new->l_policy_data.l_flock.end <
300 lock->l_policy_data.l_flock.start)
305 if (new->l_policy_data.l_flock.start <=
306 lock->l_policy_data.l_flock.start) {
307 if (new->l_policy_data.l_flock.end <
308 lock->l_policy_data.l_flock.end) {
/* New lock covers the head of the old one: trim the old lock's start. */
309 lock->l_policy_data.l_flock.start =
310 new->l_policy_data.l_flock.end + 1;
/* New lock swallows the old one entirely: destroy the old lock. */
313 ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
316 if (new->l_policy_data.l_flock.end >=
317 lock->l_policy_data.l_flock.end) {
/* New lock covers the tail of the old one: trim the old lock's end. */
318 lock->l_policy_data.l_flock.end =
319 new->l_policy_data.l_flock.start - 1;
323 /* split the existing lock into two locks */
325 /* if this is an F_UNLCK operation then we could avoid
326 * allocating a new lock and use the req lock passed in
327 * with the request but this would complicate the reply
328 * processing since updates to req get reflected in the
329 * reply. The client side replays the lock request so
330 * it must see the original lock data in the reply. */
332 /* XXX - if ldlm_lock_new() can sleep we should
333 * release the lr_lock, allocate the new lock,
334 * and restart processing this lock. */
/* Drop the resource lock around the allocation since it may sleep. */
336 unlock_res_and_lock(req);
337 new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
338 lock->l_granted_mode, &null_cbs,
339 NULL, 0, LVB_T_NONE);
340 lock_res_and_lock(req);
/* Allocation failure path (the check itself is missing from this
 * extract): give up on the request. */
342 ldlm_flock_destroy(req, lock->l_granted_mode,
345 return LDLM_ITER_STOP;
/* new2 becomes the lower half [old start, new start - 1]; the old
 * lock keeps the upper half starting at new end + 1. */
352 new2->l_granted_mode = lock->l_granted_mode;
353 new2->l_policy_data.l_flock.pid =
354 new->l_policy_data.l_flock.pid;
355 new2->l_policy_data.l_flock.owner =
356 new->l_policy_data.l_flock.owner;
357 new2->l_policy_data.l_flock.start =
358 lock->l_policy_data.l_flock.start;
359 new2->l_policy_data.l_flock.end =
360 new->l_policy_data.l_flock.start - 1;
361 lock->l_policy_data.l_flock.start =
362 new->l_policy_data.l_flock.end + 1;
363 new2->l_conn_export = lock->l_conn_export;
364 if (lock->l_export != NULL) {
365 new2->l_export = class_export_lock_get(lock->l_export,
367 if (new2->l_export->exp_lock_hash &&
368 hlist_unhashed(&new2->l_exp_hash))
369 cfs_hash_add(new2->l_export->exp_lock_hash,
370 &new2->l_remote_handle,
373 if (*flags == LDLM_FL_WAIT_NOREPROC)
374 ldlm_lock_addref_internal_nolock(new2,
375 lock->l_granted_mode);
377 /* insert new2 at lock */
378 ldlm_resource_add_lock(res, ownlocks, new2);
379 LDLM_LOCK_RELEASE(new2);
383 /* if new2 is created but never used, destroy it*/
384 if (splitted == 0 && new2 != NULL)
385 ldlm_lock_destroy_nolock(new2);
387 /* At this point we're granting the lock request. */
388 req->l_granted_mode = req->l_req_mode;
391 list_del_init(&req->l_res_link);
392 /* insert new lock before ownlocks in list. */
393 ldlm_resource_add_lock(res, ownlocks, req);
396 if (*flags != LDLM_FL_WAIT_NOREPROC) {
397 /* The only one possible case for client-side calls flock
398 * policy function is ldlm_flock_completion_ast inside which
399 * carries LDLM_FL_WAIT_NOREPROC flag. */
400 CERROR("Illegal parameter for client-side-only module.\n");
404 /* In case we're reprocessing the requested lock we can't destroy
405 * it until after calling ldlm_add_ast_work_item() above so that laawi()
406 * can bump the reference count on \a req. Otherwise \a req
407 * could be freed before the completion AST can be sent. */
409 ldlm_flock_destroy(req, mode, *flags);
411 ldlm_resource_dump(D_INFO, res);
412 return LDLM_ITER_CONTINUE;
/* Context handed to ldlm_flock_interrupted_wait() while a flock enqueue
 * sleeps in ldlm_flock_completion_ast(). */
struct ldlm_flock_wait_data {
	struct ldlm_lock *fwd_lock;	/* the flock lock being waited on */
	int		  fwd_generation; /* import generation when the wait
					   * started (imp->imp_generation) */
};
421 ldlm_flock_interrupted_wait(void *data)
423 struct ldlm_lock *lock;
425 lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
427 lock_res_and_lock(lock);
429 /* client side - set flag to prevent lock from being put on LRU list */
430 lock->l_flags |= LDLM_FL_CBPENDING;
431 unlock_res_and_lock(lock);
435 * Flock completion callback function.
437 * \param lock [in,out]: A lock to be handled
438 * \param flags [in]: flags
439 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
441 * \retval 0 : success
442 * \retval <0 : failure
/*
 * NOTE(review): this extract is missing interleaved lines (braces, blank
 * lines, error-path statements); the comments added below describe only
 * the visible code and hedge where gaps matter.
 */
445 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
447 struct file_lock *getlk = lock->l_ast_data;
448 struct obd_device *obd;
449 struct obd_import *imp = NULL;
450 struct ldlm_flock_wait_data fwd;
451 struct l_wait_info lwi;
455 CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
458 /* Import invalidation. We need to actually release the lock
459 * references being held, so that it can go away. No point in
460 * holding the lock even if app still believes it has it, since
461 * server already dropped it anyway. Only for granted locks too. */
462 if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
463 (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
464 if (lock->l_req_mode == lock->l_granted_mode &&
465 lock->l_granted_mode != LCK_NL &&
467 ldlm_lock_decref_internal(lock, lock->l_req_mode);
469 /* Need to wake up the waiter if we were evicted */
470 wake_up(&lock->l_waitq);
/* WAIT_NOREPROC is reserved for the internal reprocess call below. */
474 LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
/* No blocking flag set: the lock was granted without waiting. */
476 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
477 LDLM_FL_BLOCK_CONV))) {
479 /* mds granted the lock in the reply */
481 /* CP AST RPC: lock get granted, wake it up */
482 wake_up(&lock->l_waitq);
486 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
488 obd = class_exp2obd(lock->l_conn_export);
490 /* if this is a local lock, there is no import */
492 imp = obd->u.cli.cl_import;
/* Snapshot the import generation under imp_lock so a later reconnect
 * can be detected (fwd_generation is read by the wait machinery). */
495 spin_lock(&imp->imp_lock);
496 fwd.fwd_generation = imp->imp_generation;
497 spin_unlock(&imp->imp_lock);
/* NOTE(review): LWI_TIMEOUT_INTR with 0 presumably means "interruptible,
 * no timeout" - confirm against lustre_lib.h. */
500 lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
502 /* Go to sleep until the lock is granted. */
503 rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
506 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
512 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
514 if (lock->l_flags & LDLM_FL_DESTROYED) {
515 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
519 if (lock->l_flags & LDLM_FL_FAILED) {
520 LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
525 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
530 LDLM_DEBUG(lock, "client-side enqueue granted");
532 lock_res_and_lock(lock);
534 /* ldlm_lock_enqueue() has already placed lock on the granted list. */
535 list_del_init(&lock->l_res_link);
/* Server reported a flock deadlock for this request. */
537 if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
538 LDLM_DEBUG(lock, "client-side enqueue deadlock received");
540 } else if (flags & LDLM_FL_TEST_LOCK) {
541 /* fcntl(F_GETLK) request */
542 /* The old mode was saved in getlk->fl_type so that if the mode
543 * in the lock changes we can decref the appropriate refcount.*/
544 ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
/* Translate the granted DLM mode back into an fcntl lock type. */
545 switch (lock->l_granted_mode) {
547 getlk->fl_type = F_RDLCK;
550 getlk->fl_type = F_WRLCK;
553 getlk->fl_type = F_UNLCK;
555 getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
556 getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
557 getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
559 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
561 /* We need to reprocess the lock to do merges or splits
562 * with existing locks owned by this process. */
563 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
565 unlock_res_and_lock(lock);
567 EXPORT_SYMBOL(ldlm_flock_completion_ast);
570 void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
571 ldlm_policy_data_t *lpolicy)
573 memset(lpolicy, 0, sizeof(*lpolicy));
574 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
575 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
576 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
577 /* Compat code, old clients had no idea about owner field and
578 * relied solely on pid for ownership. Introduced in LU-104, 2.1,
580 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
583 void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
584 ldlm_policy_data_t *lpolicy)
586 memset(lpolicy, 0, sizeof(*lpolicy));
587 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
588 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
589 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
590 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
593 void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
594 ldlm_wire_policy_data_t *wpolicy)
596 memset(wpolicy, 0, sizeof(*wpolicy));
597 wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
598 wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
599 wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
600 wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;