/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOVSUB layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

/*****************************************************************************
 *
 * Lovsub lock operations.
 *
 */

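/*
 * cl_lock_operations::clo_fini() for the lovsub layer: by the time the
 * slice is freed, the sub-lock must already be unlinked from every parent
 * top-lock, hence the assertion on lss_parents.
 */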
static void lovsub_lock_fini(const struct lu_env *env,
                             struct cl_lock_slice *slice)
{
        struct lovsub_lock   *lsl;

        lsl = cl2lovsub_lock(slice);
        LASSERT(list_empty(&lsl->lss_parents));
        OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
}

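/*
 * Takes a reference on the top-lock that owns @lov and acquires its mutex;
 * lovsub_parent_unlock() below releases both in reverse order.
 */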
static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        parent = lov->lls_cl.cls_lock;
        cl_lock_get(parent);
        lu_ref_add(&parent->cll_reference, "lovsub-parent", current);
        cl_lock_mutex_get(env, parent);
}

static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        parent = lov->lls_cl.cls_lock;
        cl_lock_mutex_put(env, parent);
        lu_ref_del(&parent->cll_reference, "lovsub-parent", current);
        cl_lock_put(env, parent);
}

/**
 * Implements the cl_lock_operations::clo_state() method for the lovsub
 * layer; it is called whenever the sub-lock state changes, and propagates
 * the state change to the top-locks.
 */
static void lovsub_lock_state(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              enum cl_lock_state state)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock_link *scan;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                struct lov_lock *lov    = scan->lll_super;
                struct cl_lock  *parent = lov->lls_cl.cls_lock;

                if (sub->lss_active != parent) {
                        lovsub_parent_lock(env, lov);
                        cl_lock_signal(env, parent);
                        lovsub_parent_unlock(env, lov);
                }
        }
}

/**
 * Implementation of cl_lock_operations::clo_weigh() that estimates lock
 * weight by asking the parent lock.
 */
static unsigned long lovsub_lock_weigh(const struct lu_env *env,
                                       const struct cl_lock_slice *slice)
{
        struct lovsub_lock *lock = cl2lovsub_lock(slice);
        struct lov_lock    *lov;
        unsigned long       dumbbell;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));

        if (!list_empty(&lock->lss_parents)) {
                /*
                 * It is not clear whether all parents have to be asked and
                 * their estimations summed, or whether asking one is enough.
                 * For the current usages, one is always enough.
                 */
                lov = container_of(lock->lss_parents.next,
                                   struct lov_lock_link, lll_list)->lll_super;

                lovsub_parent_lock(env, lov);
                dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
                lovsub_parent_unlock(env, lov);
        } else {
                dumbbell = 0;
        }

        return dumbbell;
}

/**
 * Maps start/end offsets within a stripe to offsets within a file.
 */
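/*
 * A hypothetical example (numbers chosen for illustration only): with
 * lsm_stripe_count == 3 and a stripe size of 256 pages, skip = 512.  For
 * stripe 1, a stripe-local page index of 300 lies in the stripe's second
 * chunk, so it maps to file page 300 + (300 / 256) * 512 + 1 * 256 = 1068.
 */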
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
                                  struct lov_object *lov,
                                  int stripe, struct cl_lock_descr *out)
{
        pgoff_t size; /* stripe size in pages */
        pgoff_t skip; /* how many pages in every stripe are occupied by
                       * "other" stripes */
        pgoff_t start;
        pgoff_t end;

        start = in->cld_start;
        end   = in->cld_end;

        if (lov->lo_lsm->lsm_stripe_count > 1) {
                size = cl_index(lov2cl(lov), lov->lo_lsm->lsm_stripe_size);
                skip = (lov->lo_lsm->lsm_stripe_count - 1) * size;

                /* XXX overflow check here? */
                start += start/size * skip + stripe * size;

                if (end != CL_PAGE_EOF) {
                        end += end/size * skip + stripe * size;
                        /*
                         * And check for overflow...
                         */
                        if (end < in->cld_end)
                                end = CL_PAGE_EOF;
                }
        }
        out->cld_start = start;
        out->cld_end   = end;
}

/**
 * Adjusts the parent lock extent when a sub-lock is attached to a parent.
 * This is called in two cases:
 *
 *     - as part of the receive call-back, when the server returns a granted
 *       extent to the client, and
 *
 *     - when a top-lock finds an existing sub-lock in the cache.
 *
 * Note that the lock mode is not propagated to the parent: i.e., if a
 * CLM_READ top-lock matches a CLM_WRITE sub-lock, the top-lock stays
 * CLM_READ.
 */
int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
                       struct lovsub_lock *sublock,
                       const struct cl_lock_descr *d, int idx)
{
        struct cl_lock       *parent;
        struct lovsub_object *subobj;
        struct cl_lock_descr *pd;
        struct cl_lock_descr *parent_descr;
        int                result;

        parent       = lov->lls_cl.cls_lock;
        parent_descr = &parent->cll_descr;
        LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));

        subobj = cl2lovsub(sublock->lss_cl.cls_obj);
        pd     = &lov_env_info(env)->lti_ldescr;

        pd->cld_obj  = parent_descr->cld_obj;
        pd->cld_mode = parent_descr->cld_mode;
        pd->cld_gid  = parent_descr->cld_gid;
        lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
        lov->lls_sub[idx].sub_got = *d;
        /*
         * Notify top-lock about modification, if lock description changes
         * materially.
         */
        if (!cl_lock_ext_match(parent_descr, pd))
                result = cl_lock_modify(env, parent, pd);
        else
                result = 0;
        return result;
}

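/*
 * cl_lock_operations::clo_modify() for the lovsub layer: applies the new
 * description to every parent top-lock via lov_sublock_modify(), keeping
 * the first non-zero return code.
 */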
static int lovsub_lock_modify(const struct lu_env *env,
                              const struct cl_lock_slice *s,
                              const struct cl_lock_descr *d)
{
        struct lovsub_lock   *lock   = cl2lovsub_lock(s);
        struct lov_lock_link *scan;
        struct lov_lock      *lov;
        int                   result = 0;

        LASSERT(cl_lock_mode_match(d->cld_mode,
                                   s->cls_lock->cll_descr.cld_mode));
        list_for_each_entry(scan, &lock->lss_parents, lll_list) {
                int rc;

                lov = scan->lll_super;
                lovsub_parent_lock(env, lov);
                rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
                lovsub_parent_unlock(env, lov);
                result = result ?: rc;
        }
        return result;
}

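/*
 * cl_lock_operations::clo_closure() for the lovsub layer: adds every parent
 * top-lock of this sub-lock to the closure being built.
 */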
static int lovsub_lock_closure(const struct lu_env *env,
                               const struct cl_lock_slice *slice,
                               struct cl_lock_closure *closure)
{
        struct lovsub_lock   *sub;
        struct cl_lock       *parent;
        struct lov_lock_link *scan;
        int                result;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));

        sub    = cl2lovsub_lock(slice);
        result = 0;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                parent = scan->lll_super->lls_cl.cls_lock;
                result = cl_lock_closure_build(env, parent, closure);
                if (result != 0)
                        break;
        }
        return result;
}

/**
 * A helper function for lovsub_lock_delete() that deals with a given parent
 * top-lock.
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
                                  struct cl_lock *child, struct lov_lock *lov)
{
        struct cl_lock *parent;
        int          result;

        parent = lov->lls_cl.cls_lock;
        if (parent->cll_error)
                return 0;

        result = 0;
        switch (parent->cll_state) {
        case CLS_ENQUEUED:
                /* See LU-1355 for the case where a glimpse lock is
                 * interrupted by a signal. */
                LASSERT(parent->cll_flags & CLF_CANCELLED);
                break;
        case CLS_QUEUING:
        case CLS_FREEING:
                cl_lock_signal(env, parent);
                break;
        case CLS_INTRANSIT:
                /*
                 * Here lies a problem: a sub-lock is canceled while the
                 * top-lock is being unlocked. The top-lock cannot be moved
                 * into the CLS_NEW state, because unlocking has to succeed
                 * eventually by placing the lock into CLS_CACHED (or failing
                 * it), see cl_unuse_try(). Nor can the top-lock be left in
                 * the CLS_CACHED state, because lov maintains the invariant
                 * that all sub-locks exist in CLS_CACHED (this allows a
                 * cached top-lock to be reused immediately). Nor can we wait
                 * for the top-lock state to change, because this can be
                 * synchronous to the current thread.
                 *
                 * We know for sure that lov_lock_unuse() will be called at
                 * least one more time to finish un-using, so leave a mark on
                 * the top-lock that will be seen by the next call to
                 * lov_lock_unuse().
                 */
                if (cl_lock_is_intransit(parent))
                        lov->lls_cancel_race = 1;
                break;
        case CLS_CACHED:
                /*
                 * If a sub-lock is canceled, move its top-lock into the
                 * CLS_NEW state to preserve the invariant that a top-lock in
                 * CLS_CACHED is immediately ready for re-use (i.e., has all
                 * sub-locks), and so that the next attempt to re-use the
                 * top-lock enqueues the missing sub-lock.
                 */
                cl_lock_state_set(env, parent, CLS_NEW);
                /* fall through */
        case CLS_NEW:
                /*
                 * If the last sub-lock is canceled, destroy the top-lock
                 * (which is now `empty') proactively.
                 */
                if (lov->lls_nr_filled == 0) {
                        /* ... but unfortunately, this cannot be done easily,
                         * as cancellation of a top-lock might acquire mutexes
                         * of its other sub-locks, violating lock ordering,
                         * see cl_lock_{cancel,delete}() preconditions.
                         *
                         * To work around this, the mutex of this sub-lock is
                         * released, the top-lock is destroyed, and the
                         * sub-lock mutex is acquired again. The list of
                         * parents has to be re-scanned from the beginning
                         * after this.
                         *
                         * Only do this if no mutexes other than on @child and
                         * @parent are held by the current thread.
                         *
                         * TODO: The lock model here is too complex, because
                         * the lock may be canceled and deleted voluntarily:
                         *    cl_lock_request
                         *      -> osc_lock_enqueue_wait
                         *      -> osc_lock_cancel_wait
                         *        -> cl_lock_delete
                         *          -> lovsub_lock_delete
                         *            -> cl_lock_cancel/delete
                         *              -> ...
                         *
                         * The better choice is to spawn a kernel thread for
                         * this purpose. -jay
                         */
                        if (cl_lock_nr_mutexed(env) == 2) {
                                cl_lock_mutex_put(env, child);
                                cl_lock_cancel(env, parent);
                                cl_lock_delete(env, parent);
                                result = 1;
                        }
                }
                break;
        case CLS_HELD:
                CL_LOCK_DEBUG(D_ERROR, env, parent, "Delete CLS_HELD lock\n");
                /* fall through */
        default:
                CERROR("Impossible state: %d\n", parent->cll_state);
                LBUG();
                break;
        }

        return result;
}

/**
 * An implementation of the cl_lock_operations::clo_delete() method. This is
 * invoked in a "bottom-to-top" delete, when lock destruction starts from the
 * sub-lock (e.g., as a result of the ldlm lock LRU policy).
 */
static void lovsub_lock_delete(const struct lu_env *env,
                               const struct cl_lock_slice *slice)
{
        struct cl_lock     *child = slice->cls_lock;
        struct lovsub_lock *sub   = cl2lovsub_lock(slice);
        int restart;

        LASSERT(cl_lock_is_mutexed(child));

        /*
         * Destruction of a sub-lock might take multiple iterations, because
         * when the last sub-lock of a given top-lock is deleted, the
         * top-lock is canceled proactively, and this requires releasing the
         * sub-lock mutex. Once the sub-lock mutex has been released, the
         * list of its parents has to be re-scanned from the beginning.
         */
        do {
                struct lov_lock      *lov;
                struct lov_lock_link *scan;
                struct lov_lock_link *temp;
                struct lov_lock_sub  *subdata;

                restart = 0;
                list_for_each_entry_safe(scan, temp,
                                         &sub->lss_parents, lll_list) {
                        lov     = scan->lll_super;
                        subdata = &lov->lls_sub[scan->lll_idx];
                        lovsub_parent_lock(env, lov);
                        subdata->sub_got = subdata->sub_descr;
                        lov_lock_unlink(env, scan, sub);
                        restart = lovsub_lock_delete_one(env, child, lov);
                        lovsub_parent_unlock(env, lov);

                        if (restart) {
                                cl_lock_mutex_get(env, child);
                                break;
                        }
                }
        } while (restart);
}

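/*
 * cl_lock_operations::clo_print() for the lovsub layer: for every parent
 * top-lock, prints the sub-lock's index within it and the parent's lock
 * description.
 */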
static int lovsub_lock_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock      *lov;
        struct lov_lock_link *scan;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                lov = scan->lll_super;
                (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
                if (lov != NULL)
                        cl_lock_descr_print(env, cookie, p,
                                            &lov->lls_cl.cls_lock->cll_descr);
                (*p)(env, cookie, "] ");
        }
        return 0;
}

static const struct cl_lock_operations lovsub_lock_ops = {
        .clo_fini    = lovsub_lock_fini,
        .clo_state   = lovsub_lock_state,
        .clo_delete  = lovsub_lock_delete,
        .clo_modify  = lovsub_lock_modify,
        .clo_closure = lovsub_lock_closure,
        .clo_weigh   = lovsub_lock_weigh,
        .clo_print   = lovsub_lock_print
};

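/*
 * Allocates the lovsub lock slice and attaches it to @lock with
 * lovsub_lock_ops; returns 0 on success and -ENOMEM if the slab allocation
 * fails.
 */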
int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
                     struct cl_lock *lock, const struct cl_io *io)
{
        struct lovsub_lock *lsk;
        int result;

        OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, GFP_NOFS);
        if (lsk != NULL) {
                INIT_LIST_HEAD(&lsk->lss_parents);
                cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
                result = 0;
        } else {
                result = -ENOMEM;
        }
        return result;
}

/** @} lov */