These changes are the raw update to linux-4.4.6-rt14 kernel sources.
kvmfornfv.git: kernel/drivers/staging/lustre/lustre/ldlm/ldlm_lockd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lockd.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 #include "../../include/linux/libcfs/libcfs.h"
45 #include "../include/lustre_dlm.h"
46 #include "../include/obd_class.h"
47 #include <linux/list.h>
48 #include "ldlm_internal.h"
49
50 static int ldlm_num_threads;
51 module_param(ldlm_num_threads, int, 0444);
52 MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
53
54 static char *ldlm_cpts;
55 module_param(ldlm_cpts, charp, 0444);
56 MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
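/*
 * Editor's note, not part of the original source: a usage sketch, assuming
 * this file is linked into the ptlrpc module as in the usual Lustre client
 * build.  Both parameters are read-only via sysfs (mode 0444), so they are
 * set at module load time, e.g.
 *
 *   modprobe ptlrpc ldlm_num_threads=16
 *
 * and can be inspected afterwards under /sys/module/<module>/parameters/.
 * ldlm_cpts takes a CPU-partition pattern string whose syntax is defined by
 * the libcfs CPT code, not by this file.
 */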
57
58 static struct mutex     ldlm_ref_mutex;
59 static int ldlm_refcount;
60
61 static struct kobject *ldlm_kobj;
62 struct kset *ldlm_ns_kset;
63 static struct kset *ldlm_svc_kset;
64
65 struct ldlm_cb_async_args {
66         struct ldlm_cb_set_arg *ca_set_arg;
67         struct ldlm_lock       *ca_lock;
68 };
69
70 /* LDLM state */
71
72 static struct ldlm_state *ldlm_state;
73
74 #define ELT_STOPPED   0
75 #define ELT_READY     1
76 #define ELT_TERMINATE 2
77
78 struct ldlm_bl_pool {
79         spinlock_t              blp_lock;
80
81         /*
82          * blp_prio_list is used for callbacks that should be handled
83          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
84          * see bug 13843
85          */
86         struct list_head              blp_prio_list;
87
88         /*
89          * blp_list is used for all other callbacks which are likely
90          * to take longer to process.
91          */
92         struct list_head              blp_list;
93
94         wait_queue_head_t            blp_waitq;
95         struct completion       blp_comp;
96         atomic_t            blp_num_threads;
97         atomic_t            blp_busy_threads;
98         int                  blp_min_threads;
99         int                  blp_max_threads;
100 };
101
102 struct ldlm_bl_work_item {
103         struct list_head              blwi_entry;
104         struct ldlm_namespace  *blwi_ns;
105         struct ldlm_lock_desc   blwi_ld;
106         struct ldlm_lock       *blwi_lock;
107         struct list_head              blwi_head;
108         int                  blwi_count;
109         struct completion       blwi_comp;
110         ldlm_cancel_flags_t     blwi_flags;
111         int                  blwi_mem_pressure;
112 };
113
114 /**
115  * Callback handler for receiving incoming blocking ASTs.
116  *
117  * This can only happen on the client side.
118  */
119 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
120                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
121 {
122         int do_ast;
123
124         LDLM_DEBUG(lock, "client blocking AST callback handler");
125
126         lock_res_and_lock(lock);
127         lock->l_flags |= LDLM_FL_CBPENDING;
128
129         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
130                 lock->l_flags |= LDLM_FL_CANCEL;
131
132         do_ast = !lock->l_readers && !lock->l_writers;
133         unlock_res_and_lock(lock);
134
135         if (do_ast) {
136                 CDEBUG(D_DLMTRACE,
137                        "Lock %p already unused, calling callback (%p)\n", lock,
138                        lock->l_blocking_ast);
139                 if (lock->l_blocking_ast != NULL)
140                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
141                                              LDLM_CB_BLOCKING);
142         } else {
143                 CDEBUG(D_DLMTRACE,
144                        "Lock %p is referenced, will be cancelled later\n",
145                        lock);
146         }
147
148         LDLM_DEBUG(lock, "client blocking callback handler END");
149         LDLM_LOCK_RELEASE(lock);
150 }
151
152 /**
153  * Callback handler for receiving incoming completion ASTs.
154  *
155  * This can only happen on the client side.
156  */
157 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
158                                     struct ldlm_namespace *ns,
159                                     struct ldlm_request *dlm_req,
160                                     struct ldlm_lock *lock)
161 {
162         int lvb_len;
163         LIST_HEAD(ast_list);
164         int rc = 0;
165
166         LDLM_DEBUG(lock, "client completion callback handler START");
167
168         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
169                 int to = cfs_time_seconds(1);
170
171                 while (to > 0) {
172                         set_current_state(TASK_INTERRUPTIBLE);
173                         schedule_timeout(to);
174                         if (lock->l_granted_mode == lock->l_req_mode ||
175                             lock->l_flags & LDLM_FL_DESTROYED)
176                                 break;
177                 }
178         }
179
180         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
181         if (lvb_len < 0) {
182                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
183                 rc = lvb_len;
184                 goto out;
185         } else if (lvb_len > 0) {
186                 if (lock->l_lvb_len > 0) {
187                         /* for extent lock, lvb contains ost_lvb{}. */
188                         LASSERT(lock->l_lvb_data != NULL);
189
190                         if (unlikely(lock->l_lvb_len < lvb_len)) {
191                                 LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
192                                            lock->l_lvb_len, lvb_len);
193                                 rc = -EINVAL;
194                                 goto out;
195                         }
196                 } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
197                                                      * variable length */
198                         void *lvb_data;
199
200                         lvb_data = kzalloc(lvb_len, GFP_NOFS);
201                         if (!lvb_data) {
202                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
203                                 rc = -ENOMEM;
204                                 goto out;
205                         }
206
207                         lock_res_and_lock(lock);
208                         LASSERT(lock->l_lvb_data == NULL);
209                         lock->l_lvb_type = LVB_T_LAYOUT;
210                         lock->l_lvb_data = lvb_data;
211                         lock->l_lvb_len = lvb_len;
212                         unlock_res_and_lock(lock);
213                 }
214         }
215
216         lock_res_and_lock(lock);
217         if ((lock->l_flags & LDLM_FL_DESTROYED) ||
218             lock->l_granted_mode == lock->l_req_mode) {
219                 /* bug 11300: the lock has already been granted */
220                 unlock_res_and_lock(lock);
221                 LDLM_DEBUG(lock, "Double grant race happened");
222                 rc = 0;
223                 goto out;
224         }
225
226         /* If we receive the completion AST before the actual enqueue returned,
227          * then we might need to switch lock modes, resources, or extents. */
228         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
229                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
230                 LDLM_DEBUG(lock, "completion AST, new lock mode");
231         }
232
233         if (lock->l_resource->lr_type != LDLM_PLAIN) {
234                 ldlm_convert_policy_to_local(req->rq_export,
235                                           dlm_req->lock_desc.l_resource.lr_type,
236                                           &dlm_req->lock_desc.l_policy_data,
237                                           &lock->l_policy_data);
238                 LDLM_DEBUG(lock, "completion AST, new policy data");
239         }
240
241         ldlm_resource_unlink_lock(lock);
242         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
243                    &lock->l_resource->lr_name,
244                    sizeof(lock->l_resource->lr_name)) != 0) {
245                 unlock_res_and_lock(lock);
246                 rc = ldlm_lock_change_resource(ns, lock,
247                                 &dlm_req->lock_desc.l_resource.lr_name);
248                 if (rc < 0) {
249                         LDLM_ERROR(lock, "Failed to allocate resource");
250                         goto out;
251                 }
252                 LDLM_DEBUG(lock, "completion AST, new resource");
253                 CERROR("change resource!\n");
254                 lock_res_and_lock(lock);
255         }
256
257         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
258                 /* BL_AST locks are not needed in LRU.
259                  * Let ldlm_cancel_lru() be fast. */
260                 ldlm_lock_remove_from_lru(lock);
261                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
262                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
263         }
264
265         if (lock->l_lvb_len > 0) {
266                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
267                                    lock->l_lvb_data, lvb_len);
268                 if (rc < 0) {
269                         unlock_res_and_lock(lock);
270                         goto out;
271                 }
272         }
273
274         ldlm_grant_lock(lock, &ast_list);
275         unlock_res_and_lock(lock);
276
277         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
278
279         /* Let the enqueue path call osc_lock_upcall() and initialize
280          * l_ast_data */
281         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
282
283         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
284
285         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
286                           lock);
287         goto out;
288
289 out:
290         if (rc < 0) {
291                 lock_res_and_lock(lock);
292                 lock->l_flags |= LDLM_FL_FAILED;
293                 unlock_res_and_lock(lock);
294                 wake_up(&lock->l_waitq);
295         }
296         LDLM_LOCK_RELEASE(lock);
297 }
298
299 /**
300  * Callback handler for receiving incoming glimpse ASTs.
301  *
302  * This can only happen on the client side.  After handling the glimpse AST
303  * we also consider dropping the lock here if it is unused locally for a
304  * long time.
305  */
306 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
307                                     struct ldlm_namespace *ns,
308                                     struct ldlm_request *dlm_req,
309                                     struct ldlm_lock *lock)
310 {
311         int rc = -ENOSYS;
312
313         LDLM_DEBUG(lock, "client glimpse AST callback handler");
314
315         if (lock->l_glimpse_ast != NULL)
316                 rc = lock->l_glimpse_ast(lock, req);
317
318         if (req->rq_repmsg != NULL) {
319                 ptlrpc_reply(req);
320         } else {
321                 req->rq_status = rc;
322                 ptlrpc_error(req);
323         }
324
325         lock_res_and_lock(lock);
326         if (lock->l_granted_mode == LCK_PW &&
327             !lock->l_readers && !lock->l_writers &&
328             cfs_time_after(cfs_time_current(),
329                            cfs_time_add(lock->l_last_used,
330                                         cfs_time_seconds(10)))) {
331                 unlock_res_and_lock(lock);
332                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
333                         ldlm_handle_bl_callback(ns, NULL, lock);
334
335                 return;
336         }
337         unlock_res_and_lock(lock);
338         LDLM_LOCK_RELEASE(lock);
339 }
340
341 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
342 {
343         if (req->rq_no_reply)
344                 return 0;
345
346         req->rq_status = rc;
347         if (!req->rq_packed_final) {
348                 rc = lustre_pack_reply(req, 1, NULL, NULL);
349                 if (rc)
350                         return rc;
351         }
352         return ptlrpc_reply(req);
353 }
354
355 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
356                                ldlm_cancel_flags_t cancel_flags)
357 {
358         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
359
360         spin_lock(&blp->blp_lock);
361         if (blwi->blwi_lock &&
362             blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
363                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
364                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
365         } else {
366                 /* other blocking callbacks are added to the regular list */
367                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
368         }
369         spin_unlock(&blp->blp_lock);
370
371         wake_up(&blp->blp_waitq);
372
373         /* cannot check blwi->blwi_flags as blwi could already have been freed
374            in LCF_ASYNC mode */
375         if (!(cancel_flags & LCF_ASYNC))
376                 wait_for_completion(&blwi->blwi_comp);
377
378         return 0;
379 }
380
381 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
382                              struct ldlm_namespace *ns,
383                              struct ldlm_lock_desc *ld,
384                              struct list_head *cancels, int count,
385                              struct ldlm_lock *lock,
386                              ldlm_cancel_flags_t cancel_flags)
387 {
388         init_completion(&blwi->blwi_comp);
389         INIT_LIST_HEAD(&blwi->blwi_head);
390
391         if (memory_pressure_get())
392                 blwi->blwi_mem_pressure = 1;
393
394         blwi->blwi_ns = ns;
395         blwi->blwi_flags = cancel_flags;
396         if (ld != NULL)
397                 blwi->blwi_ld = *ld;
398         if (count) {
399                 list_add(&blwi->blwi_head, cancels);
400                 list_del_init(cancels);
401                 blwi->blwi_count = count;
402         } else {
403                 blwi->blwi_lock = lock;
404         }
405 }
406
407 /**
408  * Queues a list of locks \a cancels containing \a count locks
409  * for later processing by a blocking thread.  If \a count is zero,
410  * then the lock referenced as \a lock is queued instead.
411  *
412  * The blocking thread will then invoke the lock's ->l_blocking_ast callback.
413  * If adding to the list fails, an error is returned and the caller is expected
414  * to call ->l_blocking_ast itself.
415  */
416 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
417                              struct ldlm_lock_desc *ld,
418                              struct ldlm_lock *lock,
419                              struct list_head *cancels, int count,
420                              ldlm_cancel_flags_t cancel_flags)
421 {
422         if (cancels && count == 0)
423                 return 0;
424
425         if (cancel_flags & LCF_ASYNC) {
426                 struct ldlm_bl_work_item *blwi;
427
428                 blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
429                 if (!blwi)
430                         return -ENOMEM;
431                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
432
433                 return __ldlm_bl_to_thread(blwi, cancel_flags);
434         } else {
435                 /* for a synchronous call, do minimal memory allocation, as it
436                  * could be triggered by the kernel memory shrinker
437                  */
438                 struct ldlm_bl_work_item blwi;
439
440                 memset(&blwi, 0, sizeof(blwi));
441                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
442                 return __ldlm_bl_to_thread(&blwi, cancel_flags);
443         }
444 }
445
446 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
447                            struct ldlm_lock *lock)
448 {
449         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
450 }
451
452 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
453                            struct list_head *cancels, int count,
454                            ldlm_cancel_flags_t cancel_flags)
455 {
456         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
457 }
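/*
 * Editor's sketch, mirroring the call sites later in this file: callers treat
 * a non-zero return as "the work item could not be queued" and fall back to
 * running the blocking callback inline, e.g.
 *
 *   if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
 *           ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
 */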
458
459 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC)! */
460 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
461 {
462         struct obd_device *obd = req->rq_export->exp_obd;
463         char *key;
464         void *val;
465         int keylen, vallen;
466         int rc = -ENOSYS;
467
468         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
469
470         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
471
472         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
473         if (key == NULL) {
474                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
475                 return -EFAULT;
476         }
477         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
478                                       RCL_CLIENT);
479         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
480         if (val == NULL) {
481                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
482                 return -EFAULT;
483         }
484         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
485                                       RCL_CLIENT);
486
487         /* We are responsible for swabbing contents of val */
488
489         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
490                 /* Pass it on to mdc (the "export" in this case) */
491                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
492                                         req->rq_export,
493                                         sizeof(KEY_HSM_COPYTOOL_SEND),
494                                         KEY_HSM_COPYTOOL_SEND,
495                                         vallen, val, NULL);
496         else
497                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
498
499         return rc;
500 }
501
502 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
503                                         const char *msg, int rc,
504                                         struct lustre_handle *handle)
505 {
506         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
507                   "%s: [nid %s] [rc %d] [lock %#llx]",
508                   msg, libcfs_id2str(req->rq_peer), rc,
509                   handle ? handle->cookie : 0);
510         if (req->rq_no_reply)
511                 CWARN("No reply was sent, maybe cause bug 21636.\n");
512         else if (rc)
513                 CWARN("Send reply failed, maybe cause bug 21636.\n");
514 }
515
516 static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
517 {
518         struct obd_quotactl *oqctl;
519         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
520
521         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
522         if (oqctl == NULL) {
523                 CERROR("Can't unpack obd_quotactl\n");
524                 return -EPROTO;
525         }
526
527         oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
528
529         cli->cl_qchk_stat = oqctl->qc_stat;
530         return 0;
531 }
532
533 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
534 static int ldlm_callback_handler(struct ptlrpc_request *req)
535 {
536         struct ldlm_namespace *ns;
537         struct ldlm_request *dlm_req;
538         struct ldlm_lock *lock;
539         int rc;
540
541         /* Requests arrive in sender's byte order.  The ptlrpc service
542          * handler has already checked and, if necessary, byte-swapped the
543          * incoming request message body, but I am responsible for the
544          * message buffers. */
545
546         /* do nothing for sec context finalize */
547         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
548                 return 0;
549
550         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
551
552         if (req->rq_export == NULL) {
553                 rc = ldlm_callback_reply(req, -ENOTCONN);
554                 ldlm_callback_errmsg(req, "Operate on unconnected server",
555                                      rc, NULL);
556                 return 0;
557         }
558
559         LASSERT(req->rq_export != NULL);
560         LASSERT(req->rq_export->exp_obd != NULL);
561
562         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
563         case LDLM_BL_CALLBACK:
564                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
565                         return 0;
566                 break;
567         case LDLM_CP_CALLBACK:
568                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
569                         return 0;
570                 break;
571         case LDLM_GL_CALLBACK:
572                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
573                         return 0;
574                 break;
575         case LDLM_SET_INFO:
576                 rc = ldlm_handle_setinfo(req);
577                 ldlm_callback_reply(req, rc);
578                 return 0;
579         case OBD_QC_CALLBACK:
580                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
581                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
582                         return 0;
583                 rc = ldlm_handle_qc_callback(req);
584                 ldlm_callback_reply(req, rc);
585                 return 0;
586         default:
587                 CERROR("unknown opcode %u\n",
588                        lustre_msg_get_opc(req->rq_reqmsg));
589                 ldlm_callback_reply(req, -EPROTO);
590                 return 0;
591         }
592
593         ns = req->rq_export->exp_obd->obd_namespace;
594         LASSERT(ns != NULL);
595
596         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
597
598         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
599         if (dlm_req == NULL) {
600                 rc = ldlm_callback_reply(req, -EPROTO);
601                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
602                                      NULL);
603                 return 0;
604         }
605
606         /* Force a known safe race: send a cancel to the server for a lock
607          * which the server has already started a blocking callback on. */
608         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
609             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
610                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
611                 if (rc < 0)
612                         CERROR("ldlm_cli_cancel: %d\n", rc);
613         }
614
615         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
616         if (!lock) {
617                 CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
618                        dlm_req->lock_handle[0].cookie);
619                 rc = ldlm_callback_reply(req, -EINVAL);
620                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
621                                      &dlm_req->lock_handle[0]);
622                 return 0;
623         }
624
625         if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
626             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
627                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
628
629         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
630         lock_res_and_lock(lock);
631         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
632                                               LDLM_AST_FLAGS);
633         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
634                 /* If somebody cancels the lock and the cache is already dropped,
635                  * or the lock fails before the cp_ast is received on the client,
636                  * we can tell the server we have no lock. Otherwise, we
637                  * should send the cancel after dropping the cache. */
638                 if (((lock->l_flags & LDLM_FL_CANCELING) &&
639                     (lock->l_flags & LDLM_FL_BL_DONE)) ||
640                     (lock->l_flags & LDLM_FL_FAILED)) {
641                         LDLM_DEBUG(lock, "callback on lock %#llx - lock disappeared\n",
642                                    dlm_req->lock_handle[0].cookie);
643                         unlock_res_and_lock(lock);
644                         LDLM_LOCK_RELEASE(lock);
645                         rc = ldlm_callback_reply(req, -EINVAL);
646                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
647                                              &dlm_req->lock_handle[0]);
648                         return 0;
649                 }
650                 /* BL_AST locks are not needed in LRU.
651                  * Let ldlm_cancel_lru() be fast. */
652                 ldlm_lock_remove_from_lru(lock);
653                 lock->l_flags |= LDLM_FL_BL_AST;
654         }
655         unlock_res_and_lock(lock);
656
657         /* We want the ost thread to get this reply so that it can respond
658          * to ost requests (write cache writeback) that might be triggered
659          * in the callback.
660          *
661          * But we'd also like to be able to indicate in the reply that we're
662          * cancelling right now, because it's unused, or have an intent result
663          * in the reply, so we might have to push the responsibility for sending
664          * the reply down into the AST handlers, alas. */
665
666         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
667         case LDLM_BL_CALLBACK:
668                 CDEBUG(D_INODE, "blocking ast\n");
669                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
670                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
671                         rc = ldlm_callback_reply(req, 0);
672                         if (req->rq_no_reply || rc)
673                                 ldlm_callback_errmsg(req, "Normal process", rc,
674                                                      &dlm_req->lock_handle[0]);
675                 }
676                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
677                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
678                 break;
679         case LDLM_CP_CALLBACK:
680                 CDEBUG(D_INODE, "completion ast\n");
681                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
682                 ldlm_callback_reply(req, 0);
683                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
684                 break;
685         case LDLM_GL_CALLBACK:
686                 CDEBUG(D_INODE, "glimpse ast\n");
687                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
688                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
689                 break;
690         default:
691                 LBUG();                  /* checked above */
692         }
693
694         return 0;
695 }
696
697 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
698 {
699         struct ldlm_bl_work_item *blwi = NULL;
700         static unsigned int num_bl;
701
702         spin_lock(&blp->blp_lock);
703         /* take a request from blp_list at least once every blp_num_threads dequeues so the priority list cannot starve it */
704         if (!list_empty(&blp->blp_list) &&
705             (list_empty(&blp->blp_prio_list) || num_bl == 0))
706                 blwi = list_entry(blp->blp_list.next,
707                                       struct ldlm_bl_work_item, blwi_entry);
708         else
709                 if (!list_empty(&blp->blp_prio_list))
710                         blwi = list_entry(blp->blp_prio_list.next,
711                                               struct ldlm_bl_work_item,
712                                               blwi_entry);
713
714         if (blwi) {
715                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
716                         num_bl = 0;
717                 list_del(&blwi->blwi_entry);
718         }
719         spin_unlock(&blp->blp_lock);
720
721         return blwi;
722 }
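/*
 * Editor's note, illustration only: num_bl cycles modulo blp_num_threads, so
 * with e.g. 4 threads at most 3 consecutive items come from blp_prio_list
 * before one is taken from blp_list; ordinary blocking callbacks are thus not
 * starved by LDLM_FL_DISCARD_DATA work.
 */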
723
724 /* This only contains temporary data until the thread starts */
725 struct ldlm_bl_thread_data {
726         char                    bltd_name[CFS_CURPROC_COMM_MAX];
727         struct ldlm_bl_pool     *bltd_blp;
728         struct completion       bltd_comp;
729         int                     bltd_num;
730 };
731
732 static int ldlm_bl_thread_main(void *arg);
733
734 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
735 {
736         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
737         struct task_struct *task;
738
739         init_completion(&bltd.bltd_comp);
740         bltd.bltd_num = atomic_read(&blp->blp_num_threads);
741         snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
742                 "ldlm_bl_%02d", bltd.bltd_num);
743         task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
744         if (IS_ERR(task)) {
745                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
746                        atomic_read(&blp->blp_num_threads), PTR_ERR(task));
747                 return PTR_ERR(task);
748         }
749         wait_for_completion(&bltd.bltd_comp);
750
751         return 0;
752 }
753
754 /**
755  * Main blocking requests processing thread.
756  *
757  * Callers put locks into its queue by calling ldlm_bl_to_thread.
758  * This thread ultimately makes the actual call to ->l_blocking_ast
759  * for the queued locks.
760  */
761 static int ldlm_bl_thread_main(void *arg)
762 {
763         struct ldlm_bl_pool *blp;
764
765         {
766                 struct ldlm_bl_thread_data *bltd = arg;
767
768                 blp = bltd->bltd_blp;
769
770                 atomic_inc(&blp->blp_num_threads);
771                 atomic_inc(&blp->blp_busy_threads);
772
773                 complete(&bltd->bltd_comp);
774                 /* cannot use bltd after this, it is only on caller's stack */
775         }
776
777         while (1) {
778                 struct l_wait_info lwi = { 0 };
779                 struct ldlm_bl_work_item *blwi = NULL;
780                 int busy;
781
782                 blwi = ldlm_bl_get_work(blp);
783
784                 if (blwi == NULL) {
785                         atomic_dec(&blp->blp_busy_threads);
786                         l_wait_event_exclusive(blp->blp_waitq,
787                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
788                                          &lwi);
789                         busy = atomic_inc_return(&blp->blp_busy_threads);
790                 } else {
791                         busy = atomic_read(&blp->blp_busy_threads);
792                 }
793
794                 if (blwi->blwi_ns == NULL)
795                         /* added by ldlm_cleanup() */
796                         break;
797
798                 /* Not fatal if this races and we end up with a few too many threads */
799                 if (unlikely(busy < blp->blp_max_threads &&
800                              busy >= atomic_read(&blp->blp_num_threads) &&
801                              !blwi->blwi_mem_pressure))
802                         /* discard the return value, we tried */
803                         ldlm_bl_thread_start(blp);
804
805                 if (blwi->blwi_mem_pressure)
806                         memory_pressure_set();
807
808                 if (blwi->blwi_count) {
809                         int count;
810                         /* In the special case where we cancel LRU locks
811                          * asynchronously, we pass the list of locks here.
812                          * The locks are thus marked LDLM_FL_CANCELING, but
813                          * NOT yet canceled locally. */
814                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
815                                                            blwi->blwi_count,
816                                                            LCF_BL_AST);
817                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
818                                              blwi->blwi_flags);
819                 } else {
820                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
821                                                 blwi->blwi_lock);
822                 }
823                 if (blwi->blwi_mem_pressure)
824                         memory_pressure_clr();
825
826                 if (blwi->blwi_flags & LCF_ASYNC)
827                         kfree(blwi);
828                 else
829                         complete(&blwi->blwi_comp);
830         }
831
832         atomic_dec(&blp->blp_busy_threads);
833         atomic_dec(&blp->blp_num_threads);
834         complete(&blp->blp_comp);
835         return 0;
836 }
837
838 static int ldlm_setup(void);
839 static int ldlm_cleanup(void);
840
841 int ldlm_get_ref(void)
842 {
843         int rc = 0;
844
845         mutex_lock(&ldlm_ref_mutex);
846         if (++ldlm_refcount == 1) {
847                 rc = ldlm_setup();
848                 if (rc)
849                         ldlm_refcount--;
850         }
851         mutex_unlock(&ldlm_ref_mutex);
852
853         return rc;
854 }
855 EXPORT_SYMBOL(ldlm_get_ref);
856
857 void ldlm_put_ref(void)
858 {
859         mutex_lock(&ldlm_ref_mutex);
860         if (ldlm_refcount == 1) {
861                 int rc = ldlm_cleanup();
862
863                 if (rc)
864                         CERROR("ldlm_cleanup failed: %d\n", rc);
865                 else
866                         ldlm_refcount--;
867         } else {
868                 ldlm_refcount--;
869         }
870         mutex_unlock(&ldlm_ref_mutex);
871 }
872 EXPORT_SYMBOL(ldlm_put_ref);
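/*
 * Editor's sketch of the assumed typical usage, not taken from this file:
 * LDLM users bracket their lifetime with the reference pair above; the first
 * ldlm_get_ref() runs ldlm_setup() and the last ldlm_put_ref() runs
 * ldlm_cleanup().
 *
 *   rc = ldlm_get_ref();
 *   if (rc)
 *           return rc;
 *   ... use the DLM ...
 *   ldlm_put_ref();
 */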
873
874 extern unsigned int ldlm_cancel_unused_locks_before_replay;
875
876 static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
877                                                       struct attribute *attr,
878                                                       char *buf)
879 {
880         return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
881 }
882
883 static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
884                                                        struct attribute *attr,
885                                                        const char *buffer,
886                                                        size_t count)
887 {
888         int rc;
889         unsigned long val;
890
891         rc = kstrtoul(buffer, 10, &val);
892         if (rc)
893                 return rc;
894
895         ldlm_cancel_unused_locks_before_replay = val;
896
897         return count;
898 }
899 LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
900
901 /* These are for root of /sys/fs/lustre/ldlm */
902 static struct attribute *ldlm_attrs[] = {
903         &lustre_attr_cancel_unused_locks_before_replay.attr,
904         NULL,
905 };
906
907 static struct attribute_group ldlm_attr_group = {
908         .attrs = ldlm_attrs,
909 };
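/*
 * Editor's sketch, illustration only: once ldlm_setup() below registers this
 * group on ldlm_kobj, the attribute is expected to appear as
 * /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay and can be read or
 * written from user space, e.g.
 *
 *   cat /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay
 *   echo 0 > /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay
 */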
910
911 static int ldlm_setup(void)
912 {
913         static struct ptlrpc_service_conf       conf;
914         struct ldlm_bl_pool                     *blp = NULL;
915         int rc = 0;
916         int i;
917
918         if (ldlm_state != NULL)
919                 return -EALREADY;
920
921         ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
922         if (!ldlm_state)
923                 return -ENOMEM;
924
925         ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
926         if (!ldlm_kobj) {
927                 rc = -ENOMEM;
928                 goto out;
929         }
930
931         rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
932         if (rc)
933                 goto out;
934
935         ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
936         if (!ldlm_ns_kset) {
937                 rc = -ENOMEM;
938                 goto out;
939         }
940
941         ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
942         if (!ldlm_svc_kset) {
943                 rc = -ENOMEM;
944                 goto out;
945         }
946
947         rc = ldlm_debugfs_setup();
948         if (rc != 0)
949                 goto out;
950
951         memset(&conf, 0, sizeof(conf));
952         conf = (typeof(conf)) {
953                 .psc_name               = "ldlm_cbd",
954                 .psc_watchdog_factor    = 2,
955                 .psc_buf                = {
956                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
957                         .bc_buf_size            = LDLM_BUFSIZE,
958                         .bc_req_max_size        = LDLM_MAXREQSIZE,
959                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
960                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
961                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
962                 },
963                 .psc_thr                = {
964                         .tc_thr_name            = "ldlm_cb",
965                         .tc_thr_factor          = LDLM_THR_FACTOR,
966                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
967                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
968                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
969                         .tc_nthrs_user          = ldlm_num_threads,
970                         .tc_cpu_affinity        = 1,
971                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
972                 },
973                 .psc_cpt                = {
974                         .cc_pattern             = ldlm_cpts,
975                 },
976                 .psc_ops                = {
977                         .so_req_handler         = ldlm_callback_handler,
978                 },
979         };
980         ldlm_state->ldlm_cb_service =
981                         ptlrpc_register_service(&conf, ldlm_svc_kset,
982                                                 ldlm_svc_debugfs_dir);
983         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
984                 CERROR("failed to start service\n");
985                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
986                 ldlm_state->ldlm_cb_service = NULL;
987                 goto out;
988         }
989
990         blp = kzalloc(sizeof(*blp), GFP_NOFS);
991         if (!blp) {
992                 rc = -ENOMEM;
993                 goto out;
994         }
995         ldlm_state->ldlm_bl_pool = blp;
996
997         spin_lock_init(&blp->blp_lock);
998         INIT_LIST_HEAD(&blp->blp_list);
999         INIT_LIST_HEAD(&blp->blp_prio_list);
1000         init_waitqueue_head(&blp->blp_waitq);
1001         atomic_set(&blp->blp_num_threads, 0);
1002         atomic_set(&blp->blp_busy_threads, 0);
1003
1004         if (ldlm_num_threads == 0) {
1005                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1006                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1007         } else {
1008                 blp->blp_min_threads = blp->blp_max_threads =
1009                         min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
1010                                                          ldlm_num_threads));
1011         }
1012
1013         for (i = 0; i < blp->blp_min_threads; i++) {
1014                 rc = ldlm_bl_thread_start(blp);
1015                 if (rc < 0)
1016                         goto out;
1017         }
1018
1019         rc = ldlm_pools_init();
1020         if (rc) {
1021                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1022                 goto out;
1023         }
1024         return 0;
1025
1026  out:
1027         ldlm_cleanup();
1028         return rc;
1029 }
1030
1031 static int ldlm_cleanup(void)
1032 {
1033         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1034             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1035                 CERROR("ldlm still has namespaces; clean these up first.\n");
1036                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1037                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1038                 return -EBUSY;
1039         }
1040
1041         ldlm_pools_fini();
1042
1043         if (ldlm_state->ldlm_bl_pool != NULL) {
1044                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1045
1046                 while (atomic_read(&blp->blp_num_threads) > 0) {
1047                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1048
1049                         init_completion(&blp->blp_comp);
1050
1051                         spin_lock(&blp->blp_lock);
1052                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1053                         wake_up(&blp->blp_waitq);
1054                         spin_unlock(&blp->blp_lock);
1055
1056                         wait_for_completion(&blp->blp_comp);
1057                 }
1058
1059                 kfree(blp);
1060         }
1061
1062         if (ldlm_state->ldlm_cb_service != NULL)
1063                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1064
1065         if (ldlm_ns_kset)
1066                 kset_unregister(ldlm_ns_kset);
1067         if (ldlm_svc_kset)
1068                 kset_unregister(ldlm_svc_kset);
1069         if (ldlm_kobj)
1070                 kobject_put(ldlm_kobj);
1071
1072         ldlm_debugfs_cleanup();
1073
1074         kfree(ldlm_state);
1075         ldlm_state = NULL;
1076
1077         return 0;
1078 }
1079
1080 int ldlm_init(void)
1081 {
1082         mutex_init(&ldlm_ref_mutex);
1083         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1084         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1085         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1086                                                sizeof(struct ldlm_resource), 0,
1087                                                SLAB_HWCACHE_ALIGN, NULL);
1088         if (ldlm_resource_slab == NULL)
1089                 return -ENOMEM;
1090
1091         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1092                               sizeof(struct ldlm_lock), 0,
1093                               SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
1094         if (ldlm_lock_slab == NULL) {
1095                 kmem_cache_destroy(ldlm_resource_slab);
1096                 return -ENOMEM;
1097         }
1098
1099         ldlm_interval_slab = kmem_cache_create("interval_node",
1100                                         sizeof(struct ldlm_interval),
1101                                         0, SLAB_HWCACHE_ALIGN, NULL);
1102         if (ldlm_interval_slab == NULL) {
1103                 kmem_cache_destroy(ldlm_resource_slab);
1104                 kmem_cache_destroy(ldlm_lock_slab);
1105                 return -ENOMEM;
1106         }
1107 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1108         class_export_dump_hook = ldlm_dump_export_locks;
1109 #endif
1110         return 0;
1111 }
1112
1113 void ldlm_exit(void)
1114 {
1115         if (ldlm_refcount)
1116                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1117         kmem_cache_destroy(ldlm_resource_slab);
1118         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
1119          * call synchronize_rcu() to wait for a grace period to elapse, so
1120          * that ldlm_lock_free() gets a chance to be called. */
1121         synchronize_rcu();
1122         kmem_cache_destroy(ldlm_lock_slab);
1123         kmem_cache_destroy(ldlm_interval_slab);
1124 }