These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40
41 #include "../include/lustre_intent.h"
42 #include "../include/obd.h"
43 #include "../include/obd_class.h"
44 #include "../include/lustre_dlm.h"
45 #include "../include/lustre_fid.h"      /* fid_res_name_eq() */
46 #include "../include/lustre_mdc.h"
47 #include "../include/lustre_net.h"
48 #include "../include/lustre_req_layout.h"
49 #include "mdc_internal.h"
50
/*
 * Bundle of the three objects that travel together: an export, a metadata
 * enqueue descriptor and an ldlm enqueue descriptor.
 * NOTE(review): the users of this struct are outside this excerpt --
 * presumably it is the async-args payload of a getattr enqueue; confirm.
 */
struct mdc_getattr_args {
	struct obd_export	 *ga_exp;	/* export */
	struct md_enqueue_info	 *ga_minfo;	/* metadata enqueue info */
	struct ldlm_enqueue_info *ga_einfo;	/* ldlm enqueue info */
};
56
57 int it_disposition(struct lookup_intent *it, int flag)
58 {
59         return it->d.lustre.it_disposition & flag;
60 }
61 EXPORT_SYMBOL(it_disposition);
62
63 void it_set_disposition(struct lookup_intent *it, int flag)
64 {
65         it->d.lustre.it_disposition |= flag;
66 }
67 EXPORT_SYMBOL(it_set_disposition);
68
69 void it_clear_disposition(struct lookup_intent *it, int flag)
70 {
71         it->d.lustre.it_disposition &= ~flag;
72 }
73 EXPORT_SYMBOL(it_clear_disposition);
74
75 int it_open_error(int phase, struct lookup_intent *it)
76 {
77         if (it_disposition(it, DISP_OPEN_LEASE)) {
78                 if (phase >= DISP_OPEN_LEASE)
79                         return it->d.lustre.it_status;
80                 else
81                         return 0;
82         }
83         if (it_disposition(it, DISP_OPEN_OPEN)) {
84                 if (phase >= DISP_OPEN_OPEN)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_OPEN_CREATE)) {
91                 if (phase >= DISP_OPEN_CREATE)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98                 if (phase >= DISP_LOOKUP_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103
104         if (it_disposition(it, DISP_IT_EXECD)) {
105                 if (phase >= DISP_IT_EXECD)
106                         return it->d.lustre.it_status;
107                 else
108                         return 0;
109         }
110         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111                it->d.lustre.it_status);
112         LBUG();
113         return 0;
114 }
115 EXPORT_SYMBOL(it_open_error);
116
117 /* this must be called on a lockh that is known to have a referenced lock */
118 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
119                       __u64 *bits)
120 {
121         struct ldlm_lock *lock;
122         struct inode *new_inode = data;
123
124         if (bits)
125                 *bits = 0;
126
127         if (!*lockh)
128                 return 0;
129
130         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132         LASSERT(lock != NULL);
133         lock_res_and_lock(lock);
134         if (lock->l_resource->lr_lvb_inode &&
135             lock->l_resource->lr_lvb_inode != data) {
136                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
137
138                 LASSERTF(old_inode->i_state & I_FREEING,
139                          "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
140                          old_inode, old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state, new_inode, new_inode->i_ino,
142                          new_inode->i_generation);
143         }
144         lock->l_resource->lr_lvb_inode = new_inode;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         return 0;
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161
162         fid_build_reg_res_name(fid, &res_id);
163         /* LU-4405: Clear bits not supported by server */
164         policy->l_inodebits.bits &= exp_connect_ibits(exp);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         return rc;
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         fid_build_reg_res_name(fid, &res_id);
182         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183                                              policy, mode, flags, opaque);
184         return rc;
185 }
186
187 int mdc_null_inode(struct obd_export *exp,
188                    const struct lu_fid *fid)
189 {
190         struct ldlm_res_id res_id;
191         struct ldlm_resource *res;
192         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
193
194         LASSERTF(ns != NULL, "no namespace passed\n");
195
196         fid_build_reg_res_name(fid, &res_id);
197
198         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
199         if (res == NULL)
200                 return 0;
201
202         lock_res(res);
203         res->lr_lvb_inode = NULL;
204         unlock_res(res);
205
206         ldlm_resource_putref(res);
207         return 0;
208 }
209
210 /* find any ldlm lock of the inode in mdc
211  * return 0    not find
212  *      1    find one
213  *      < 0    error */
214 int mdc_find_cbdata(struct obd_export *exp,
215                     const struct lu_fid *fid,
216                     ldlm_iterator_t it, void *data)
217 {
218         struct ldlm_res_id res_id;
219         int rc = 0;
220
221         fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
222         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223                                    it, data);
224         if (rc == LDLM_ITER_STOP)
225                 return 1;
226         else if (rc == LDLM_ITER_CONTINUE)
227                 return 0;
228         return rc;
229 }
230
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 {
233         /* Don't hold error requests for replay. */
234         if (req->rq_replay) {
235                 spin_lock(&req->rq_lock);
236                 req->rq_replay = 0;
237                 spin_unlock(&req->rq_lock);
238         }
239         if (rc && req->rq_transno != 0) {
240                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241                 LBUG();
242         }
243 }
244
245 /* Save a large LOV EA into the request buffer so that it is available
246  * for replay.  We don't do this in the initial request because the
247  * original request doesn't need this buffer (at most it sends just the
248  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249  * buffer and may also be difficult to allocate and save a very large
250  * request buffer for each open. (bug 5707)
251  *
252  * OOM here may cause recovery failure if lmm is needed (only for the
253  * original open if the MDS crashed just when this client also OOM'd)
254  * but this is incredibly unlikely, and questionable whether the client
255  * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257                                 struct mdt_body *body)
258 {
259         int     rc;
260
261         /* FIXME: remove this explicit offset. */
262         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
263                                         body->eadatasize);
264         if (rc) {
265                 CERROR("Can't enlarge segment %d size to %d\n",
266                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
267                 body->valid &= ~OBD_MD_FLEASIZE;
268                 body->eadatasize = 0;
269         }
270 }
271
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273                                                    struct lookup_intent *it,
274                                                    struct md_op_data *op_data,
275                                                    void *lmm, int lmmsize,
276                                                    void *cb_data)
277 {
278         struct ptlrpc_request *req;
279         struct obd_device     *obddev = class_exp2obd(exp);
280         struct ldlm_intent    *lit;
281         LIST_HEAD(cancels);
282         int                 count = 0;
283         int                 mode;
284         int                 rc;
285
286         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287
288         /* XXX: openlock is not cancelled for cross-refs. */
289         /* If inode is known, cancel conflicting OPEN locks. */
290         if (fid_is_sane(&op_data->op_fid2)) {
291                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
292                         if (it->it_flags & FMODE_WRITE)
293                                 mode = LCK_EX;
294                         else
295                                 mode = LCK_PR;
296                 } else {
297                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298                                 mode = LCK_CW;
299                         else if (it->it_flags & __FMODE_EXEC)
300                                 mode = LCK_PR;
301                         else
302                                 mode = LCK_CR;
303                 }
304                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
305                                                 &cancels, mode,
306                                                 MDS_INODELOCK_OPEN);
307         }
308
309         /* If CREATE, cancel parent's UPDATE lock. */
310         if (it->it_op & IT_CREAT)
311                 mode = LCK_EX;
312         else
313                 mode = LCK_CR;
314         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
315                                          &cancels, mode,
316                                          MDS_INODELOCK_UPDATE);
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
319                                    &RQF_LDLM_INTENT_OPEN);
320         if (req == NULL) {
321                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
322                 return ERR_PTR(-ENOMEM);
323         }
324
325         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
326                              op_data->op_namelen + 1);
327         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
328                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
329
330         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
331         if (rc < 0) {
332                 ptlrpc_request_free(req);
333                 return ERR_PTR(rc);
334         }
335
336         spin_lock(&req->rq_lock);
337         req->rq_replay = req->rq_import->imp_replayable;
338         spin_unlock(&req->rq_lock);
339
340         /* pack the intent */
341         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
342         lit->opc = (__u64)it->it_op;
343
344         /* pack the intended request */
345         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
346                       lmmsize);
347
348         /* for remote client, fetch remote perm for current user */
349         if (client_is_remote(exp))
350                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
351                                      sizeof(struct mdt_remote_perm));
352         ptlrpc_request_set_replen(req);
353         return req;
354 }
355
356 static struct ptlrpc_request *
357 mdc_intent_getxattr_pack(struct obd_export *exp,
358                          struct lookup_intent *it,
359                          struct md_op_data *op_data)
360 {
361         struct ptlrpc_request   *req;
362         struct ldlm_intent      *lit;
363         int                     rc, count = 0, maxdata;
364         LIST_HEAD(cancels);
365
366         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
367                                         &RQF_LDLM_INTENT_GETXATTR);
368         if (req == NULL)
369                 return ERR_PTR(-ENOMEM);
370
371         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
372         if (rc) {
373                 ptlrpc_request_free(req);
374                 return ERR_PTR(rc);
375         }
376
377         /* pack the intent */
378         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
379         lit->opc = IT_GETXATTR;
380
381         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
382
383         /* pack the intended request */
384         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
385                       0);
386
387         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
388                                 RCL_SERVER, maxdata);
389
390         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
391                                 RCL_SERVER, maxdata);
392
393         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
394                                 RCL_SERVER, maxdata);
395
396         ptlrpc_request_set_replen(req);
397
398         return req;
399 }
400
401 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
402                                                      struct lookup_intent *it,
403                                                      struct md_op_data *op_data)
404 {
405         struct ptlrpc_request *req;
406         struct obd_device     *obddev = class_exp2obd(exp);
407         struct ldlm_intent    *lit;
408         int                 rc;
409
410         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
411                                    &RQF_LDLM_INTENT_UNLINK);
412         if (req == NULL)
413                 return ERR_PTR(-ENOMEM);
414
415         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
416                              op_data->op_namelen + 1);
417
418         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
419         if (rc) {
420                 ptlrpc_request_free(req);
421                 return ERR_PTR(rc);
422         }
423
424         /* pack the intent */
425         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
426         lit->opc = (__u64)it->it_op;
427
428         /* pack the intended request */
429         mdc_unlink_pack(req, op_data);
430
431         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432                              obddev->u.cli.cl_default_mds_easize);
433         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
434                              obddev->u.cli.cl_default_mds_cookiesize);
435         ptlrpc_request_set_replen(req);
436         return req;
437 }
438
439 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
440                                                     struct lookup_intent *it,
441                                                     struct md_op_data *op_data)
442 {
443         struct ptlrpc_request *req;
444         struct obd_device     *obddev = class_exp2obd(exp);
445         u64                    valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
446                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
447                                        OBD_MD_MEA |
448                                        (client_is_remote(exp) ?
449                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
450         struct ldlm_intent    *lit;
451         int                 rc;
452         int                 easize;
453
454         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
455                                    &RQF_LDLM_INTENT_GETATTR);
456         if (req == NULL)
457                 return ERR_PTR(-ENOMEM);
458
459         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
460                              op_data->op_namelen + 1);
461
462         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
463         if (rc) {
464                 ptlrpc_request_free(req);
465                 return ERR_PTR(rc);
466         }
467
468         /* pack the intent */
469         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
470         lit->opc = (__u64)it->it_op;
471
472         if (obddev->u.cli.cl_default_mds_easize > 0)
473                 easize = obddev->u.cli.cl_default_mds_easize;
474         else
475                 easize = obddev->u.cli.cl_max_mds_easize;
476
477         /* pack the intended request */
478         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
479
480         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
481         if (client_is_remote(exp))
482                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
483                                      sizeof(struct mdt_remote_perm));
484         ptlrpc_request_set_replen(req);
485         return req;
486 }
487
488 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
489                                                      struct lookup_intent *it,
490                                                      struct md_op_data *unused)
491 {
492         struct obd_device     *obd = class_exp2obd(exp);
493         struct ptlrpc_request *req;
494         struct ldlm_intent    *lit;
495         struct layout_intent  *layout;
496         int rc;
497
498         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
499                                 &RQF_LDLM_INTENT_LAYOUT);
500         if (req == NULL)
501                 return ERR_PTR(-ENOMEM);
502
503         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
504         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
505         if (rc) {
506                 ptlrpc_request_free(req);
507                 return ERR_PTR(rc);
508         }
509
510         /* pack the intent */
511         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
512         lit->opc = (__u64)it->it_op;
513
514         /* pack the layout intent request */
515         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
516         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
517          * set for replication */
518         layout->li_opc = LAYOUT_INTENT_ACCESS;
519
520         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
521                              obd->u.cli.cl_default_mds_easize);
522         ptlrpc_request_set_replen(req);
523         return req;
524 }
525
526 static struct ptlrpc_request *
527 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
528 {
529         struct ptlrpc_request *req;
530         int rc;
531
532         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
533         if (req == NULL)
534                 return ERR_PTR(-ENOMEM);
535
536         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
537         if (rc) {
538                 ptlrpc_request_free(req);
539                 return ERR_PTR(rc);
540         }
541
542         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
543         ptlrpc_request_set_replen(req);
544         return req;
545 }
546
547 static int mdc_finish_enqueue(struct obd_export *exp,
548                               struct ptlrpc_request *req,
549                               struct ldlm_enqueue_info *einfo,
550                               struct lookup_intent *it,
551                               struct lustre_handle *lockh,
552                               int rc)
553 {
554         struct req_capsule  *pill = &req->rq_pill;
555         struct ldlm_request *lockreq;
556         struct ldlm_reply   *lockrep;
557         struct lustre_intent_data *intent = &it->d.lustre;
558         struct ldlm_lock    *lock;
559         void            *lvb_data = NULL;
560         int               lvb_len = 0;
561
562         LASSERT(rc >= 0);
563         /* Similarly, if we're going to replay this request, we don't want to
564          * actually get a lock, just perform the intent. */
565         if (req->rq_transno || req->rq_replay) {
566                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
567                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
568         }
569
570         if (rc == ELDLM_LOCK_ABORTED) {
571                 einfo->ei_mode = 0;
572                 memset(lockh, 0, sizeof(*lockh));
573                 rc = 0;
574         } else { /* rc = 0 */
575                 lock = ldlm_handle2lock(lockh);
576                 LASSERT(lock != NULL);
577
578                 /* If the server gave us back a different lock mode, we should
579                  * fix up our variables. */
580                 if (lock->l_req_mode != einfo->ei_mode) {
581                         ldlm_lock_addref(lockh, lock->l_req_mode);
582                         ldlm_lock_decref(lockh, einfo->ei_mode);
583                         einfo->ei_mode = lock->l_req_mode;
584                 }
585                 LDLM_LOCK_PUT(lock);
586         }
587
588         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
589         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
590
591         intent->it_disposition = (int)lockrep->lock_policy_res1;
592         intent->it_status = (int)lockrep->lock_policy_res2;
593         intent->it_lock_mode = einfo->ei_mode;
594         intent->it_lock_handle = lockh->cookie;
595         intent->it_data = req;
596
597         /* Technically speaking rq_transno must already be zero if
598          * it_status is in error, so the check is a bit redundant */
599         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
600                 mdc_clear_replay_flag(req, intent->it_status);
601
602         /* If we're doing an IT_OPEN which did not result in an actual
603          * successful open, then we need to remove the bit which saves
604          * this request for unconditional replay.
605          *
606          * It's important that we do this first!  Otherwise we might exit the
607          * function without doing so, and try to replay a failed create
608          * (bug 3440) */
609         if (it->it_op & IT_OPEN && req->rq_replay &&
610             (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
611                 mdc_clear_replay_flag(req, intent->it_status);
612
613         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
614                   it->it_op, intent->it_disposition, intent->it_status);
615
616         /* We know what to expect, so we do any byte flipping required here */
617         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
618                 struct mdt_body *body;
619
620                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
621                 if (body == NULL) {
622                         CERROR("Can't swab mdt_body\n");
623                         return -EPROTO;
624                 }
625
626                 if (it_disposition(it, DISP_OPEN_OPEN) &&
627                     !it_open_error(DISP_OPEN_OPEN, it)) {
628                         /*
629                          * If this is a successful OPEN request, we need to set
630                          * replay handler and data early, so that if replay
631                          * happens immediately after swabbing below, new reply
632                          * is swabbed by that handler correctly.
633                          */
634                         mdc_set_open_replay_data(NULL, NULL, it);
635                 }
636
637                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
638                         void *eadata;
639
640                         mdc_update_max_ea_from_body(exp, body);
641
642                         /*
643                          * The eadata is opaque; just check that it is there.
644                          * Eventually, obd_unpackmd() will check the contents.
645                          */
646                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
647                                                               body->eadatasize);
648                         if (eadata == NULL)
649                                 return -EPROTO;
650
651                         /* save lvb data and length in case this is for layout
652                          * lock */
653                         lvb_data = eadata;
654                         lvb_len = body->eadatasize;
655
656                         /*
657                          * We save the reply LOV EA in case we have to replay a
658                          * create for recovery.  If we didn't allocate a large
659                          * enough request buffer above we need to reallocate it
660                          * here to hold the actual LOV EA.
661                          *
662                          * To not save LOV EA if request is not going to replay
663                          * (for example error one).
664                          */
665                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
666                                 void *lmm;
667
668                                 if (req_capsule_get_size(pill, &RMF_EADATA,
669                                                          RCL_CLIENT) <
670                                     body->eadatasize)
671                                         mdc_realloc_openmsg(req, body);
672                                 else
673                                         req_capsule_shrink(pill, &RMF_EADATA,
674                                                            body->eadatasize,
675                                                            RCL_CLIENT);
676
677                                 req_capsule_set_size(pill, &RMF_EADATA,
678                                                      RCL_CLIENT,
679                                                      body->eadatasize);
680
681                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
682                                 if (lmm)
683                                         memcpy(lmm, eadata, body->eadatasize);
684                         }
685                 }
686
687                 if (body->valid & OBD_MD_FLRMTPERM) {
688                         struct mdt_remote_perm *perm;
689
690                         LASSERT(client_is_remote(exp));
691                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
692                                                 lustre_swab_mdt_remote_perm);
693                         if (perm == NULL)
694                                 return -EPROTO;
695                 }
696         } else if (it->it_op & IT_LAYOUT) {
697                 /* maybe the lock was granted right away and layout
698                  * is packed into RMF_DLM_LVB of req */
699                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
700                 if (lvb_len > 0) {
701                         lvb_data = req_capsule_server_sized_get(pill,
702                                                         &RMF_DLM_LVB, lvb_len);
703                         if (lvb_data == NULL)
704                                 return -EPROTO;
705                 }
706         }
707
708         /* fill in stripe data for layout lock */
709         lock = ldlm_handle2lock(lockh);
710         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
711                 void *lmm;
712
713                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
714                         ldlm_it2str(it->it_op), lvb_len);
715
716                 lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
717                 if (lmm == NULL) {
718                         LDLM_LOCK_PUT(lock);
719                         return -ENOMEM;
720                 }
721                 memcpy(lmm, lvb_data, lvb_len);
722
723                 /* install lvb_data */
724                 lock_res_and_lock(lock);
725                 if (lock->l_lvb_data == NULL) {
726                         lock->l_lvb_type = LVB_T_LAYOUT;
727                         lock->l_lvb_data = lmm;
728                         lock->l_lvb_len = lvb_len;
729                         lmm = NULL;
730                 }
731                 unlock_res_and_lock(lock);
732                 if (lmm != NULL)
733                         kvfree(lmm);
734         }
735         if (lock != NULL)
736                 LDLM_LOCK_PUT(lock);
737
738         return rc;
739 }
740
741 /* We always reserve enough space in the reply packet for a stripe MD, because
742  * we don't know in advance the file type. */
743 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
744                 struct lookup_intent *it, struct md_op_data *op_data,
745                 struct lustre_handle *lockh, void *lmm, int lmmsize,
746                 struct ptlrpc_request **reqp, u64 extra_lock_flags)
747 {
748         static const ldlm_policy_data_t lookup_policy = {
749                 .l_inodebits = { MDS_INODELOCK_LOOKUP }
750         };
751         static const ldlm_policy_data_t update_policy = {
752                 .l_inodebits = { MDS_INODELOCK_UPDATE }
753         };
754         static const ldlm_policy_data_t layout_policy = {
755                 .l_inodebits = { MDS_INODELOCK_LAYOUT }
756         };
757         static const ldlm_policy_data_t getxattr_policy = {
758                 .l_inodebits = { MDS_INODELOCK_XATTR }
759         };
760         ldlm_policy_data_t const *policy = &lookup_policy;
761         struct obd_device *obddev = class_exp2obd(exp);
762         struct ptlrpc_request *req;
763         u64 flags, saved_flags = extra_lock_flags;
764         struct ldlm_res_id res_id;
765         int generation, resends = 0;
766         struct ldlm_reply *lockrep;
767         enum lvb_type lvb_type = LVB_T_NONE;
768         int rc;
769
770         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
771                  einfo->ei_type);
772
773         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
774
775         if (it) {
776                 saved_flags |= LDLM_FL_HAS_INTENT;
777                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
778                         policy = &update_policy;
779                 else if (it->it_op & IT_LAYOUT)
780                         policy = &layout_policy;
781                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
782                         policy = &getxattr_policy;
783         }
784
785         LASSERT(reqp == NULL);
786
787         generation = obddev->u.cli.cl_import->imp_generation;
788 resend:
789         flags = saved_flags;
790         if (!it) {
791                 /* The only way right now is FLOCK, in this case we hide flock
792                    policy as lmm, but lmmsize is 0 */
793                 LASSERT(lmm && lmmsize == 0);
794                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
795                          einfo->ei_type);
796                 policy = lmm;
797                 res_id.name[3] = LDLM_FLOCK;
798                 req = NULL;
799         } else if (it->it_op & IT_OPEN) {
800                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
801                                            einfo->ei_cbdata);
802                 policy = &update_policy;
803                 einfo->ei_cbdata = NULL;
804                 lmm = NULL;
805         } else if (it->it_op & IT_UNLINK) {
806                 req = mdc_intent_unlink_pack(exp, it, op_data);
807         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
808                 req = mdc_intent_getattr_pack(exp, it, op_data);
809         } else if (it->it_op & IT_READDIR) {
810                 req = mdc_enqueue_pack(exp, 0);
811         } else if (it->it_op & IT_LAYOUT) {
812                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
813                         return -EOPNOTSUPP;
814                 req = mdc_intent_layout_pack(exp, it, op_data);
815                 lvb_type = LVB_T_LAYOUT;
816         } else if (it->it_op & IT_GETXATTR) {
817                 req = mdc_intent_getxattr_pack(exp, it, op_data);
818         } else {
819                 LBUG();
820                 return -EINVAL;
821         }
822
823         if (IS_ERR(req))
824                 return PTR_ERR(req);
825
826         if (req != NULL && it && it->it_op & IT_CREAT)
827                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
828                  * retry logic */
829                 req->rq_no_retry_einprogress = 1;
830
831         if (resends) {
832                 req->rq_generation_set = 1;
833                 req->rq_import_generation = generation;
834                 req->rq_sent = ktime_get_real_seconds() + resends;
835         }
836
837         /* It is important to obtain rpc_lock first (if applicable), so that
838          * threads that are serialised with rpc_lock are not polluting our
839          * rpcs in flight counter. We do not do flock request limiting, though*/
840         if (it) {
841                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
842                 rc = mdc_enter_request(&obddev->u.cli);
843                 if (rc != 0) {
844                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
845                         mdc_clear_replay_flag(req, 0);
846                         ptlrpc_req_finished(req);
847                         return rc;
848                 }
849         }
850
851         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
852                               0, lvb_type, lockh, 0);
853         if (!it) {
854                 /* For flock requests we immediately return without further
855                    delay and let caller deal with the rest, since rest of
856                    this function metadata processing makes no sense for flock
857                    requests anyway. But in case of problem during comms with
858                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
859                    can not rely on caller and this mainly for F_UNLCKs
860                    (explicits or automatically generated by Kernel to clean
861                    current FLocks upon exit) that can't be trashed */
862                 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
863                         goto resend;
864                 return rc;
865         }
866
867         mdc_exit_request(&obddev->u.cli);
868         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
869
870         if (rc < 0) {
871                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
872                              "%s: ldlm_cli_enqueue failed: rc = %d\n",
873                              obddev->obd_name, rc);
874
875                 mdc_clear_replay_flag(req, rc);
876                 ptlrpc_req_finished(req);
877                 return rc;
878         }
879
880         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
881         LASSERT(lockrep != NULL);
882
883         lockrep->lock_policy_res2 =
884                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
885
886         /* Retry the create infinitely when we get -EINPROGRESS from
887          * server. This is required by the new quota design. */
888         if (it->it_op & IT_CREAT &&
889             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
890                 mdc_clear_replay_flag(req, rc);
891                 ptlrpc_req_finished(req);
892                 resends++;
893
894                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
895                        obddev->obd_name, resends, it->it_op,
896                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
897
898                 if (generation == obddev->u.cli.cl_import->imp_generation) {
899                         goto resend;
900                 } else {
901                         CDEBUG(D_HA, "resend cross eviction\n");
902                         return -EIO;
903                 }
904         }
905
906         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
907         if (rc < 0) {
908                 if (lustre_handle_is_used(lockh)) {
909                         ldlm_lock_decref(lockh, einfo->ei_mode);
910                         memset(lockh, 0, sizeof(*lockh));
911                 }
912                 ptlrpc_req_finished(req);
913
914                 it->d.lustre.it_lock_handle = 0;
915                 it->d.lustre.it_lock_mode = 0;
916                 it->d.lustre.it_data = NULL;
917         }
918
919         return rc;
920 }
921
922 static int mdc_finish_intent_lock(struct obd_export *exp,
923                                   struct ptlrpc_request *request,
924                                   struct md_op_data *op_data,
925                                   struct lookup_intent *it,
926                                   struct lustre_handle *lockh)
927 {
928         struct lustre_handle old_lock;
929         struct mdt_body *mdt_body;
930         struct ldlm_lock *lock;
931         int rc;
932
933         LASSERT(request != NULL);
934         LASSERT(request != LP_POISON);
935         LASSERT(request->rq_repmsg != LP_POISON);
936
937         if (!it_disposition(it, DISP_IT_EXECD)) {
938                 /* The server failed before it even started executing the
939                  * intent, i.e. because it couldn't unpack the request. */
940                 LASSERT(it->d.lustre.it_status != 0);
941                 return it->d.lustre.it_status;
942         }
943         rc = it_open_error(DISP_IT_EXECD, it);
944         if (rc)
945                 return rc;
946
947         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
948         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
949
950         /* If we were revalidating a fid/name pair, mark the intent in
951          * case we fail and get called again from lookup */
952         if (fid_is_sane(&op_data->op_fid2) &&
953             it->it_create_mode & M_CHECK_STALE &&
954             it->it_op != IT_GETATTR) {
955
956                 /* Also: did we find the same inode? */
957                 /* sever can return one of two fids:
958                  * op_fid2 - new allocated fid - if file is created.
959                  * op_fid3 - existent fid - if file only open.
960                  * op_fid3 is saved in lmv_intent_open */
961                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
962                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
963                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
964                                "\n", PFID(&op_data->op_fid2),
965                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
966                         return -ESTALE;
967                 }
968         }
969
970         rc = it_open_error(DISP_LOOKUP_EXECD, it);
971         if (rc)
972                 return rc;
973
974         /* keep requests around for the multiple phases of the call
975          * this shows the DISP_XX must guarantee we make it into the call
976          */
977         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
978             it_disposition(it, DISP_OPEN_CREATE) &&
979             !it_open_error(DISP_OPEN_CREATE, it)) {
980                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
981                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
982         }
983         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
984             it_disposition(it, DISP_OPEN_OPEN) &&
985             !it_open_error(DISP_OPEN_OPEN, it)) {
986                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
987                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
988                 /* BUG 11546 - eviction in the middle of open rpc processing */
989                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
990         }
991
992         if (it->it_op & IT_CREAT) {
993                 /* XXX this belongs in ll_create_it */
994         } else if (it->it_op == IT_OPEN) {
995                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
996         } else {
997                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
998         }
999
1000         /* If we already have a matching lock, then cancel the new
1001          * one.  We have to set the data here instead of in
1002          * mdc_enqueue, because we need to use the child's inode as
1003          * the l_ast_data to match, and that's not available until
1004          * intent_finish has performed the iget().) */
1005         lock = ldlm_handle2lock(lockh);
1006         if (lock) {
1007                 ldlm_policy_data_t policy = lock->l_policy_data;
1008
1009                 LDLM_DEBUG(lock, "matching against this");
1010
1011                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1012                                          &lock->l_resource->lr_name),
1013                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1014                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1015                 LDLM_LOCK_PUT(lock);
1016
1017                 memcpy(&old_lock, lockh, sizeof(*lockh));
1018                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1019                                     LDLM_IBITS, &policy, LCK_NL,
1020                                     &old_lock, 0)) {
1021                         ldlm_lock_decref_and_cancel(lockh,
1022                                                     it->d.lustre.it_lock_mode);
1023                         memcpy(lockh, &old_lock, sizeof(old_lock));
1024                         it->d.lustre.it_lock_handle = lockh->cookie;
1025                 }
1026         }
1027         CDEBUG(D_DENTRY,
1028                "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1029                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1030                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1031         return rc;
1032 }
1033
1034 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1035                         struct lu_fid *fid, __u64 *bits)
1036 {
1037         /* We could just return 1 immediately, but since we should only
1038          * be called in revalidate_it if we already have a lock, let's
1039          * verify that. */
1040         struct ldlm_res_id res_id;
1041         struct lustre_handle lockh;
1042         ldlm_policy_data_t policy;
1043         ldlm_mode_t mode;
1044
1045         if (it->d.lustre.it_lock_handle) {
1046                 lockh.cookie = it->d.lustre.it_lock_handle;
1047                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1048         } else {
1049                 fid_build_reg_res_name(fid, &res_id);
1050                 switch (it->it_op) {
1051                 case IT_GETATTR:
1052                         /* File attributes are held under multiple bits:
1053                          * nlink is under lookup lock, size and times are
1054                          * under UPDATE lock and recently we've also got
1055                          * a separate permissions lock for owner/group/acl that
1056                          * were protected by lookup lock before.
1057                          * Getattr must provide all of that information,
1058                          * so we need to ensure we have all of those locks.
1059                          * Unfortunately, if the bits are split across multiple
1060                          * locks, there's no easy way to match all of them here,
1061                          * so an extra RPC would be performed to fetch all
1062                          * of those bits at once for now. */
1063                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1064                          * but for old MDTs (< 2.4), permission is covered
1065                          * by LOOKUP lock, so it needs to match all bits here.*/
1066                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1067                                                   MDS_INODELOCK_LOOKUP |
1068                                                   MDS_INODELOCK_PERM;
1069                         break;
1070                 case IT_LAYOUT:
1071                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1072                         break;
1073                 default:
1074                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1075                         break;
1076                 }
1077
1078                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1079                                        LDLM_IBITS, &policy,
1080                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1081                                       &lockh);
1082         }
1083
1084         if (mode) {
1085                 it->d.lustre.it_lock_handle = lockh.cookie;
1086                 it->d.lustre.it_lock_mode = mode;
1087         } else {
1088                 it->d.lustre.it_lock_handle = 0;
1089                 it->d.lustre.it_lock_mode = 0;
1090         }
1091
1092         return !!mode;
1093 }
1094
1095 /*
1096  * This long block is all about fixing up the lock and request state
1097  * so that it is correct as of the moment _before_ the operation was
1098  * applied; that way, the VFS will think that everything is normal and
1099  * call Lustre's regular VFS methods.
1100  *
1101  * If we're performing a creation, that means that unless the creation
1102  * failed with EEXIST, we should fake up a negative dentry.
1103  *
1104  * For everything else, we want to lookup to succeed.
1105  *
1106  * One additional note: if CREATE or OPEN succeeded, we add an extra
1107  * reference to the request because we need to keep it around until
1108  * ll_create/ll_open gets called.
1109  *
1110  * The server will return to us, in it_disposition, an indication of
1111  * exactly what d.lustre.it_status refers to.
1112  *
1113  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1114  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1115  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1116  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1117  * was successful.
1118  *
1119  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1120  * child lookup.
1121  */
1122 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1123                     void *lmm, int lmmsize, struct lookup_intent *it,
1124                     int lookup_flags, struct ptlrpc_request **reqp,
1125                     ldlm_blocking_callback cb_blocking,
1126                     __u64 extra_lock_flags)
1127 {
1128         struct ldlm_enqueue_info einfo = {
1129                 .ei_type        = LDLM_IBITS,
1130                 .ei_mode        = it_to_lock_mode(it),
1131                 .ei_cb_bl       = cb_blocking,
1132                 .ei_cb_cp       = ldlm_completion_ast,
1133         };
1134         struct lustre_handle lockh;
1135         int rc = 0;
1136
1137         LASSERT(it);
1138
1139         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1140                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1141                 op_data->op_name, PFID(&op_data->op_fid2),
1142                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1143                 it->it_flags);
1144
1145         lockh.cookie = 0;
1146         if (fid_is_sane(&op_data->op_fid2) &&
1147             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1148                 /* We could just return 1 immediately, but since we should only
1149                  * be called in revalidate_it if we already have a lock, let's
1150                  * verify that. */
1151                 it->d.lustre.it_lock_handle = 0;
1152                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1153                 /* Only return failure if it was not GETATTR by cfid
1154                    (from inode_revalidate) */
1155                 if (rc || op_data->op_namelen != 0)
1156                         return rc;
1157         }
1158
1159         /* For case if upper layer did not alloc fid, do it now. */
1160         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1161                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1162                 if (rc < 0) {
1163                         CERROR("Can't alloc new fid, rc %d\n", rc);
1164                         return rc;
1165                 }
1166         }
1167         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1168                          extra_lock_flags);
1169         if (rc < 0)
1170                 return rc;
1171
1172         *reqp = it->d.lustre.it_data;
1173         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1174         return rc;
1175 }
1176
1177 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1178                                               struct ptlrpc_request *req,
1179                                               void *args, int rc)
1180 {
1181         struct mdc_getattr_args  *ga = args;
1182         struct obd_export       *exp = ga->ga_exp;
1183         struct md_enqueue_info   *minfo = ga->ga_minfo;
1184         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1185         struct lookup_intent     *it;
1186         struct lustre_handle     *lockh;
1187         struct obd_device       *obddev;
1188         struct ldlm_reply        *lockrep;
1189         __u64                flags = LDLM_FL_HAS_INTENT;
1190
1191         it    = &minfo->mi_it;
1192         lockh = &minfo->mi_lockh;
1193
1194         obddev = class_exp2obd(exp);
1195
1196         mdc_exit_request(&obddev->u.cli);
1197         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1198                 rc = -ETIMEDOUT;
1199
1200         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1201                                    &flags, NULL, 0, lockh, rc);
1202         if (rc < 0) {
1203                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1204                 mdc_clear_replay_flag(req, rc);
1205                 goto out;
1206         }
1207
1208         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1209         LASSERT(lockrep != NULL);
1210
1211         lockrep->lock_policy_res2 =
1212                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1213
1214         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1215         if (rc)
1216                 goto out;
1217
1218         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1219
1220 out:
1221         kfree(einfo);
1222         minfo->mi_cb(req, minfo, rc);
1223         return 0;
1224 }
1225
1226 int mdc_intent_getattr_async(struct obd_export *exp,
1227                              struct md_enqueue_info *minfo,
1228                              struct ldlm_enqueue_info *einfo)
1229 {
1230         struct md_op_data       *op_data = &minfo->mi_data;
1231         struct lookup_intent    *it = &minfo->mi_it;
1232         struct ptlrpc_request   *req;
1233         struct mdc_getattr_args *ga;
1234         struct obd_device       *obddev = class_exp2obd(exp);
1235         struct ldlm_res_id       res_id;
1236         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1237          *     for statahead currently. Consider CMD in future, such two bits
1238          *     maybe managed by different MDS, should be adjusted then. */
1239         ldlm_policy_data_t       policy = {
1240                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1241                                                          MDS_INODELOCK_UPDATE }
1242                                  };
1243         int                   rc = 0;
1244         __u64               flags = LDLM_FL_HAS_INTENT;
1245
1246         CDEBUG(D_DLMTRACE,
1247                 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1248                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1249                 ldlm_it2str(it->it_op), it->it_flags);
1250
1251         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1252         req = mdc_intent_getattr_pack(exp, it, op_data);
1253         if (IS_ERR(req))
1254                 return PTR_ERR(req);
1255
1256         rc = mdc_enter_request(&obddev->u.cli);
1257         if (rc != 0) {
1258                 ptlrpc_req_finished(req);
1259                 return rc;
1260         }
1261
1262         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1263                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1264         if (rc < 0) {
1265                 mdc_exit_request(&obddev->u.cli);
1266                 ptlrpc_req_finished(req);
1267                 return rc;
1268         }
1269
1270         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1271         ga = ptlrpc_req_async_args(req);
1272         ga->ga_exp = exp;
1273         ga->ga_minfo = minfo;
1274         ga->ga_einfo = einfo;
1275
1276         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1277         ptlrpcd_add_req(req);
1278
1279         return 0;
1280 }