Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40
41 #include "../include/lustre_intent.h"
42 #include "../include/obd.h"
43 #include "../include/obd_class.h"
44 #include "../include/lustre_dlm.h"
45 #include "../include/lustre_fid.h"      /* fid_res_name_eq() */
46 #include "../include/lustre_mdc.h"
47 #include "../include/lustre_net.h"
48 #include "../include/lustre_req_layout.h"
49 #include "mdc_internal.h"
50
/*
 * Bundle of three pointers: an export, an md_enqueue_info and an
 * ldlm_enqueue_info.  The users of this structure are outside the part
 * of the file shown here.
 */
struct mdc_getattr_args {
	struct obd_export	 *ga_exp;	/* export */
	struct md_enqueue_info	 *ga_minfo;	/* metadata enqueue info */
	struct ldlm_enqueue_info *ga_einfo;	/* DLM enqueue info */
};
56
57 int it_disposition(struct lookup_intent *it, int flag)
58 {
59         return it->d.lustre.it_disposition & flag;
60 }
61 EXPORT_SYMBOL(it_disposition);
62
63 void it_set_disposition(struct lookup_intent *it, int flag)
64 {
65         it->d.lustre.it_disposition |= flag;
66 }
67 EXPORT_SYMBOL(it_set_disposition);
68
69 void it_clear_disposition(struct lookup_intent *it, int flag)
70 {
71         it->d.lustre.it_disposition &= ~flag;
72 }
73 EXPORT_SYMBOL(it_clear_disposition);
74
75 int it_open_error(int phase, struct lookup_intent *it)
76 {
77         if (it_disposition(it, DISP_OPEN_LEASE)) {
78                 if (phase >= DISP_OPEN_LEASE)
79                         return it->d.lustre.it_status;
80                 else
81                         return 0;
82         }
83         if (it_disposition(it, DISP_OPEN_OPEN)) {
84                 if (phase >= DISP_OPEN_OPEN)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_OPEN_CREATE)) {
91                 if (phase >= DISP_OPEN_CREATE)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98                 if (phase >= DISP_LOOKUP_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103
104         if (it_disposition(it, DISP_IT_EXECD)) {
105                 if (phase >= DISP_IT_EXECD)
106                         return it->d.lustre.it_status;
107                 else
108                         return 0;
109         }
110         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111                it->d.lustre.it_status);
112         LBUG();
113         return 0;
114 }
115 EXPORT_SYMBOL(it_open_error);
116
117 /* this must be called on a lockh that is known to have a referenced lock */
118 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
119                       __u64 *bits)
120 {
121         struct ldlm_lock *lock;
122         struct inode *new_inode = data;
123
124         if (bits)
125                 *bits = 0;
126
127         if (!*lockh)
128                 return 0;
129
130         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132         LASSERT(lock != NULL);
133         lock_res_and_lock(lock);
134         if (lock->l_resource->lr_lvb_inode &&
135             lock->l_resource->lr_lvb_inode != data) {
136                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
137
138                 LASSERTF(old_inode->i_state & I_FREEING,
139                          "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
140                          old_inode, old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state, new_inode, new_inode->i_ino,
142                          new_inode->i_generation);
143         }
144         lock->l_resource->lr_lvb_inode = new_inode;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         return 0;
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161
162         fid_build_reg_res_name(fid, &res_id);
163         /* LU-4405: Clear bits not supported by server */
164         policy->l_inodebits.bits &= exp_connect_ibits(exp);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         return rc;
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         fid_build_reg_res_name(fid, &res_id);
182         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183                                              policy, mode, flags, opaque);
184         return rc;
185 }
186
187 int mdc_null_inode(struct obd_export *exp,
188                    const struct lu_fid *fid)
189 {
190         struct ldlm_res_id res_id;
191         struct ldlm_resource *res;
192         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
193
194         LASSERTF(ns != NULL, "no namespace passed\n");
195
196         fid_build_reg_res_name(fid, &res_id);
197
198         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
199         if (res == NULL)
200                 return 0;
201
202         lock_res(res);
203         res->lr_lvb_inode = NULL;
204         unlock_res(res);
205
206         ldlm_resource_putref(res);
207         return 0;
208 }
209
210 /* find any ldlm lock of the inode in mdc
211  * return 0    not find
212  *      1    find one
213  *      < 0    error */
214 int mdc_find_cbdata(struct obd_export *exp,
215                     const struct lu_fid *fid,
216                     ldlm_iterator_t it, void *data)
217 {
218         struct ldlm_res_id res_id;
219         int rc = 0;
220
221         fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
222         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223                                    it, data);
224         if (rc == LDLM_ITER_STOP)
225                 return 1;
226         else if (rc == LDLM_ITER_CONTINUE)
227                 return 0;
228         return rc;
229 }
230
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 {
233         /* Don't hold error requests for replay. */
234         if (req->rq_replay) {
235                 spin_lock(&req->rq_lock);
236                 req->rq_replay = 0;
237                 spin_unlock(&req->rq_lock);
238         }
239         if (rc && req->rq_transno != 0) {
240                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241                 LBUG();
242         }
243 }
244
245 /* Save a large LOV EA into the request buffer so that it is available
246  * for replay.  We don't do this in the initial request because the
247  * original request doesn't need this buffer (at most it sends just the
248  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249  * buffer and may also be difficult to allocate and save a very large
250  * request buffer for each open. (bug 5707)
251  *
252  * OOM here may cause recovery failure if lmm is needed (only for the
253  * original open if the MDS crashed just when this client also OOM'd)
254  * but this is incredibly unlikely, and questionable whether the client
255  * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257                                 struct mdt_body *body)
258 {
259         int     rc;
260
261         /* FIXME: remove this explicit offset. */
262         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
263                                         body->eadatasize);
264         if (rc) {
265                 CERROR("Can't enlarge segment %d size to %d\n",
266                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
267                 body->valid &= ~OBD_MD_FLEASIZE;
268                 body->eadatasize = 0;
269         }
270 }
271
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273                                                    struct lookup_intent *it,
274                                                    struct md_op_data *op_data,
275                                                    void *lmm, int lmmsize,
276                                                    void *cb_data)
277 {
278         struct ptlrpc_request *req;
279         struct obd_device     *obddev = class_exp2obd(exp);
280         struct ldlm_intent    *lit;
281         LIST_HEAD(cancels);
282         int                 count = 0;
283         int                 mode;
284         int                 rc;
285
286         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287
288         /* XXX: openlock is not cancelled for cross-refs. */
289         /* If inode is known, cancel conflicting OPEN locks. */
290         if (fid_is_sane(&op_data->op_fid2)) {
291                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
292                         if (it->it_flags & FMODE_WRITE)
293                                 mode = LCK_EX;
294                         else
295                                 mode = LCK_PR;
296                 } else {
297                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298                                 mode = LCK_CW;
299                         else if (it->it_flags & __FMODE_EXEC)
300                                 mode = LCK_PR;
301                         else
302                                 mode = LCK_CR;
303                 }
304                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
305                                                 &cancels, mode,
306                                                 MDS_INODELOCK_OPEN);
307         }
308
309         /* If CREATE, cancel parent's UPDATE lock. */
310         if (it->it_op & IT_CREAT)
311                 mode = LCK_EX;
312         else
313                 mode = LCK_CR;
314         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
315                                          &cancels, mode,
316                                          MDS_INODELOCK_UPDATE);
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
319                                    &RQF_LDLM_INTENT_OPEN);
320         if (req == NULL) {
321                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
322                 return ERR_PTR(-ENOMEM);
323         }
324
325         /* parent capability */
326         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
327         /* child capability, reserve the size according to parent capa, it will
328          * be filled after we get the reply */
329         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
332                              op_data->op_namelen + 1);
333         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
334                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
335
336         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
337         if (rc < 0) {
338                 ptlrpc_request_free(req);
339                 return ERR_PTR(rc);
340         }
341
342         spin_lock(&req->rq_lock);
343         req->rq_replay = req->rq_import->imp_replayable;
344         spin_unlock(&req->rq_lock);
345
346         /* pack the intent */
347         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
348         lit->opc = (__u64)it->it_op;
349
350         /* pack the intended request */
351         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
352                       lmmsize);
353
354         /* for remote client, fetch remote perm for current user */
355         if (client_is_remote(exp))
356                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
357                                      sizeof(struct mdt_remote_perm));
358         ptlrpc_request_set_replen(req);
359         return req;
360 }
361
362 static struct ptlrpc_request *
363 mdc_intent_getxattr_pack(struct obd_export *exp,
364                          struct lookup_intent *it,
365                          struct md_op_data *op_data)
366 {
367         struct ptlrpc_request   *req;
368         struct ldlm_intent      *lit;
369         int                     rc, count = 0, maxdata;
370         LIST_HEAD(cancels);
371
372
373
374         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375                                         &RQF_LDLM_INTENT_GETXATTR);
376         if (req == NULL)
377                 return ERR_PTR(-ENOMEM);
378
379         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
380
381         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 return ERR_PTR(rc);
385         }
386
387         /* pack the intent */
388         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
389         lit->opc = IT_GETXATTR;
390
391         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
392
393         /* pack the intended request */
394         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
395                         op_data->op_valid, maxdata, -1, 0);
396
397         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
398                                 RCL_SERVER, maxdata);
399
400         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
401                                 RCL_SERVER, maxdata);
402
403         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
404                                 RCL_SERVER, maxdata);
405
406         ptlrpc_request_set_replen(req);
407
408         return req;
409 }
410
411 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
412                                                      struct lookup_intent *it,
413                                                      struct md_op_data *op_data)
414 {
415         struct ptlrpc_request *req;
416         struct obd_device     *obddev = class_exp2obd(exp);
417         struct ldlm_intent    *lit;
418         int                 rc;
419
420         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
421                                    &RQF_LDLM_INTENT_UNLINK);
422         if (req == NULL)
423                 return ERR_PTR(-ENOMEM);
424
425         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
426         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
427                              op_data->op_namelen + 1);
428
429         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
430         if (rc) {
431                 ptlrpc_request_free(req);
432                 return ERR_PTR(rc);
433         }
434
435         /* pack the intent */
436         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
437         lit->opc = (__u64)it->it_op;
438
439         /* pack the intended request */
440         mdc_unlink_pack(req, op_data);
441
442         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
443                              obddev->u.cli.cl_default_mds_easize);
444         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
445                              obddev->u.cli.cl_default_mds_cookiesize);
446         ptlrpc_request_set_replen(req);
447         return req;
448 }
449
450 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
451                                                     struct lookup_intent *it,
452                                                     struct md_op_data *op_data)
453 {
454         struct ptlrpc_request *req;
455         struct obd_device     *obddev = class_exp2obd(exp);
456         u64                    valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
457                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
458                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
459                                        (client_is_remote(exp) ?
460                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
461         struct ldlm_intent    *lit;
462         int                 rc;
463         int                 easize;
464
465         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
466                                    &RQF_LDLM_INTENT_GETATTR);
467         if (req == NULL)
468                 return ERR_PTR(-ENOMEM);
469
470         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
471         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
472                              op_data->op_namelen + 1);
473
474         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
475         if (rc) {
476                 ptlrpc_request_free(req);
477                 return ERR_PTR(rc);
478         }
479
480         /* pack the intent */
481         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
482         lit->opc = (__u64)it->it_op;
483
484         if (obddev->u.cli.cl_default_mds_easize > 0)
485                 easize = obddev->u.cli.cl_default_mds_easize;
486         else
487                 easize = obddev->u.cli.cl_max_mds_easize;
488
489         /* pack the intended request */
490         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
491
492         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
493         if (client_is_remote(exp))
494                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
495                                      sizeof(struct mdt_remote_perm));
496         ptlrpc_request_set_replen(req);
497         return req;
498 }
499
500 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
501                                                      struct lookup_intent *it,
502                                                      struct md_op_data *unused)
503 {
504         struct obd_device     *obd = class_exp2obd(exp);
505         struct ptlrpc_request *req;
506         struct ldlm_intent    *lit;
507         struct layout_intent  *layout;
508         int rc;
509
510         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
511                                 &RQF_LDLM_INTENT_LAYOUT);
512         if (req == NULL)
513                 return ERR_PTR(-ENOMEM);
514
515         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
516         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
517         if (rc) {
518                 ptlrpc_request_free(req);
519                 return ERR_PTR(rc);
520         }
521
522         /* pack the intent */
523         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
524         lit->opc = (__u64)it->it_op;
525
526         /* pack the layout intent request */
527         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
528         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
529          * set for replication */
530         layout->li_opc = LAYOUT_INTENT_ACCESS;
531
532         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
533                              obd->u.cli.cl_default_mds_easize);
534         ptlrpc_request_set_replen(req);
535         return req;
536 }
537
538 static struct ptlrpc_request *
539 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
540 {
541         struct ptlrpc_request *req;
542         int rc;
543
544         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
545         if (req == NULL)
546                 return ERR_PTR(-ENOMEM);
547
548         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
549         if (rc) {
550                 ptlrpc_request_free(req);
551                 return ERR_PTR(rc);
552         }
553
554         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
555         ptlrpc_request_set_replen(req);
556         return req;
557 }
558
559 static int mdc_finish_enqueue(struct obd_export *exp,
560                               struct ptlrpc_request *req,
561                               struct ldlm_enqueue_info *einfo,
562                               struct lookup_intent *it,
563                               struct lustre_handle *lockh,
564                               int rc)
565 {
566         struct req_capsule  *pill = &req->rq_pill;
567         struct ldlm_request *lockreq;
568         struct ldlm_reply   *lockrep;
569         struct lustre_intent_data *intent = &it->d.lustre;
570         struct ldlm_lock    *lock;
571         void            *lvb_data = NULL;
572         int               lvb_len = 0;
573
574         LASSERT(rc >= 0);
575         /* Similarly, if we're going to replay this request, we don't want to
576          * actually get a lock, just perform the intent. */
577         if (req->rq_transno || req->rq_replay) {
578                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
579                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
580         }
581
582         if (rc == ELDLM_LOCK_ABORTED) {
583                 einfo->ei_mode = 0;
584                 memset(lockh, 0, sizeof(*lockh));
585                 rc = 0;
586         } else { /* rc = 0 */
587                 lock = ldlm_handle2lock(lockh);
588                 LASSERT(lock != NULL);
589
590                 /* If the server gave us back a different lock mode, we should
591                  * fix up our variables. */
592                 if (lock->l_req_mode != einfo->ei_mode) {
593                         ldlm_lock_addref(lockh, lock->l_req_mode);
594                         ldlm_lock_decref(lockh, einfo->ei_mode);
595                         einfo->ei_mode = lock->l_req_mode;
596                 }
597                 LDLM_LOCK_PUT(lock);
598         }
599
600         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
601         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
602
603         intent->it_disposition = (int)lockrep->lock_policy_res1;
604         intent->it_status = (int)lockrep->lock_policy_res2;
605         intent->it_lock_mode = einfo->ei_mode;
606         intent->it_lock_handle = lockh->cookie;
607         intent->it_data = req;
608
609         /* Technically speaking rq_transno must already be zero if
610          * it_status is in error, so the check is a bit redundant */
611         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
612                 mdc_clear_replay_flag(req, intent->it_status);
613
614         /* If we're doing an IT_OPEN which did not result in an actual
615          * successful open, then we need to remove the bit which saves
616          * this request for unconditional replay.
617          *
618          * It's important that we do this first!  Otherwise we might exit the
619          * function without doing so, and try to replay a failed create
620          * (bug 3440) */
621         if (it->it_op & IT_OPEN && req->rq_replay &&
622             (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
623                 mdc_clear_replay_flag(req, intent->it_status);
624
625         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
626                   it->it_op, intent->it_disposition, intent->it_status);
627
628         /* We know what to expect, so we do any byte flipping required here */
629         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
630                 struct mdt_body *body;
631
632                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
633                 if (body == NULL) {
634                         CERROR("Can't swab mdt_body\n");
635                         return -EPROTO;
636                 }
637
638                 if (it_disposition(it, DISP_OPEN_OPEN) &&
639                     !it_open_error(DISP_OPEN_OPEN, it)) {
640                         /*
641                          * If this is a successful OPEN request, we need to set
642                          * replay handler and data early, so that if replay
643                          * happens immediately after swabbing below, new reply
644                          * is swabbed by that handler correctly.
645                          */
646                         mdc_set_open_replay_data(NULL, NULL, it);
647                 }
648
649                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
650                         void *eadata;
651
652                         mdc_update_max_ea_from_body(exp, body);
653
654                         /*
655                          * The eadata is opaque; just check that it is there.
656                          * Eventually, obd_unpackmd() will check the contents.
657                          */
658                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
659                                                               body->eadatasize);
660                         if (eadata == NULL)
661                                 return -EPROTO;
662
663                         /* save lvb data and length in case this is for layout
664                          * lock */
665                         lvb_data = eadata;
666                         lvb_len = body->eadatasize;
667
668                         /*
669                          * We save the reply LOV EA in case we have to replay a
670                          * create for recovery.  If we didn't allocate a large
671                          * enough request buffer above we need to reallocate it
672                          * here to hold the actual LOV EA.
673                          *
674                          * To not save LOV EA if request is not going to replay
675                          * (for example error one).
676                          */
677                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
678                                 void *lmm;
679
680                                 if (req_capsule_get_size(pill, &RMF_EADATA,
681                                                          RCL_CLIENT) <
682                                     body->eadatasize)
683                                         mdc_realloc_openmsg(req, body);
684                                 else
685                                         req_capsule_shrink(pill, &RMF_EADATA,
686                                                            body->eadatasize,
687                                                            RCL_CLIENT);
688
689                                 req_capsule_set_size(pill, &RMF_EADATA,
690                                                      RCL_CLIENT,
691                                                      body->eadatasize);
692
693                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
694                                 if (lmm)
695                                         memcpy(lmm, eadata, body->eadatasize);
696                         }
697                 }
698
699                 if (body->valid & OBD_MD_FLRMTPERM) {
700                         struct mdt_remote_perm *perm;
701
702                         LASSERT(client_is_remote(exp));
703                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
704                                                 lustre_swab_mdt_remote_perm);
705                         if (perm == NULL)
706                                 return -EPROTO;
707                 }
708                 if (body->valid & OBD_MD_FLMDSCAPA) {
709                         struct lustre_capa *capa, *p;
710
711                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
712                         if (capa == NULL)
713                                 return -EPROTO;
714
715                         if (it->it_op & IT_OPEN) {
716                                 /* client fid capa will be checked in replay */
717                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
718                                 LASSERT(p);
719                                 *p = *capa;
720                         }
721                 }
722                 if (body->valid & OBD_MD_FLOSSCAPA) {
723                         struct lustre_capa *capa;
724
725                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
726                         if (capa == NULL)
727                                 return -EPROTO;
728                 }
729         } else if (it->it_op & IT_LAYOUT) {
730                 /* maybe the lock was granted right away and layout
731                  * is packed into RMF_DLM_LVB of req */
732                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
733                 if (lvb_len > 0) {
734                         lvb_data = req_capsule_server_sized_get(pill,
735                                                         &RMF_DLM_LVB, lvb_len);
736                         if (lvb_data == NULL)
737                                 return -EPROTO;
738                 }
739         }
740
741         /* fill in stripe data for layout lock */
742         lock = ldlm_handle2lock(lockh);
743         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
744                 void *lmm;
745
746                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
747                         ldlm_it2str(it->it_op), lvb_len);
748
749                 OBD_ALLOC_LARGE(lmm, lvb_len);
750                 if (lmm == NULL) {
751                         LDLM_LOCK_PUT(lock);
752                         return -ENOMEM;
753                 }
754                 memcpy(lmm, lvb_data, lvb_len);
755
756                 /* install lvb_data */
757                 lock_res_and_lock(lock);
758                 if (lock->l_lvb_data == NULL) {
759                         lock->l_lvb_type = LVB_T_LAYOUT;
760                         lock->l_lvb_data = lmm;
761                         lock->l_lvb_len = lvb_len;
762                         lmm = NULL;
763                 }
764                 unlock_res_and_lock(lock);
765                 if (lmm != NULL)
766                         OBD_FREE_LARGE(lmm, lvb_len);
767         }
768         if (lock != NULL)
769                 LDLM_LOCK_PUT(lock);
770
771         return rc;
772 }
773
774 /* We always reserve enough space in the reply packet for a stripe MD, because
775  * we don't know in advance the file type. */
776 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
777                 struct lookup_intent *it, struct md_op_data *op_data,
778                 struct lustre_handle *lockh, void *lmm, int lmmsize,
779                 struct ptlrpc_request **reqp, u64 extra_lock_flags)
780 {
781         static const ldlm_policy_data_t lookup_policy = {
782                 .l_inodebits = { MDS_INODELOCK_LOOKUP }
783         };
784         static const ldlm_policy_data_t update_policy = {
785                 .l_inodebits = { MDS_INODELOCK_UPDATE }
786         };
787         static const ldlm_policy_data_t layout_policy = {
788                 .l_inodebits = { MDS_INODELOCK_LAYOUT }
789         };
790         static const ldlm_policy_data_t getxattr_policy = {
791                 .l_inodebits = { MDS_INODELOCK_XATTR }
792         };
793         ldlm_policy_data_t const *policy = &lookup_policy;
794         struct obd_device *obddev = class_exp2obd(exp);
795         struct ptlrpc_request *req;
796         u64 flags, saved_flags = extra_lock_flags;
797         struct ldlm_res_id res_id;
798         int generation, resends = 0;
799         struct ldlm_reply *lockrep;
800         enum lvb_type lvb_type = LVB_T_NONE;
801         int rc;
802
803         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
804                  einfo->ei_type);
805
806         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
807
808         if (it) {
809                 saved_flags |= LDLM_FL_HAS_INTENT;
810                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
811                         policy = &update_policy;
812                 else if (it->it_op & IT_LAYOUT)
813                         policy = &layout_policy;
814                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
815                         policy = &getxattr_policy;
816         }
817
818         LASSERT(reqp == NULL);
819
820         generation = obddev->u.cli.cl_import->imp_generation;
821 resend:
822         flags = saved_flags;
823         if (!it) {
824                 /* The only way right now is FLOCK, in this case we hide flock
825                    policy as lmm, but lmmsize is 0 */
826                 LASSERT(lmm && lmmsize == 0);
827                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
828                          einfo->ei_type);
829                 policy = (ldlm_policy_data_t *)lmm;
830                 res_id.name[3] = LDLM_FLOCK;
831                 req = NULL;
832         } else if (it->it_op & IT_OPEN) {
833                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
834                                            einfo->ei_cbdata);
835                 policy = &update_policy;
836                 einfo->ei_cbdata = NULL;
837                 lmm = NULL;
838         } else if (it->it_op & IT_UNLINK) {
839                 req = mdc_intent_unlink_pack(exp, it, op_data);
840         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
841                 req = mdc_intent_getattr_pack(exp, it, op_data);
842         } else if (it->it_op & IT_READDIR) {
843                 req = mdc_enqueue_pack(exp, 0);
844         } else if (it->it_op & IT_LAYOUT) {
845                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
846                         return -EOPNOTSUPP;
847                 req = mdc_intent_layout_pack(exp, it, op_data);
848                 lvb_type = LVB_T_LAYOUT;
849         } else if (it->it_op & IT_GETXATTR) {
850                 req = mdc_intent_getxattr_pack(exp, it, op_data);
851         } else {
852                 LBUG();
853                 return -EINVAL;
854         }
855
856         if (IS_ERR(req))
857                 return PTR_ERR(req);
858
859         if (req != NULL && it && it->it_op & IT_CREAT)
860                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
861                  * retry logic */
862                 req->rq_no_retry_einprogress = 1;
863
864         if (resends) {
865                 req->rq_generation_set = 1;
866                 req->rq_import_generation = generation;
867                 req->rq_sent = get_seconds() + resends;
868         }
869
870         /* It is important to obtain rpc_lock first (if applicable), so that
871          * threads that are serialised with rpc_lock are not polluting our
872          * rpcs in flight counter. We do not do flock request limiting, though*/
873         if (it) {
874                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
875                 rc = mdc_enter_request(&obddev->u.cli);
876                 if (rc != 0) {
877                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
878                         mdc_clear_replay_flag(req, 0);
879                         ptlrpc_req_finished(req);
880                         return rc;
881                 }
882         }
883
884         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
885                               0, lvb_type, lockh, 0);
886         if (!it) {
887                 /* For flock requests we immediately return without further
888                    delay and let caller deal with the rest, since rest of
889                    this function metadata processing makes no sense for flock
890                    requests anyway. But in case of problem during comms with
891                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
892                    can not rely on caller and this mainly for F_UNLCKs
893                    (explicits or automatically generated by Kernel to clean
894                    current FLocks upon exit) that can't be trashed */
895                 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
896                         goto resend;
897                 return rc;
898         }
899
900         mdc_exit_request(&obddev->u.cli);
901         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
902
903         if (rc < 0) {
904                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
905                              "%s: ldlm_cli_enqueue failed: rc = %d\n",
906                              obddev->obd_name, rc);
907
908                 mdc_clear_replay_flag(req, rc);
909                 ptlrpc_req_finished(req);
910                 return rc;
911         }
912
913         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
914         LASSERT(lockrep != NULL);
915
916         lockrep->lock_policy_res2 =
917                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
918
919         /* Retry the create infinitely when we get -EINPROGRESS from
920          * server. This is required by the new quota design. */
921         if (it && it->it_op & IT_CREAT &&
922             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
923                 mdc_clear_replay_flag(req, rc);
924                 ptlrpc_req_finished(req);
925                 resends++;
926
927                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
928                        obddev->obd_name, resends, it->it_op,
929                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
930
931                 if (generation == obddev->u.cli.cl_import->imp_generation) {
932                         goto resend;
933                 } else {
934                         CDEBUG(D_HA, "resend cross eviction\n");
935                         return -EIO;
936                 }
937         }
938
939         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
940         if (rc < 0) {
941                 if (lustre_handle_is_used(lockh)) {
942                         ldlm_lock_decref(lockh, einfo->ei_mode);
943                         memset(lockh, 0, sizeof(*lockh));
944                 }
945                 ptlrpc_req_finished(req);
946
947                 it->d.lustre.it_lock_handle = 0;
948                 it->d.lustre.it_lock_mode = 0;
949                 it->d.lustre.it_data = NULL;
950         }
951
952         return rc;
953 }
954
955 static int mdc_finish_intent_lock(struct obd_export *exp,
956                                   struct ptlrpc_request *request,
957                                   struct md_op_data *op_data,
958                                   struct lookup_intent *it,
959                                   struct lustre_handle *lockh)
960 {
961         struct lustre_handle old_lock;
962         struct mdt_body *mdt_body;
963         struct ldlm_lock *lock;
964         int rc;
965
966         LASSERT(request != NULL);
967         LASSERT(request != LP_POISON);
968         LASSERT(request->rq_repmsg != LP_POISON);
969
970         if (!it_disposition(it, DISP_IT_EXECD)) {
971                 /* The server failed before it even started executing the
972                  * intent, i.e. because it couldn't unpack the request. */
973                 LASSERT(it->d.lustre.it_status != 0);
974                 return it->d.lustre.it_status;
975         }
976         rc = it_open_error(DISP_IT_EXECD, it);
977         if (rc)
978                 return rc;
979
980         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
981         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
982
983         /* If we were revalidating a fid/name pair, mark the intent in
984          * case we fail and get called again from lookup */
985         if (fid_is_sane(&op_data->op_fid2) &&
986             it->it_create_mode & M_CHECK_STALE &&
987             it->it_op != IT_GETATTR) {
988
989                 /* Also: did we find the same inode? */
990                 /* sever can return one of two fids:
991                  * op_fid2 - new allocated fid - if file is created.
992                  * op_fid3 - existent fid - if file only open.
993                  * op_fid3 is saved in lmv_intent_open */
994                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
995                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
996                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
997                                "\n", PFID(&op_data->op_fid2),
998                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
999                         return -ESTALE;
1000                 }
1001         }
1002
1003         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1004         if (rc)
1005                 return rc;
1006
1007         /* keep requests around for the multiple phases of the call
1008          * this shows the DISP_XX must guarantee we make it into the call
1009          */
1010         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1011             it_disposition(it, DISP_OPEN_CREATE) &&
1012             !it_open_error(DISP_OPEN_CREATE, it)) {
1013                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1014                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1015         }
1016         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1017             it_disposition(it, DISP_OPEN_OPEN) &&
1018             !it_open_error(DISP_OPEN_OPEN, it)) {
1019                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1020                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1021                 /* BUG 11546 - eviction in the middle of open rpc processing */
1022                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1023         }
1024
1025         if (it->it_op & IT_CREAT) {
1026                 /* XXX this belongs in ll_create_it */
1027         } else if (it->it_op == IT_OPEN) {
1028                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1029         } else {
1030                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1031         }
1032
1033         /* If we already have a matching lock, then cancel the new
1034          * one.  We have to set the data here instead of in
1035          * mdc_enqueue, because we need to use the child's inode as
1036          * the l_ast_data to match, and that's not available until
1037          * intent_finish has performed the iget().) */
1038         lock = ldlm_handle2lock(lockh);
1039         if (lock) {
1040                 ldlm_policy_data_t policy = lock->l_policy_data;
1041
1042                 LDLM_DEBUG(lock, "matching against this");
1043
1044                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1045                                          &lock->l_resource->lr_name),
1046                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1047                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1048                 LDLM_LOCK_PUT(lock);
1049
1050                 memcpy(&old_lock, lockh, sizeof(*lockh));
1051                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1052                                     LDLM_IBITS, &policy, LCK_NL,
1053                                     &old_lock, 0)) {
1054                         ldlm_lock_decref_and_cancel(lockh,
1055                                                     it->d.lustre.it_lock_mode);
1056                         memcpy(lockh, &old_lock, sizeof(old_lock));
1057                         it->d.lustre.it_lock_handle = lockh->cookie;
1058                 }
1059         }
1060         CDEBUG(D_DENTRY,
1061                "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1062                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1063                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1064         return rc;
1065 }
1066
1067 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1068                         struct lu_fid *fid, __u64 *bits)
1069 {
1070         /* We could just return 1 immediately, but since we should only
1071          * be called in revalidate_it if we already have a lock, let's
1072          * verify that. */
1073         struct ldlm_res_id res_id;
1074         struct lustre_handle lockh;
1075         ldlm_policy_data_t policy;
1076         ldlm_mode_t mode;
1077
1078         if (it->d.lustre.it_lock_handle) {
1079                 lockh.cookie = it->d.lustre.it_lock_handle;
1080                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1081         } else {
1082                 fid_build_reg_res_name(fid, &res_id);
1083                 switch (it->it_op) {
1084                 case IT_GETATTR:
1085                         /* File attributes are held under multiple bits:
1086                          * nlink is under lookup lock, size and times are
1087                          * under UPDATE lock and recently we've also got
1088                          * a separate permissions lock for owner/group/acl that
1089                          * were protected by lookup lock before.
1090                          * Getattr must provide all of that information,
1091                          * so we need to ensure we have all of those locks.
1092                          * Unfortunately, if the bits are split across multiple
1093                          * locks, there's no easy way to match all of them here,
1094                          * so an extra RPC would be performed to fetch all
1095                          * of those bits at once for now. */
1096                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1097                          * but for old MDTs (< 2.4), permission is covered
1098                          * by LOOKUP lock, so it needs to match all bits here.*/
1099                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1100                                                   MDS_INODELOCK_LOOKUP |
1101                                                   MDS_INODELOCK_PERM;
1102                         break;
1103                 case IT_LAYOUT:
1104                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1105                         break;
1106                 default:
1107                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1108                         break;
1109                 }
1110
1111                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1112                                        LDLM_IBITS, &policy,
1113                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1114                                       &lockh);
1115         }
1116
1117         if (mode) {
1118                 it->d.lustre.it_lock_handle = lockh.cookie;
1119                 it->d.lustre.it_lock_mode = mode;
1120         } else {
1121                 it->d.lustre.it_lock_handle = 0;
1122                 it->d.lustre.it_lock_mode = 0;
1123         }
1124
1125         return !!mode;
1126 }
1127
1128 /*
1129  * This long block is all about fixing up the lock and request state
1130  * so that it is correct as of the moment _before_ the operation was
1131  * applied; that way, the VFS will think that everything is normal and
1132  * call Lustre's regular VFS methods.
1133  *
1134  * If we're performing a creation, that means that unless the creation
1135  * failed with EEXIST, we should fake up a negative dentry.
1136  *
1137  * For everything else, we want to lookup to succeed.
1138  *
1139  * One additional note: if CREATE or OPEN succeeded, we add an extra
1140  * reference to the request because we need to keep it around until
1141  * ll_create/ll_open gets called.
1142  *
1143  * The server will return to us, in it_disposition, an indication of
1144  * exactly what d.lustre.it_status refers to.
1145  *
1146  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1147  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1148  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1149  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1150  * was successful.
1151  *
1152  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1153  * child lookup.
1154  */
1155 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1156                     void *lmm, int lmmsize, struct lookup_intent *it,
1157                     int lookup_flags, struct ptlrpc_request **reqp,
1158                     ldlm_blocking_callback cb_blocking,
1159                     __u64 extra_lock_flags)
1160 {
1161         struct ldlm_enqueue_info einfo = {
1162                 .ei_type        = LDLM_IBITS,
1163                 .ei_mode        = it_to_lock_mode(it),
1164                 .ei_cb_bl       = cb_blocking,
1165                 .ei_cb_cp       = ldlm_completion_ast,
1166         };
1167         struct lustre_handle lockh;
1168         int rc = 0;
1169
1170         LASSERT(it);
1171
1172         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1173                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1174                 op_data->op_name, PFID(&op_data->op_fid2),
1175                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1176                 it->it_flags);
1177
1178         lockh.cookie = 0;
1179         if (fid_is_sane(&op_data->op_fid2) &&
1180             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1181                 /* We could just return 1 immediately, but since we should only
1182                  * be called in revalidate_it if we already have a lock, let's
1183                  * verify that. */
1184                 it->d.lustre.it_lock_handle = 0;
1185                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1186                 /* Only return failure if it was not GETATTR by cfid
1187                    (from inode_revalidate) */
1188                 if (rc || op_data->op_namelen != 0)
1189                         return rc;
1190         }
1191
1192         /* For case if upper layer did not alloc fid, do it now. */
1193         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1194                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1195                 if (rc < 0) {
1196                         CERROR("Can't alloc new fid, rc %d\n", rc);
1197                         return rc;
1198                 }
1199         }
1200         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1201                          extra_lock_flags);
1202         if (rc < 0)
1203                 return rc;
1204
1205         *reqp = it->d.lustre.it_data;
1206         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1207         return rc;
1208 }
1209
1210 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1211                                               struct ptlrpc_request *req,
1212                                               void *args, int rc)
1213 {
1214         struct mdc_getattr_args  *ga = args;
1215         struct obd_export       *exp = ga->ga_exp;
1216         struct md_enqueue_info   *minfo = ga->ga_minfo;
1217         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1218         struct lookup_intent     *it;
1219         struct lustre_handle     *lockh;
1220         struct obd_device       *obddev;
1221         struct ldlm_reply        *lockrep;
1222         __u64                flags = LDLM_FL_HAS_INTENT;
1223
1224         it    = &minfo->mi_it;
1225         lockh = &minfo->mi_lockh;
1226
1227         obddev = class_exp2obd(exp);
1228
1229         mdc_exit_request(&obddev->u.cli);
1230         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1231                 rc = -ETIMEDOUT;
1232
1233         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1234                                    &flags, NULL, 0, lockh, rc);
1235         if (rc < 0) {
1236                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1237                 mdc_clear_replay_flag(req, rc);
1238                 goto out;
1239         }
1240
1241         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1242         LASSERT(lockrep != NULL);
1243
1244         lockrep->lock_policy_res2 =
1245                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1246
1247         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1248         if (rc)
1249                 goto out;
1250
1251         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1252
1253 out:
1254         OBD_FREE_PTR(einfo);
1255         minfo->mi_cb(req, minfo, rc);
1256         return 0;
1257 }
1258
1259 int mdc_intent_getattr_async(struct obd_export *exp,
1260                              struct md_enqueue_info *minfo,
1261                              struct ldlm_enqueue_info *einfo)
1262 {
1263         struct md_op_data       *op_data = &minfo->mi_data;
1264         struct lookup_intent    *it = &minfo->mi_it;
1265         struct ptlrpc_request   *req;
1266         struct mdc_getattr_args *ga;
1267         struct obd_device       *obddev = class_exp2obd(exp);
1268         struct ldlm_res_id       res_id;
1269         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1270          *     for statahead currently. Consider CMD in future, such two bits
1271          *     maybe managed by different MDS, should be adjusted then. */
1272         ldlm_policy_data_t       policy = {
1273                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1274                                                          MDS_INODELOCK_UPDATE }
1275                                  };
1276         int                   rc = 0;
1277         __u64               flags = LDLM_FL_HAS_INTENT;
1278
1279         CDEBUG(D_DLMTRACE,
1280                 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1281                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1282                 ldlm_it2str(it->it_op), it->it_flags);
1283
1284         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1285         req = mdc_intent_getattr_pack(exp, it, op_data);
1286         if (IS_ERR(req))
1287                 return PTR_ERR(req);
1288
1289         rc = mdc_enter_request(&obddev->u.cli);
1290         if (rc != 0) {
1291                 ptlrpc_req_finished(req);
1292                 return rc;
1293         }
1294
1295         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1296                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1297         if (rc < 0) {
1298                 mdc_exit_request(&obddev->u.cli);
1299                 ptlrpc_req_finished(req);
1300                 return rc;
1301         }
1302
1303         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1304         ga = ptlrpc_req_async_args(req);
1305         ga->ga_exp = exp;
1306         ga->ga_minfo = minfo;
1307         ga->ga_einfo = einfo;
1308
1309         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1310         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1311
1312         return 0;
1313 }