kvmfornfv.git: kernel/drivers/staging/lustre/lustre/ldlm/ldlm_flock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent, and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and thus must
 * be split into two parts, and that two adjacent locks from the same
 * process may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */
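/*
 * Illustrative example of the split/merge semantics described above
 * (offsets are made up, not taken from a real trace):
 *
 *   - Split: a process holds PW [0, 100] and releases [20, 30] (sent as
 *     an NL request for that extent). The original lock is split into
 *     PW [0, 19] and PW [31, 100].
 *   - Merge: a process holds PW [0, 19] and then acquires PW [20, 40].
 *     The two adjacent same-mode locks are merged into PW [0, 40].
 */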

#define DEBUG_SUBSYSTEM S_LDLM

#include "../include/lustre_dlm.h"
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_lib.h"
#include <linux/list.h>
#include "ldlm_internal.h"

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *            and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *            have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
	for (n = pos->next; pos != (head); pos = n, n = pos->next)
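
/*
 * Example use (a sketch mirroring the call in ldlm_process_flock_lock()
 * below): continue scanning lr_granted from a known position while the
 * entry at the cursor may be deleted during the iteration:
 *
 *	ownlocks = <link of the first lock owned by this process>;
 *	list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
 *		lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
 *		... the entry at ownlocks may safely be removed here ...
 *	}
 */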

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return((new->l_policy_data.l_flock.owner ==
		lock->l_policy_data.l_flock.owner) &&
	       (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return((new->l_policy_data.l_flock.start <=
		lock->l_policy_data.l_flock.end) &&
	       (new->l_policy_data.l_flock.end >=
		lock->l_policy_data.l_flock.start));
}
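
/*
 * For example: [0, 5] and [5, 10] overlap (they share offset 5), while
 * [0, 4] and [5, 10] do not. Merely adjacent extents are not considered
 * overlapping here; adjacency is handled separately by the same-mode
 * merge logic in ldlm_process_flock_lock() below.
 */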

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
	LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
		   mode, flags);

	/* Safe to not lock here, since it should be empty anyway */
	LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

	list_del_init(&lock->l_res_link);
	if (flags == LDLM_FL_WAIT_NOREPROC &&
	    !(lock->l_flags & LDLM_FL_FAILED)) {
		/* client side - set a flag to prevent sending a CANCEL */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

		/* when reaching here, it is under lock_res_and_lock(), so we
		 * need to call the nolock version of
		 * ldlm_lock_decref_internal() */
		ldlm_lock_decref_internal_nolock(lock, mode);
	}

	ldlm_lock_destroy_nolock(lock);
}

/**
 * Process a granting attempt for flock lock.
 * Must be called under ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
				   int first_enq, ldlm_error_t *err,
				   struct list_head *work_list)
{
	struct ldlm_resource *res = req->l_resource;
	struct ldlm_namespace *ns = ldlm_res_to_ns(res);
	struct list_head *tmp;
	struct list_head *ownlocks = NULL;
	struct ldlm_lock *lock = NULL;
	struct ldlm_lock *new = req;
	struct ldlm_lock *new2 = NULL;
	ldlm_mode_t mode = req->l_req_mode;
	int added = (mode == LCK_NL);
	int overlaps = 0;
	int splitted = 0;
	const struct ldlm_callback_suite null_cbs = { NULL };

	CDEBUG(D_DLMTRACE,
	       "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
	       *flags, new->l_policy_data.l_flock.owner,
	       new->l_policy_data.l_flock.pid, mode,
	       req->l_policy_data.l_flock.start,
	       req->l_policy_data.l_flock.end);

	*err = ELDLM_OK;

	/* No blocking ASTs are sent to the clients for
	 * POSIX file & record locks */
	req->l_blocking_ast = NULL;

reprocess:
	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
		/* This loop determines where this process's locks start
		 * in the resource lr_granted list. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);
			if (ldlm_same_flock_owner(lock, req)) {
				ownlocks = tmp;
				break;
			}
		}
	} else {
		int reprocess_failed = 0;

		lockmode_verify(mode);

		/* This loop determines if there are existing locks
		 * that conflict with the new lock request. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);

			if (ldlm_same_flock_owner(lock, req)) {
				if (!ownlocks)
					ownlocks = tmp;
				continue;
			}

			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_granted_mode, mode))
				continue;

			if (!ldlm_flocks_overlap(lock, req))
				continue;

			if (!first_enq) {
				reprocess_failed = 1;
				continue;
			}

			if (*flags & LDLM_FL_BLOCK_NOWAIT) {
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EAGAIN;
				return LDLM_ITER_STOP;
			}

			if (*flags & LDLM_FL_TEST_LOCK) {
				ldlm_flock_destroy(req, mode, *flags);
				req->l_req_mode = lock->l_granted_mode;
				req->l_policy_data.l_flock.pid =
					lock->l_policy_data.l_flock.pid;
				req->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
				req->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
				*flags |= LDLM_FL_LOCK_CHANGED;
				return LDLM_ITER_STOP;
			}

			ldlm_resource_add_lock(res, &res->lr_waiting, req);
			*flags |= LDLM_FL_BLOCK_GRANTED;
			return LDLM_ITER_STOP;
		}
		if (reprocess_failed)
			return LDLM_ITER_CONTINUE;
	}

	if (*flags & LDLM_FL_TEST_LOCK) {
		ldlm_flock_destroy(req, mode, *flags);
		req->l_req_mode = LCK_NL;
		*flags |= LDLM_FL_LOCK_CHANGED;
		return LDLM_ITER_STOP;
	}

	/* Scan the locks owned by this process that overlap this request.
	 * We may have to merge or split existing locks. */

	if (!ownlocks)
		ownlocks = &res->lr_granted;

	list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
		lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

		if (!ldlm_same_flock_owner(lock, new))
			break;

		if (lock->l_granted_mode == mode) {
			/* If the modes are the same then we need to process
			 * locks that overlap OR adjoin the new lock. The extra
			 * logic condition is necessary to deal with arithmetic
			 * overflow and underflow. */
			if ((new->l_policy_data.l_flock.start >
			     (lock->l_policy_data.l_flock.end + 1))
			    && (lock->l_policy_data.l_flock.end !=
				OBD_OBJECT_EOF))
				continue;

			if ((new->l_policy_data.l_flock.end <
			     (lock->l_policy_data.l_flock.start - 1))
			    && (lock->l_policy_data.l_flock.start != 0))
				break;
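
			/*
			 * For example: if lock ends at OBD_OBJECT_EOF,
			 * end + 1 wraps around to 0, and if lock starts at
			 * offset 0, start - 1 underflows to OBD_OBJECT_EOF;
			 * without the extra conditions above, the wrapped
			 * value would compare incorrectly and an adjoining
			 * lock could be skipped or the scan cut short.
			 */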

			if (new->l_policy_data.l_flock.start <
			    lock->l_policy_data.l_flock.start) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.start;
			} else {
				new->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
			}

			if (new->l_policy_data.l_flock.end >
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.end =
					new->l_policy_data.l_flock.end;
			} else {
				new->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
			}

			if (added) {
				ldlm_flock_destroy(lock, mode, *flags);
			} else {
				new = lock;
				added = 1;
			}
			continue;
		}

		if (new->l_policy_data.l_flock.start >
		    lock->l_policy_data.l_flock.end)
			continue;

		if (new->l_policy_data.l_flock.end <
		    lock->l_policy_data.l_flock.start)
			break;

		++overlaps;

		if (new->l_policy_data.l_flock.start <=
		    lock->l_policy_data.l_flock.start) {
			if (new->l_policy_data.l_flock.end <
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.end + 1;
				break;
			}
			ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
			continue;
		}
		if (new->l_policy_data.l_flock.end >=
		    lock->l_policy_data.l_flock.end) {
			lock->l_policy_data.l_flock.end =
				new->l_policy_data.l_flock.start - 1;
			continue;
		}

		/* split the existing lock into two locks */

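		/*
		 * For example (modes differ, so no merge happened above):
		 * the process holds lock [0, 100] and req covers [20, 30].
		 * The existing lock is trimmed to [31, 100] and a new lock
		 * (new2) is created for [0, 19], inserted just before it
		 * in the granted list.
		 */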
		/* if this is an F_UNLCK operation then we could avoid
		 * allocating a new lock and use the req lock passed in
		 * with the request but this would complicate the reply
		 * processing since updates to req get reflected in the
		 * reply. The client side replays the lock request so
		 * it must see the original lock data in the reply. */

		/* XXX - if ldlm_lock_new() can sleep we should
		 * release the lr_lock, allocate the new lock,
		 * and restart processing this lock. */
		if (!new2) {
			unlock_res_and_lock(req);
			new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
						lock->l_granted_mode, &null_cbs,
						NULL, 0, LVB_T_NONE);
			lock_res_and_lock(req);
			if (!new2) {
				ldlm_flock_destroy(req, lock->l_granted_mode,
						   *flags);
				*err = -ENOLCK;
				return LDLM_ITER_STOP;
			}
			goto reprocess;
		}

		splitted = 1;

		new2->l_granted_mode = lock->l_granted_mode;
		new2->l_policy_data.l_flock.pid =
			new->l_policy_data.l_flock.pid;
		new2->l_policy_data.l_flock.owner =
			new->l_policy_data.l_flock.owner;
		new2->l_policy_data.l_flock.start =
			lock->l_policy_data.l_flock.start;
		new2->l_policy_data.l_flock.end =
			new->l_policy_data.l_flock.start - 1;
		lock->l_policy_data.l_flock.start =
			new->l_policy_data.l_flock.end + 1;
		new2->l_conn_export = lock->l_conn_export;
		if (lock->l_export != NULL) {
			new2->l_export = class_export_lock_get(lock->l_export,
							       new2);
			if (new2->l_export->exp_lock_hash &&
			    hlist_unhashed(&new2->l_exp_hash))
				cfs_hash_add(new2->l_export->exp_lock_hash,
					     &new2->l_remote_handle,
					     &new2->l_exp_hash);
		}
		if (*flags == LDLM_FL_WAIT_NOREPROC)
			ldlm_lock_addref_internal_nolock(new2,
							 lock->l_granted_mode);

		/* insert new2 at lock */
		ldlm_resource_add_lock(res, ownlocks, new2);
		LDLM_LOCK_RELEASE(new2);
		break;
	}

	/* if new2 was created but never used, destroy it */
	if (splitted == 0 && new2 != NULL)
		ldlm_lock_destroy_nolock(new2);

	/* At this point we're granting the lock request. */
	req->l_granted_mode = req->l_req_mode;

	if (!added) {
		list_del_init(&req->l_res_link);
		/* insert the new lock before ownlocks in the list. */
		ldlm_resource_add_lock(res, ownlocks, req);
	}

	if (*flags != LDLM_FL_WAIT_NOREPROC) {
		/* The only possible client-side call into the flock policy
		 * function is from ldlm_flock_completion_ast, which always
		 * carries the LDLM_FL_WAIT_NOREPROC flag. */
		CERROR("Illegal parameter for client-side-only module.\n");
		LBUG();
	}

	/* In case we're reprocessing the requested lock we can't destroy
	 * it until after calling ldlm_add_ast_work_item() above so that laawi()
	 * can bump the reference count on \a req. Otherwise \a req
	 * could be freed before the completion AST can be sent.  */
	if (added)
		ldlm_flock_destroy(req, mode, *flags);

	ldlm_resource_dump(D_INFO, res);
	return LDLM_ITER_CONTINUE;
}

struct ldlm_flock_wait_data {
	struct ldlm_lock *fwd_lock;
	int fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
	struct ldlm_lock *lock;

	lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

	lock_res_and_lock(lock);

	/* client side - set flag to prevent lock from being put on LRU list */
	lock->l_flags |= LDLM_FL_CBPENDING;
	unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct file_lock *getlk = lock->l_ast_data;
	struct obd_device *obd;
	struct obd_import *imp = NULL;
	struct ldlm_flock_wait_data fwd;
	struct l_wait_info lwi;
	ldlm_error_t err;
	int rc = 0;

	CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
	       flags, data, getlk);

	/* Import invalidation. We need to actually release the lock
	 * references being held, so that it can go away. There is no point
	 * in holding the lock even if the app still believes it has it,
	 * since the server already dropped it anyway. This applies only
	 * to granted locks. */
	if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
	    (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
		if (lock->l_req_mode == lock->l_granted_mode &&
		    lock->l_granted_mode != LCK_NL &&
		    data == NULL)
			ldlm_lock_decref_internal(lock, lock->l_req_mode);

		/* Need to wake up the waiter if we were evicted */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

	if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
		       LDLM_FL_BLOCK_CONV))) {
		if (data == NULL)
			/* mds granted the lock in the reply */
			goto granted;
		/* CP AST RPC: the lock got granted, wake it up */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
	fwd.fwd_lock = lock;
	obd = class_exp2obd(lock->l_conn_export);

	/* if this is a local lock, there is no import */
	if (obd != NULL)
		imp = obd->u.cli.cl_import;

	if (imp != NULL) {
		spin_lock(&imp->imp_lock);
		fwd.fwd_generation = imp->imp_generation;
		spin_unlock(&imp->imp_lock);
	}

	lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
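	/* A timeout of 0 here means wait indefinitely (but interruptibly):
	 * flock locks never time out (see the file header above), so the
	 * wait only ends when the lock is granted or cancelled, or when a
	 * signal arrives and ldlm_flock_interrupted_wait() runs. */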

	/* Go to sleep until the lock is granted. */
	rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

granted:
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

	if (lock->l_flags & LDLM_FL_DESTROYED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
		return 0;
	}

	if (lock->l_flags & LDLM_FL_FAILED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
		return -EIO;
	}

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

	LDLM_DEBUG(lock, "client-side enqueue granted");

	lock_res_and_lock(lock);

	/* ldlm_lock_enqueue() has already placed the lock on the granted
	 * list. */
	list_del_init(&lock->l_res_link);

	if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
		LDLM_DEBUG(lock, "client-side enqueue deadlock received");
		rc = -EDEADLK;
	} else if (flags & LDLM_FL_TEST_LOCK) {
		/* fcntl(F_GETLK) request */
		/* The old mode was saved in getlk->fl_type so that if the
		 * mode in the lock changes we can decref the appropriate
		 * refcount. */
		ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
		switch (lock->l_granted_mode) {
		case LCK_PR:
			getlk->fl_type = F_RDLCK;
			break;
		case LCK_PW:
			getlk->fl_type = F_WRLCK;
			break;
		default:
			getlk->fl_type = F_UNLCK;
		}
		getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
		getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
		getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
	} else {
		__u64 noreproc = LDLM_FL_WAIT_NOREPROC;

		/* We need to reprocess the lock to do merges or splits
		 * with existing locks owned by this process. */
		ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
	}
	unlock_res_and_lock(lock);
	return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	/* Compatibility code: old clients had no idea about the owner field
	 * and relied solely on the pid for ownership. The owner field was
	 * introduced in LU-104, 2.1, April 2011 */
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
				     ldlm_wire_policy_data_t *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));
	wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
	wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
	wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
	wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}